summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/backends/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends/kafka')
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java403
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java126
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt386
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java749
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java161
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java235
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java45
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java100
8 files changed, 2205 insertions, 0 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
new file mode 100644
index 0000000..f7f5ba7
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
@@ -0,0 +1,403 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+
+import com.att.dmf.mr.backends.Consumer;
+import com.att.dmf.mr.constants.CambriaConstants;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ * A consumer instance that's created per-request. These are stateless so that
+ * clients can connect to this service as a proxy.
+ *
+ * @author Ram
+ *
+ */
+public class Kafka011Consumer implements Consumer {
+ private enum State {
+ OPENED, CLOSED
+ }
+
+ // @Autowired
+ // KafkaLiveLockAvoider kafkaLiveLockAvoider;
+ /**
+ * KafkaConsumer() is constructor. It has following 4 parameters:-
+ *
+ * @param topic
+ * @param group
+ * @param id
+ * @param cc
+ *
+ */
+
+ public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
+ KafkaLiveLockAvoider2 klla) throws Exception {
+ fTopic = topic;
+ fGroup = group;
+ fId = id;
+ fCreateTimeMs = System.currentTimeMillis();
+ fLastTouch = fCreateTimeMs;
+ fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
+ fLogTag = fGroup + "(" + fId + ")/" + fTopic;
+ offset = 0;
+ state = Kafka011Consumer.State.OPENED;
+ kConsumer = cc;
+ fKafkaLiveLockAvoider = klla;
+ synchronized (kConsumer) {
+ kConsumer.subscribe(Arrays.asList(topic));
+ }
+ }
+
+ private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
+ return new Consumer.Message() {
+ @Override
+ public long getOffset() {
+ offset = msg.offset();
+ return offset;
+ }
+
+ @Override
+ public String getMessage() {
+ return new String(msg.value());
+ }
+ };
+ }
+
+ @Override
+ public synchronized Consumer.Message nextMessage() {
+
+ try {
+ if (fPendingMsgs.size() > 0) {
+ return makeMessage(fPendingMsgs.take());
+ }
+ } catch (InterruptedException x) {
+ log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
+ x);
+ }
+
+ Callable<Boolean> run = new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ try {
+ ConsumerRecords<String, String> records;
+ synchronized (kConsumer) {
+ records = kConsumer.poll(500);
+ }
+ for (ConsumerRecord<String, String> record : records) {
+ // foundMsgs = true;
+ fPendingMsgs.offer(record);
+ }
+
+ } catch (KafkaException x) {
+ log.debug(fLogTag + ": KafkaException " + x.getMessage());
+
+ } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
+ log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
+ + x.getMessage());
+
+ }
+
+ // return null;
+ return true;
+ }
+ };
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ RunnableFuture future = new FutureTask(run);
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.execute(future);
+ try {
+ future.get(5, TimeUnit.SECONDS); // wait 1
+ // second
+ } catch (TimeoutException ex) {
+ // timed out. Try to stop the code if possible.
+ String apiNodeId = null;
+ try {
+ apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
+ } catch (UnknownHostException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+
+ try {
+ if (fKafkaLiveLockAvoider != null)
+ fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
+ } catch (Exception e) {
+ log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
+ }
+
+ forcePollOnConsumer();
+ future.cancel(true);
+ } catch (Exception ex) {
+ // timed out. Try to stop the code if possible.
+ future.cancel(true);
+ }
+ service.shutdown();
+
+ return null;
+
+ }
+
+ /**
+ * getName() method returns string type value. returns 3 parameters in
+ * string:- fTopic,fGroup,fId
+ *
+ * @Override
+ */
+ public String getName() {
+ return fTopic + " : " + fGroup + " : " + fId;
+ }
+
+ /**
+ * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
+ * variable value
+ *
+ * @Override
+ *
+ */
+ public long getCreateTimeMs() {
+ return fCreateTimeMs;
+ }
+
+ public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
+ return kConsumer;
+ }
+
+ /**
+ * getLastAccessMs() method returns long type value. returns fLastTouch
+ * variable value
+ *
+ * @Override
+ *
+ */
+ public long getLastAccessMs() {
+ return fLastTouch;
+ }
+
+ /**
+ * getOffset() method returns long type value. returns offset variable value
+ *
+ * @Override
+ *
+ */
+ public long getOffset() {
+ return offset;
+ }
+
+ /**
+ * commit offsets commitOffsets() method will be called on closed of
+ * KafkaConsumer.
+ *
+ * @Override
+ *
+ *
+ * public void commitOffsets() { if (getState() ==
+ * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
+ * on closed KafkaConsumer " + getName()); return; }
+ * fConnector.commitOffsets(); }
+ */
+
+ /**
+ * updating fLastTouch with current time in ms
+ */
+ public void touch() {
+ fLastTouch = System.currentTimeMillis();
+ }
+
+ /**
+ * getLastTouch() method returns long type value. returns fLastTouch
+ * variable value
+ *
+ */
+ public long getLastTouch() {
+ return fLastTouch;
+ }
+
+ /**
+ * setting the kafkaConsumer state to closed
+ */
+ // public synchronized boolean close() {
+ public boolean close() {
+ if (getState() == Kafka011Consumer.State.CLOSED) {
+
+ log.error("close() called on closed KafkaConsumer " + getName());
+ return true;
+ }
+
+ // fConnector.shutdown();
+ boolean retVal = kafkaConnectorshuttask();
+ return retVal;
+
+ }
+
+ /* time out if the kafka shutdown fails for some reason */
+
+ private boolean kafkaConnectorshuttask() {
+ Callable<Boolean> run = new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+
+ try {
+ // System.out.println("attempt to delete " + kConsumer);
+ kConsumer.close();
+
+ } catch (Exception e) {
+ log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
+ throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
+ // return false;
+ }
+ log.info("Kafka connection closure with in 15 seconds by a Executors task");
+
+ return true;
+ }
+ };
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ RunnableFuture future = new FutureTask(run);
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.execute(future);
+ try {
+ future.get(200, TimeUnit.SECONDS); // wait 1
+ // second
+ } catch (TimeoutException ex) {
+ // timed out. Try to stop the code if possible.
+ log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
+ future.cancel(true);
+ setState(Kafka011Consumer.State.OPENED);
+ } catch (Exception ex) {
+ // timed out. Try to stop the code if possible.
+ log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
+ + ex);
+ future.cancel(true);
+ setState(Kafka011Consumer.State.OPENED);
+ return false;
+ }
+ service.shutdown();
+ setState(Kafka011Consumer.State.CLOSED);
+ return true;
+ }
+
+ public void forcePollOnConsumer() {
+ Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
+
+ }
+
+ /**
+ * getConsumerGroup() returns Consumer group
+ *
+ * @return
+ */
+ public String getConsumerGroup() {
+ return fGroup;
+ }
+
+ /**
+ * getConsumerId returns Consumer Id
+ *
+ * @return
+ */
+ public String getConsumerId() {
+ return fId;
+ }
+
+ /**
+ * getState returns kafkaconsumer state
+ *
+ * @return
+ */
+ private Kafka011Consumer.State getState() {
+ return this.state;
+ }
+
+ /**
+ * setState() sets the kafkaConsumer state
+ *
+ * @param state
+ */
+ private void setState(Kafka011Consumer.State state) {
+ this.state = state;
+ }
+
+ // private ConsumerConnector fConnector;
+ private final String fTopic;
+ private final String fGroup;
+ private final String fId;
+ private final String fLogTag;
+ // private final KafkaStream<byte[], byte[]> fStream;
+ private KafkaConsumer<String, String> kConsumer;
+ private long fCreateTimeMs;
+ private long fLastTouch;
+ private long offset;
+ private Kafka011Consumer.State state;
+ private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
+ private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
+ //private ArrayList<Kafka011Consumer> fconsumerList;
+ @Override
+ public void commitOffsets() {
+ if (getState() == Kafka011Consumer.State.CLOSED) {
+ log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
+ return;
+ }
+ kConsumer.commitSync();
+ // fConsumer.close();
+
+ }
+
+ @Override
+ public void setOffset(long offsetval) {
+ offset = offsetval;
+ }
+
+
+ public void setConsumerCache(KafkaConsumerCache cache) {
+ }
+
+ //@Override
+ //public Message nextMessage(ArrayList<?> l) {
+ // TODO Auto-generated method stub
+ //return null;
+ //}
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
new file mode 100644
index 0000000..ea9407b
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
@@ -0,0 +1,126 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+import java.util.ArrayList;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ * A consumer Util class for force polling when a rebalance issue is anticipated
+ *
+ * @author Ram
+ *
+ */
+public class Kafka011ConsumerUtil {
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011ConsumerUtil.class);
+
+ /**
+ * @param fconsumercache
+ * @param fTopic
+ * @param fGroup
+ * @param fId
+ * @return
+ */
+ public static boolean forcePollOnConsumer(final String fTopic, final String fGroup, final String fId) {
+
+ Thread forcepollThread = new Thread(new Runnable() {
+ public void run() {
+ try {
+
+ ArrayList<Kafka011Consumer> kcsList = null;
+
+ kcsList = KafkaConsumerCache.getInstance().getConsumerListForCG(fTopic + "::" + fGroup + "::", fId);
+ if (null != kcsList) {
+ for (int counter = 0; counter < kcsList.size(); counter++) {
+
+ Kafka011Consumer kc1 = kcsList.get(counter);
+
+ try {
+ ConsumerRecords<String, String> recs = kc1.getConsumer().poll(0);
+ log.info("soft poll on " + kc1);
+ } catch (java.util.ConcurrentModificationException e) {
+ log.error("Error occurs for " + e);
+ }
+
+ }
+
+ }
+
+ } catch (Exception e) {
+ log.error("Failed and go to Exception block for " + fGroup + " " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ });
+
+ forcepollThread.start();
+
+ return false;
+
+ }
+
+ /**
+ * @param fconsumercache
+ * @param group
+ * @return
+ */
+ public static boolean forcePollOnConsumer(final String group) {
+
+ Thread forcepollThread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ ArrayList<Kafka011Consumer> kcsList = new ArrayList<Kafka011Consumer>();
+ kcsList = KafkaConsumerCache.getInstance().getConsumerListForCG(group);
+
+ if (null != kcsList) {
+
+ for (int counter = 0; counter < kcsList.size(); counter++) {
+
+ Kafka011Consumer kc1 = kcsList.get(counter);
+ log.info("soft poll on remote nodes " + kc1);
+ ConsumerRecords<String, String> recs = kc1.getConsumer().poll(0);
+ }
+
+ }
+
+ } catch (java.util.ConcurrentModificationException e) {
+ log.error("Error occurs for " + e);
+ } catch (Exception e) {
+ log.error("Failed and go to Exception block for " + group + " " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ });
+
+ forcepollThread.start();
+ return false;
+
+ }
+
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt
new file mode 100644
index 0000000..dd6259f
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt
@@ -0,0 +1,386 @@
+package com.att.dmf.mr.backends.kafka;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
+
+import com.att.dmf.mr.backends.Consumer;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ * A consumer instance that's created per-request. These are stateless so that
+ * clients can connect to this service as a proxy.
+ *
+ * @author peter
+ *
+ */
+public class KafkaConsumer implements Consumer {
+ private enum State {
+ OPENED, CLOSED
+ }
+
+ /**
+ * KafkaConsumer() is constructor. It has following 4 parameters:-
+ *
+ * @param topic
+ * @param group
+ * @param id
+ * @param cc
+ *
+ */
+
+ public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception {
+ fTopic = topic;
+ fGroup = group;
+ fId = id;
+ // fConnector = cc;
+
+ fCreateTimeMs = System.currentTimeMillis();
+ fLastTouch = fCreateTimeMs;
+ fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String,String>> ();
+ fLogTag = fGroup + "(" + fId + ")/" + fTopic;
+ offset = 0;
+
+ state = KafkaConsumer.State.OPENED;
+
+ // final Map<String, Integer> topicCountMap = new HashMap<String,
+ // Integer>();
+ // topicCountMap.put(fTopic, 1);
+ // log.info(fLogTag +" kafka Consumer started at "
+ // +System.currentTimeMillis());
+ // final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+ // fConnector.createMessageStreams(topicCountMap);
+ // final List<KafkaStream<byte[], byte[]>> streams =
+ // consumerMap.get(fTopic);
+
+ kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
+ // System.out.println("I am in Consumer APP " + topic + "-- " +
+ // fConsumer);
+ kConsumer.subscribe(Arrays.asList(topic));
+ log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
+ System.out.println("-----id " +id);
+
+
+ try { ConsumerRecords<String, String> records =
+ kConsumer.poll(500); System.out.println("---" +
+ records.count());
+
+ for (ConsumerRecord<String, String> record : records) {
+ System.out.printf("offset = %d, key = %s, value = %s",
+ record.offset(), record.key(), record.value()); String t =
+ record.value();
+
+ }
+ }catch(Exception e){
+ System.out.println( e);
+ }
+ System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
+ kConsumer.commitSync();
+ // fConsumer.close();
+
+
+ /*
+ * ConsumerRecords<String, String> records = fConsumer.poll(500);
+ * System.out.println("---" + records.count());
+ *
+ * for (ConsumerRecord<String, String> record : records) {
+ * System.out.printf("offset = %d, key = %s, value = %s",
+ * record.offset(), record.key(), record.value()); String t =
+ * record.value();
+ *
+ * }
+ *
+ *
+ * fConsumer.commitSync(); fConsumer.close();
+ */
+
+ // fStream = streams.iterator().next();
+ }
+
+
+
+ private Consumer.Message makeMessage ( final ConsumerRecord<String,String> msg )
+ {
+ return new Consumer.Message()
+ {
+ @Override
+ public long getOffset ()
+ {
+ return msg.offset ();
+ }
+
+ @Override
+ public String getMessage ()
+ {
+ return new String ( msg.value () );
+ }
+ };
+ }
+
+ @Override
+ public synchronized Consumer.Message nextMessage ()
+ {
+
+ try
+ {
+ if ( fPendingMsgs.size () > 0 )
+ {
+ return makeMessage ( fPendingMsgs.take () );
+ }
+ }
+ catch ( InterruptedException x )
+ {
+ log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x );
+ }
+
+
+ try
+ {
+ boolean foundMsgs = false;
+ System.out.println("entering into pollingWWWWWWWWWWWWWWWWW");
+ final ConsumerRecords<String,String> records = kConsumer.poll ( 100 );
+ System.out.println("polling doneXXXXXXXXXXXXXXXXXXXXXXXXXXX....");
+ for ( ConsumerRecord<String,String> record : records )
+ {
+ foundMsgs = true;
+ fPendingMsgs.offer ( record );
+ }
+
+ }
+ catch ( KafkaException x )
+ {
+ log.debug ( fLogTag + ": KafkaException " + x.getMessage () );
+
+ }
+ catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x )
+ {
+ log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () );
+
+ }
+
+ return null;
+ }
+
+
+
+ /**
+ * getName() method returns string type value. returns 3 parameters in
+ * string:- fTopic,fGroup,fId
+ *
+ * @Override
+ */
+ public String getName() {
+ return fTopic + " : " + fGroup + " : " + fId;
+ }
+
+ /**
+ * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
+ * variable value
+ *
+ * @Override
+ *
+ */
+ public long getCreateTimeMs() {
+ return fCreateTimeMs;
+ }
+
+ public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() {
+ return kConsumer;
+ }
+
+ /**
+ * getLastAccessMs() method returns long type value. returns fLastTouch
+ * variable value
+ *
+ * @Override
+ *
+ */
+ public long getLastAccessMs() {
+ return fLastTouch;
+ }
+
+
+
+ /**
+ * getOffset() method returns long type value. returns offset variable value
+ *
+ * @Override
+ *
+ */
+ public long getOffset() {
+ return offset;
+ }
+
+ /**
+ * commit offsets commitOffsets() method will be called on closed of
+ * KafkaConsumer.
+ *
+ * @Override
+ *
+ *
+ * public void commitOffsets() { if (getState() ==
+ * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
+ * on closed KafkaConsumer " + getName()); return; }
+ * fConnector.commitOffsets(); }
+ */
+
+ /**
+ * updating fLastTouch with current time in ms
+ */
+ public void touch() {
+ fLastTouch = System.currentTimeMillis();
+ }
+
+ /**
+ * getLastTouch() method returns long type value. returns fLastTouch
+ * variable value
+ *
+ */
+ public long getLastTouch() {
+ return fLastTouch;
+ }
+
+ /**
+ * setting the kafkaConsumer state to closed
+ */
+ public synchronized boolean close() {
+
+ if (getState() == KafkaConsumer.State.CLOSED) {
+
+ log.warn("close() called on closed KafkaConsumer " + getName());
+ return true;
+ }
+
+ setState(KafkaConsumer.State.CLOSED);
+ // fConnector.shutdown();
+ boolean retVal = kafkaConnectorshuttask();
+ return retVal;
+
+ }
+
+ /* time out if the kafka shutdown fails for some reason */
+
+ private boolean kafkaConnectorshuttask() {
+ Callable<Boolean> run = new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ // your code to be timed
+ try {
+ System.out.println("consumer closing....." + kConsumer);
+ kConsumer.close();
+ } catch (Exception e) {
+ log.info("@@@@@@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
+ }
+ log.info("Kafka connection closure with in 15 seconds by a Executors task");
+ return true;
+ }
+ };
+
+ RunnableFuture future = new FutureTask(run);
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.execute(future);
+ Boolean result = null;
+ try {
+ result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait 1
+ // second
+ } catch (TimeoutException ex) {
+ // timed out. Try to stop the code if possible.
+ log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task");
+ future.cancel(true);
+ } catch (Exception ex) {
+ // timed out. Try to stop the code if possible.
+ log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task" + ex);
+ future.cancel(true);
+ return false;
+ }
+ service.shutdown();
+ return true;
+ }
+
+ /**
+ * getConsumerGroup() returns Consumer group
+ *
+ * @return
+ */
+ public String getConsumerGroup() {
+ return fGroup;
+ }
+
+ /**
+ * getConsumerId returns Consumer Id
+ *
+ * @return
+ */
+ public String getConsumerId() {
+ return fId;
+ }
+
+ /**
+ * getState returns kafkaconsumer state
+ *
+ * @return
+ */
+ private KafkaConsumer.State getState() {
+ return this.state;
+ }
+
+ /**
+ * setState() sets the kafkaConsumer state
+ *
+ * @param state
+ */
+ private void setState(KafkaConsumer.State state) {
+ this.state = state;
+ }
+
+ // private ConsumerConnector fConnector;
+ private final String fTopic;
+ private final String fGroup;
+ private final String fId;
+ private final String fLogTag;
+ // private final KafkaStream<byte[], byte[]> fStream;
+ private final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> kConsumer;
+ private long fCreateTimeMs;
+ private long fLastTouch;
+ private long offset;
+ private KafkaConsumer.State state;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class);
+ private final LinkedBlockingQueue<ConsumerRecord<String,String>> fPendingMsgs;
+ // private static final Logger log =
+ // LoggerFactory.getLogger(KafkaConsumer.class);
+
+ @Override
+ public void commitOffsets() {
+ if (getState() == KafkaConsumer.State.CLOSED) {
+ log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
+ return;
+ }
+ kConsumer.commitSync();
+ // fConsumer.close();
+
+ }
+
+
+
+ @Override
+ public void setOffset(long offsetval) {
+ // TODO Auto-generated method stub
+ offset = offsetval;
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
new file mode 100644
index 0000000..4340cae
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -0,0 +1,749 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Resource;
+
+import org.I0Itec.zkclient.exception.ZkException;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.ComponentScan;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.dmf.mr.backends.Consumer;
+import com.att.dmf.mr.backends.MetricsSet;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.exception.DMaaPErrorMessages;
+import com.att.dmf.mr.utils.ConfigurationReader;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.metrics.CdmTimer;
+
+/**
+ * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
+ * must be
+ * @author peter
+ *
+ */
+@NotThreadSafe
+public class KafkaConsumerCache {
+
+ private static KafkaConsumerCache kafkaconscache = null;
+
+ public static KafkaConsumerCache getInstance() {
+ if (kafkaconscache == null)
+ kafkaconscache = new KafkaConsumerCache();
+
+ return kafkaconscache;
+ }
+
+ private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
+ private static final int kDefault_ConsumerHandoverWaitMs = 500;
+
+ private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
+ private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";
+
+ private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
+ private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";
+
+ // kafka defaults to timing out a client after 6 seconds of inactivity, but
+ // it heartbeats even when the client isn't fetching. Here, we don't
+ // want to prematurely rebalance the consumer group. Assuming clients are
+ // hitting
+ // the server at least every 30 seconds, timing out after 2 minutes should
+ // be okay.
+ // FIXME: consider allowing the client to specify its expected call rate?
+ private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;
+
+ // check for expirations pretty regularly
+ private static final long kDefault_SweepEverySeconds = 15;
+
+ private enum Status {
+ NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
+ }
+
+ // @Qualifier("kafkalockavoid")
+
+ // @Resource
+ // @Qualifier("kafkalockavoid")
+ // KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+
+ @Autowired
+ private DMaaPErrorMessages errorMessages;
+
+ // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider();
+ /**
+ * User defined exception class for kafka consumer cache
+ *
+ * @author nilanjana.maity
+ *
+ */
+ public class KafkaConsumerCacheException extends Exception {
+ /**
+ * To throw the exception
+ *
+ * @param t
+ */
+ KafkaConsumerCacheException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ *
+ * @param s
+ */
+ public KafkaConsumerCacheException(String s) {
+ super(s);
+ }
+
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ * Creates a KafkaConsumerCache object. Before it is used, you must call
+ * startCache()
+ *
+ * @param apiId
+ * @param s
+ * @param metrics
+ */
+ public KafkaConsumerCache() {
+
+ String strkSetting_ZkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_ZkBasePath);
+ if (null == strkSetting_ZkBasePath)
+ strkSetting_ZkBasePath = kDefault_ZkBasePath;
+ fBaseZkPath = strkSetting_ZkBasePath;
+
+ fConsumers = new ConcurrentHashMap<String, Kafka011Consumer>();
+ fSweepScheduler = Executors.newScheduledThreadPool(1);
+
+ curatorConsumerCache = null;
+
+ status = Status.NOT_STARTED;
+ // Watcher for consumer rebalancing across nodes. Kafka011 rebalancing
+ // work around
+
+ listener = new ConnectionStateListener() {
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ if (newState == ConnectionState.LOST) {
+
+ log.info("ZooKeeper connection expired");
+ handleConnectionLoss();
+ } else if (newState == ConnectionState.READ_ONLY) {
+ log.warn("ZooKeeper connection set to read only mode.");
+ } else if (newState == ConnectionState.RECONNECTED) {
+ log.info("ZooKeeper connection re-established");
+ handleReconnection();
+ } else if (newState == ConnectionState.SUSPENDED) {
+ log.warn("ZooKeeper connection has been suspended.");
+ handleConnectionSuspended();
+ }
+ }
+ };
+ }
+
+ /**
+ * Start the cache service. This must be called before any get/put
+ * operations.
+ *
+ * @param mode
+ * DMAAP or cambria
+ * @param curator
+ * @throws IOException
+ * @throws KafkaConsumerCacheException
+ */
+ public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {
+
+ if (fApiId == null) {
+ throw new IllegalArgumentException("API Node ID must be specified.");
+ }
+
+ try {
+
+ if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
+ curator = getCuratorFramework(curator);
+ }
+ curator.getConnectionStateListenable().addListener(listener);
+ setStatus(Status.CONNECTED);
+ curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
+ curatorConsumerCache.start();
+ curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+ switch (event.getType()) {
+ case CHILD_ADDED: {
+ try {
+ final String apiId = new String(event.getData().getData());
+ final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+ log.info(apiId + " started consumer " + consumer);
+ } catch (Exception ex) {
+ log.info("#Error Occured during Adding child" + ex);
+ }
+ break;
+ }
+ case CHILD_UPDATED: {
+ final String apiId = new String(event.getData().getData());
+ final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+ if (fConsumers.containsKey(consumer)) {
+ log.info(apiId + " claimed consumer " + consumer + " from " + fApiId
+ + " but wont hand over");
+ // Commented so that it dont give the connection
+ // until the active node is running for this client
+ // id.
+ dropClaimedConsumer(consumer);
+ }
+
+ break;
+ }
+ case CHILD_REMOVED: {
+ final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+ if (fConsumers.containsKey(consumer)) {
+ log.info("Someone wanted consumer " + consumer
+ + " gone; but not removing it from the cache");
+ dropConsumer(consumer, false);
+ }
+
+ break;
+ }
+
+ default:
+ break;
+ }
+ }
+ });
+
+ // initialize the ZK path
+ EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
+ ensurePath.ensure(curator.getZookeeperClient());
+
+ // final long freq = fSettings.getLong(kSetting_SweepEverySeconds,
+ // kDefault_SweepEverySeconds);
+ long freq = kDefault_SweepEverySeconds;
+ String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_SweepEverySeconds);
+ if (null != strkSetting_SweepEverySeconds) {
+ freq = Long.parseLong(strkSetting_SweepEverySeconds);
+ }
+
+ fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS);
+ log.info("KafkaConsumerCache started");
+ log.info("sweeping cached clients every " + freq + " seconds");
+ } catch (ZkException e) {
+ log.error("@@@@@@ ZK Exception occured for " + e);
+ throw new KafkaConsumerCacheException(e);
+ } catch (Exception e) {
+ log.error("@@@@@@ Exception occured for " + e);
+ throw new KafkaConsumerCacheException(e);
+ }
+ }
+
+ /**
+ * Getting the curator oject to start the zookeeper connection estabished
+ *
+ * @param curator
+ * @return curator object
+ */
+ public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
+ if (curator.getState() == CuratorFrameworkState.LATENT) {
+ curator.start();
+
+ try {
+ curator.blockUntilConnected();
+ } catch (InterruptedException e) {
+ // Ignore
+ log.error("error while setting curator framework :" + e.getMessage());
+ }
+ }
+
+ return curator;
+ }
+
+ /**
+ * Stop the cache service.
+ */
+ public void stopCache() {
+ setStatus(Status.DISCONNECTED);
+
+ final CuratorFramework curator = ConfigurationReader.getCurator();
+
+ if (curator != null) {
+ try {
+ curator.getConnectionStateListenable().removeListener(listener);
+ curatorConsumerCache.close();
+ log.info("Curator client closed");
+ } catch (ZkInterruptedException e) {
+ log.warn("Curator client close interrupted: " + e.getMessage());
+ } catch (IOException e) {
+ log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e.getMessage());
+ }
+
+ curatorConsumerCache = null;
+ }
+
+ if (fSweepScheduler != null) {
+ fSweepScheduler.shutdownNow();
+ log.info("cache sweeper stopped");
+ }
+
+ if (fConsumers != null) {
+ fConsumers.clear();
+ fConsumers = null;
+ }
+
+ setStatus(Status.NOT_STARTED);
+
+ log.info("Consumer cache service stopped");
+ }
+
+ /**
+ * Get a cached consumer by topic, group, and id, if it exists (and remains
+ * valid) In addition, this method waits for all other consumer caches in
+ * the cluster to release their ownership and delete their version of this
+ * consumer.
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param clientId
+ * @return a consumer, or null
+ */
+ public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId)
+ throws KafkaConsumerCacheException {
+ if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+
+ final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
+ final Kafka011Consumer kc = fConsumers.get(consumerKey);
+
+ if (kc != null) {
+ log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
+ kc.touch();
+ fMetrics.onKafkaConsumerCacheHit();
+ } else {
+ log.debug("Consumer cache miss for [" + consumerKey + "]");
+ fMetrics.onKafkaConsumerCacheMiss();
+ }
+
+ return kc;
+ }
+
+ /**
+ * Get a cached consumer by topic, group, and id, if it exists (and remains
+ * valid) In addition, this method waits for all other consumer caches in
+ * the cluster to release their ownership and delete their version of this
+ * consumer.
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param clientId
+ * @return a consumer, or null
+ */
+ public ArrayList<Kafka011Consumer> getConsumerListForCG(String topicgroup, String clientId)
+ throws KafkaConsumerCacheException {
+ if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+ ArrayList<Kafka011Consumer> kcl = new ArrayList<Kafka011Consumer>();
+ // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
+ // clientId);
+ Enumeration<String> strEnum = fConsumers.keys();
+ String consumerLocalKey = null;
+ while (strEnum.hasMoreElements()) {
+ consumerLocalKey = strEnum.nextElement();
+
+ if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
+
+ // System.out.println("consumer key returning from
+ // getConsumerListforCG +++++++++ " + consumerLocalKey
+ // + " " + fConsumers.get(consumerLocalKey));
+ kcl.add(fConsumers.get(consumerLocalKey));
+
+ }
+ }
+
+ return kcl;
+ }
+
+ public ArrayList<Kafka011Consumer> getConsumerListForCG(String group) throws KafkaConsumerCacheException {
+ if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+ ArrayList<Kafka011Consumer> kcl = new ArrayList<Kafka011Consumer>();
+ // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
+ // clientId);
+ Enumeration<String> strEnum = fConsumers.keys();
+ String consumerLocalKey = null;
+ while (strEnum.hasMoreElements()) {
+ consumerLocalKey = strEnum.nextElement();
+
+ if (consumerLocalKey.startsWith(group)) {
+
+ // System.out.println("consumer key returning from
+ // getConsumerListforCG +++++++++ " + consumerLocalKey
+ // + " " + fConsumers.get(consumerLocalKey));
+ kcl.add(fConsumers.get(consumerLocalKey));
+
+ }
+ }
+
+ return kcl;
+ }
+
+ /**
+ * Put a consumer into the cache by topic, group and ID
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param consumerId
+ * @param consumer
+ * @throws KafkaConsumerCacheException
+ */
+ public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer)
+ throws KafkaConsumerCacheException {
+ if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+
+ final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
+ fConsumers.put(consumerKey, consumer);
+
+ // String appId = "node-instance-"+i;
+
+ log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId);
+ }
+
+ public Collection<? extends Consumer> getConsumers() {
+ return new LinkedList<Kafka011Consumer>(fConsumers.values());
+ }
+
+ /**
+ * This method is to drop all the consumer
+ */
+ public void dropAllConsumers() {
+ for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
+ dropConsumer(entry.getKey(), true);
+ }
+
+ // consumers should be empty here
+ if (fConsumers.size() > 0) {
+ log.warn("During dropAllConsumers, the consumer map is not empty.");
+ fConsumers.clear();
+ }
+ }
+
+ /**
+ * Drop a consumer from our cache due to a timeout
+ *
+ * @param key
+ */
+ private void dropTimedOutConsumer(String key) {
+ fMetrics.onKafkaConsumerTimeout();
+
+ if (!fConsumers.containsKey(key)) {
+ log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
+ return;
+ }
+
+ // First, drop this consumer from our cache
+ boolean isdrop = dropConsumer(key, true);
+ if (!isdrop) {
+ return;
+ }
+ final CuratorFramework curator = ConfigurationReader.getCurator();
+
+ try {
+ curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
+ log.info(" ^ deleted " + fBaseZkPath + "/" + key);
+ } catch (NoNodeException e) {
+ log.warn("A consumer was deleted from " + fApiId
+ + "'s cache, but no Cambria API node had ownership of it in ZooKeeper");
+ } catch (Exception e) {
+ log.debug("Unexpected exception while deleting consumer: " + e.getMessage());
+ log.info(" %%%%%%@# Unexpected exception while deleting consumer: " + e.getMessage());
+ }
+
+ try {
+ int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
+ String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_ConsumerHandoverWaitMs);
+ if (strkSetting_ConsumerHandoverWaitMs != null)
+ consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
+ Thread.sleep(consumerHandoverWaitMs);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ log.info("Dropped " + key + " consumer due to timeout");
+ }
+
+ /**
+ * Drop a consumer from our cache due to another API node claiming it as
+ * their own.
+ *
+ * @param key
+ */
+ private void dropClaimedConsumer(String key) {
+ // if the consumer is still in our cache, it implies a claim.
+ if (fConsumers.containsKey(key)) {
+ fMetrics.onKafkaConsumerClaimed();
+ log.info("Consumer [" + key + "] claimed by another node.");
+ }
+ log.info("^dropping claimed Kafka consumer " + key);
+ dropConsumer(key, false);
+ }
+
+ /**
+ * Removes the consumer from the cache and closes its connection to the
+ * kafka broker(s).
+ *
+ * @param key
+ * @param dueToTimeout
+ */
+ private boolean dropConsumer(String key, boolean dueToTimeout) {
+ final Kafka011Consumer kc = fConsumers.get(key);
+ log.info("closing Kafka consumer " + key + " object " + kc);
+ if (kc != null) {
+ // log.info("closing Kafka consumer " + key);
+ if (kc.close()) {
+ fConsumers.remove(key);
+
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // private final rrNvReadable fSettings;
+ private MetricsSet fMetrics;
+ private final String fBaseZkPath;
+ private final ScheduledExecutorService fSweepScheduler;
+ private String fApiId;
+
+ public void setfMetrics(final MetricsSet metrics) {
+ this.fMetrics = metrics;
+ }
+
+ public void setfApiId(final String id) {
+ this.fApiId = id;
+ }
+
+ private final ConnectionStateListener listener;
+
+ private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
+ private PathChildrenCache curatorConsumerCache;
+
+ private volatile Status status;
+
+ private void handleReconnection() {
+
+ log.info("Reading current cache data from ZK and synchronizing local cache");
+ final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
+ // Remove all the consumers in this API nodes cache that now belong to
+ // other API nodes.
+ for (ChildData cachedConsumer : cacheData) {
+ final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
+ final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
+ : "undefined";
+ if (!fApiId.equals(owningApiId)) {
+ fConsumers.remove(consumerId); // Commented to avoid removing
+ // the value cache hashmap but the lock still exists.
+ // This is not considered in kafka consumer Factory
+ log.info("@@@ Validating current cache data from ZK and synchronizing local cache" + owningApiId
+ + " removing " + consumerId);
+ }
+ }
+
+ setStatus(Status.CONNECTED);
+ }
+
+ private void handleConnectionSuspended() {
+ log.info("Suspending cache until ZK connection is re-established");
+
+ setStatus(Status.SUSPENDED);
+ }
+
+ private void handleConnectionLoss() {
+ log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");
+
+ setStatus(Status.DISCONNECTED);
+
+ closeAllCachedConsumers();
+ fConsumers.clear();
+ }
+
+ private void closeAllCachedConsumers() {
+ for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
+ try {
+ entry.getValue().close();
+ } catch (Exception e) {
+ log.info("@@@@@@ Error occurd while closing Clearing All cache " + e);
+ }
+ }
+ }
+
+ private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
+ return topic + "::" + consumerGroupId + "::" + clientId;
+ }
+
+ /**
+ * This method is to get a lock
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param consumerId
+ * @throws KafkaConsumerCacheException
+ */
+ public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
+ throws KafkaConsumerCacheException {
+ // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
+ final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
+ final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership");
+
+ try {
+ final String consumerPath = fBaseZkPath + "/" + consumerKey;
+ log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
+ final CuratorFramework curator = ConfigurationReader.getCurator();
+
+ try {
+ curator.setData().forPath(consumerPath, fApiId.getBytes());
+ } catch (KeeperException.NoNodeException e) {
+ curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
+ }
+ log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
+ timer.end();
+ } catch (Exception e) {
+ log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
+ throw new KafkaConsumerCacheException(e);
+ }
+
+ log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");
+
+ try {
+ int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
+ String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_ConsumerHandoverWaitMs);
+ if (strkSetting_ConsumerHandoverWaitMs != null)
+ consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
+ Thread.sleep(consumerHandoverWaitMs);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+ public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() {
+ return null;
+ }
+
+ public void sweep() {
+ final LinkedList<String> removals = new LinkedList<String>();
+ long mustTouchEveryMs = kDefault_MustTouchEveryMs;
+ String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_TouchEveryMs);
+ if (null != strkSetting_TouchEveryMs) {
+ mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
+ }
+
+ // final long mustTouchEveryMs =
+ // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs);
+ final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
+
+ for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
+ final long lastTouchMs = e.getValue().getLastTouch();
+ log.debug("consumer #####1" + e.getKey() + " " + lastTouchMs + " < " + oldestAllowedTouchMs);
+
+ if (lastTouchMs < oldestAllowedTouchMs) {
+ log.info("consumer " + e.getKey() + " has expired");
+ removals.add(e.getKey());
+ }
+ }
+
+ for (String key : removals) {
+ dropTimedOutConsumer(key);
+ }
+ }
+
+ /**
+ * Creating a thread to run the sweep method
+ *
+ * @author nilanjana.maity
+ *
+ */
+ private class sweeper implements Runnable {
+ /**
+ * run method
+ */
+ public void run() {
+ sweep();
+ }
+ }
+
+ /**
+ * This method is to drop consumer
+ *
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ */
+ public void dropConsumer(String topic, String consumerGroup, String clientId) {
+ dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
+ }
+
+ private Status getStatus() {
+ return this.status;
+ }
+
+ private void setStatus(Status status) {
+ this.status = status;
+ }
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
+ // private static final Logger log =
+ // LoggerFactory.getLogger(KafkaConsumerCache.class);
+} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
new file mode 100644
index 0000000..805701a
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
@@ -0,0 +1,161 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+
+//@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka")
+@Component
+public class KafkaLiveLockAvoider2 {
+
+ public static final String ZNODE_ROOT = "/live-lock-avoid";
+ public static final String ZNODE_LOCKS = "/locks";
+ public static final String ZNODE_UNSTICK_TASKS ="/unstick-tasks";
+
+ private static String locksPath = ZNODE_ROOT+ZNODE_LOCKS;
+ private static String tasksPath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaLiveLockAvoider2.class.getName());
+
+ @Autowired
+ @Qualifier("curator")
+ private CuratorFramework curatorFramework;
+
+ @PostConstruct
+ public void init() {
+ System.out.println("Welcome......................................................................................");
+ try {
+ if (curatorFramework.checkExists().forPath(locksPath) == null) {
+ curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath);
+ }
+ if (curatorFramework.checkExists().forPath(tasksPath) == null) {
+ curatorFramework.create().creatingParentsIfNeeded().forPath(tasksPath);
+ }
+
+ } catch (Exception e) {
+ //e.printStackTrace();
+ log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e);
+
+ }
+
+
+ }
+ public void unlockConsumerGroup(String appId, String groupName) throws Exception {
+
+ log.info("Signalling unlock to all conumsers of in group [{}] now, " , groupName);
+
+ String fullLockPath = String.format("%s/%s", locksPath, groupName );
+ String fullTasksPath = null;
+
+ try {
+
+ //Use the Curator recipe for a Mutex lock, only one process can be broadcasting unlock instructions for a group
+ InterProcessMutex lock = new InterProcessMutex(curatorFramework, fullLockPath);
+ if ( lock.acquire(100L, TimeUnit.MILLISECONDS) )
+ {
+ try
+ {
+ List<String> taskNodes = curatorFramework.getChildren().forPath(tasksPath);
+ for (String taskNodeName : taskNodes) {
+ if(!taskNodeName.equals(appId)) {
+
+ fullTasksPath = String.format("%s/%s/%s", tasksPath, taskNodeName, groupName);
+ log.info("Writing groupName {} to path {}",groupName, fullTasksPath);
+
+
+ if(curatorFramework.checkExists().forPath(fullTasksPath) != null) {
+ curatorFramework.delete().forPath(fullTasksPath);
+ }
+ curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(fullTasksPath);
+ }
+ }
+
+
+ }
+ finally
+ {
+ //Curator lock recipe requires a acquire() to be followed by a release()
+ lock.release();
+ }
+ }else {
+ log.info("Could not obtain the avoider lock, another process has the avoider lock? {}", !lock.isAcquiredInThisProcess() );
+ }
+
+
+ } catch (Exception e) {
+ log.error("Error setting up either lock ZNode {} or task ZNode {}",fullLockPath, fullTasksPath,e);
+ throw e;
+ }
+
+
+ }
+
+ /*
+ * Shoud be called once per MR server instance.
+ *
+ */
+ public void startNewWatcherForServer(String appId, LiveLockAvoidance avoidanceCallback) {
+ LockInstructionWatcher instructionWatcher = new LockInstructionWatcher(curatorFramework,avoidanceCallback,this);
+ assignNewProcessNode(appId, instructionWatcher);
+
+ }
+
+
+ protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) {
+
+ String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId;
+ //Watcher processNodeWatcher = createWatcher();
+
+ try {
+
+ if(curatorFramework.checkExists().forPath(taskHolderZnodePath) != null) {
+ curatorFramework.delete().deletingChildrenIfNeeded().forPath(taskHolderZnodePath);
+
+ }
+ curatorFramework.create().forPath(taskHolderZnodePath);
+ //setup the watcher
+ curatorFramework.getChildren().usingWatcher(processNodeWatcher).inBackground().forPath(taskHolderZnodePath);
+ log.info("Done creating task holder and watcher for APP name: {}",appId);
+
+ } catch (Exception e) {
+ log.error("Could not add new processing node for name {}", appId, e);
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
new file mode 100644
index 0000000..30209f0
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -0,0 +1,235 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.json.JSONException;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.att.dmf.mr.backends.Publisher;
+import com.att.dmf.mr.constants.CambriaConstants;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+
+//import kafka.FailedToSendMessageException;
+//import kafka.javaapi.producer.Producer;
+//import kafka.producer.KeyedMessage;
+//import kafka.producer.ProducerConfig;
+//import kafka.producer.KeyedMessage;
+
+/**
+ * Sends raw JSON objects into Kafka.
+ *
+ * Could improve space: BSON rather than JSON?
+ *
+ * @author peter
+ *
+ */
+
+public class KafkaPublisher implements Publisher {
+ /**
+ * constructor initializing
+ *
+ * @param settings
+ * @throws rrNvReadable.missingReqdSetting
+ */
+ public KafkaPublisher(@Qualifier("propertyReader") rrNvReadable settings) throws rrNvReadable.missingReqdSetting {
+ //fSettings = settings;
+
+ final Properties props = new Properties();
+ /*transferSetting(fSettings, props, "metadata.broker.list", "localhost:9092");
+ transferSetting(fSettings, props, "request.required.acks", "1");
+ transferSetting(fSettings, props, "message.send.max.retries", "5");
+ transferSetting(fSettings, props, "retry.backoff.ms", "150"); */
+ String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list");
+ if(null==kafkaConnUrl){
+
+ kafkaConnUrl="localhost:9092";
+ }
+ //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf";
+ // props.put("bootstrap.servers", bootSever);
+ //System.setProperty("java.security.auth.login.config",jaaspath);
+
+ /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
+ transferSetting( props, "sasl.mechanism", "PLAIN");*/
+ transferSetting( props, "bootstrap.servers",kafkaConnUrl);
+ //transferSetting( props, "metadata.broker.list", kafkaConnUrl);
+ transferSetting( props, "request.required.acks", "1");
+ transferSetting( props, "message.send.max.retries", "5");
+ transferSetting(props, "retry.backoff.ms", "150");
+
+ //props.put("serializer.class", "kafka.serializer.StringEncoder");
+
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+ //fConfig = new ProducerConfig(props);
+ //fProducer = new Producer<String, String>(fConfig);
+ fProducer = new KafkaProducer<>(props);
+ }
+
+ /**
+ * Send a message with a given topic and key.
+ *
+ * @param msg
+ * @throws FailedToSendMessageException
+ * @throws JSONException
+ */
+ @Override
+ public void sendMessage(String topic, message msg) throws IOException{
+ final List<message> msgs = new LinkedList<message>();
+ msgs.add(msg);
+ sendMessages(topic, msgs);
+ }
+
+ /**
+ * method publishing batch messages
+ * This method is commented from 0.8 to 0.11 upgrade
+ * @param topic
+ * @param kms
+ * throws IOException
+ *
+ public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
+ try {
+ fProducer.send(kms);
+
+ } catch (FailedToSendMessageException excp) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+ throw new FailedToSendMessageException(excp.getMessage(), excp);
+ }
+
+ } */
+
+
+ /*
+ * Kafka 11.0 Interface
+ * @see com.att.nsa.cambria.backends.Publisher#sendBatchMessageNew(java.lang.String, java.util.ArrayList)
+ */
+ public void sendBatchMessageNew(String topic, ArrayList <ProducerRecord<String,String>> kms) throws IOException {
+ try {
+ for (ProducerRecord<String,String> km : kms) {
+ fProducer.send(km);
+ }
+
+ } catch (Exception excp) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+ throw new IOException(excp.getMessage(), excp);
+ }
+
+ }
+
+ /**
+ * Send a set of messages. Each must have a "key" string value.
+ *
+ * @param topic
+ * @param msg
+ * @throws FailedToSendMessageException
+ * @throws JSONException
+ *
+ @Override
+ public void sendMessages(String topic, List<? extends message> msgs)
+ throws IOException, FailedToSendMessageException {
+ log.info("sending " + msgs.size() + " events to [" + topic + "]");
+
+ final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size());
+ for (message o : msgs) {
+ final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString());
+ kms.add(data);
+ }
+ try {
+ fProducer.send(kms);
+
+ } catch (FailedToSendMessageException excp) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+ throw new FailedToSendMessageException(excp.getMessage(), excp);
+ }
+ } */
+ @Override
+ public void sendMessagesNew(String topic, List<? extends message> msgs)
+ throws IOException {
+ log.info("sending " + msgs.size() + " events to [" + topic + "]");
+try{
+ final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size());
+ for (message o : msgs) {
+
+ final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString());
+ //kms.add(data);
+
+ try {
+
+ fProducer.send(data);
+
+ } catch (Exception excp) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+ throw new Exception(excp.getMessage(), excp);
+ }
+ }
+
+ }catch(Exception e){}
+}
+ //private final rrNvReadable fSettings;
+
+ //private ProducerConfig fConfig;
+ private Producer<String, String> fProducer;
+
+ /**
+ * It sets the key value pair
+ * @param topic
+ * @param msg
+ * @param key
+ * @param defVal
+ */
+ private void transferSetting(Properties props, String key, String defVal) {
+ String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
+ if (null==kafka_prop) kafka_prop=defVal;
+ //props.put(key, settings.getString("kafka." + key, defVal));
+ props.put(key, kafka_prop);
+ }
+
+ //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class);
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class);
+
+ @Override
+ public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ //@Override
+ //public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
+ // TODO Auto-generated method stub
+
+ //}
+} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java b/src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java
new file mode 100644
index 0000000..a13ecea
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+
+
+/**
+ * Live Lock Avoidance interface. To be implemented by the main message router client
+ *
+ */
+public interface LiveLockAvoidance {
+
+ /**
+ * Gets the unique id
+ * @return the unique id for the Message Router server instance
+ */
+ String getAppId();
+
+
+ /**
+ * Main callback to inform the local MR server instance that all consumers in a group need to soft poll
+ * @param groupName name of the Kafka consumer group needed a soft poll
+ */
+ void handleRebalanceUnlock( String groupName);
+
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java b/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java
new file mode 100644
index 0000000..5d3bc47
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ *
+ * LockInstructionWatcher
+ * A package-private class used internally by the KafkaLiveLockAvoider.
+ *
+ * This class implements the zookeeper Watcher callback and listens for changes on child nodes changing.
+ * Each child node is actually a Kafka group name that needs to be soft polled. Deletion of the child nodes
+ * after soft poll unlocking is finished.
+ *
+ *
+ */
+public class LockInstructionWatcher implements Watcher {
+
+ private CuratorFramework curatorFramework;
+ private LiveLockAvoidance avoidanceCallback;
+ private KafkaLiveLockAvoider2 avoider;
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(LockInstructionWatcher.class.getName());
+
+
+ public LockInstructionWatcher(CuratorFramework curatorFramework, LiveLockAvoidance avoidanceCallback,
+ KafkaLiveLockAvoider2 avoider) {
+ super();
+ this.curatorFramework = curatorFramework;
+ this.avoidanceCallback = avoidanceCallback;
+ this.avoider = avoider;
+ }
+
+
+ @Override
+ public void process(WatchedEvent event) {
+
+ switch (event.getType()) {
+ case NodeChildrenChanged:
+
+
+ try {
+
+ log.info("node children changed at path: {}", event.getPath());
+ //String grpName = new String(curatorFramework.getData().forPath(event.getPath()));
+ List<String> children = curatorFramework.getChildren().forPath(event.getPath());
+
+ log.info("found children nodes prodcessing now");
+ for (String child : children) {
+ String childPath = String.format("%s/%s", event.getPath(), child);
+ log.info("Processing child task at node {}",childPath);
+ avoidanceCallback.handleRebalanceUnlock( child);
+ log.info("Deleting child task at node {}",childPath);
+ curatorFramework.delete().forPath(childPath);
+ }
+ //reset the watch with the avoider
+ avoider.assignNewProcessNode(avoidanceCallback.getAppId(), this);
+
+
+ } catch (Exception e) {
+ log.error("Error manipulating ZNode data in watcher",e);
+ }
+
+ break;
+
+ default:
+ log.info("Listner fired on path: {}, with event: {}", event.getPath(), event.getType());
+ break;
+ }
+ }
+
+
+}