Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends')
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/Consumer.java                       106
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/ConsumerFactory.java                119
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/MetricsSet.java                      71
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/Publisher.java                      101
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java         403
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java     126
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt             386
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java       749
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java    161
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java           235
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java         45
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java   100
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java   182
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java        198
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java             207
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java     92
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java           109
17 files changed, 3390 insertions, 0 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/Consumer.java b/src/main/java/com/att/dmf/mr/backends/Consumer.java
new file mode 100644
index 0000000..279d48b
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/Consumer.java
@@ -0,0 +1,106 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends;
+
+import java.util.ArrayList;
+
+/**
+ * A consumer interface. Consumers pull the next message from a given topic.
+ * @author peter
+ */
+public interface Consumer
+{
+	/**
+	 * A message interface providing the offset and the message body
+	 * @author nilanjana.maity
+	 *
+	 */
+ public interface Message
+ {
+		/**
+		 * Returns the offset of this message
+		 * @return long
+		 */
+		long getOffset ();
+		/**
+		 * Returns the message body
+		 * @return message
+		 */
+		String getMessage ();
+ }
+
+ /**
+ * Get this consumer's name
+ * @return name
+ */
+ String getName ();
+
+	/**
+	 * Get creation time in ms
+	 * @return the creation time in ms
+	 */
+	long getCreateTimeMs ();
+
+	/**
+	 * Get last access time in ms
+	 * @return the last access time in ms
+	 */
+	long getLastAccessMs ();
+
+ /**
+ * Get the next message from this source. This method must not block.
+ * @return the next message, or null if none are waiting
+ */
+ Message nextMessage ();
+
+ /**
+ * Get the next message from this source. This method must not block.
+ * @param atOffset start with the next message at or after atOffset. -1 means next from last request
+ * @return the next message, or null if none are waiting
+ */
+// Message nextMessage ( long atOffset );
+
+ //Message nextMessage (ArrayList cl);
+	/**
+	 * Close/clean up this consumer
+	 * @return true if the consumer closed cleanly
+	 */
+ boolean close();
+
+ /**
+ * Commit the offset of the last consumed message
+ *
+ */
+ void commitOffsets();
+
+ /**
+ * Get the offset this consumer is currently at
+ * @return offset
+ */
+ long getOffset();
+
+ void setOffset(long offset);
+
+ //public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer();
+
+
+}
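
For reference, a minimal sketch of how a caller might drain this Consumer interface; this is not part of the commit, and the class and method names below are illustrative:

    import com.att.dmf.mr.backends.Consumer;

    public class ConsumerDrainExample {
        // nextMessage() must not block, so a null return just means
        // "nothing waiting right now" rather than end-of-stream.
        public static void drain(Consumer consumer) {
            Consumer.Message msg;
            while ((msg = consumer.nextMessage()) != null) {
                System.out.println(msg.getOffset() + ": " + msg.getMessage());
            }
            // record progress so a restart does not replay consumed messages
            consumer.commitOffsets();
        }
    }
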
diff --git a/src/main/java/com/att/dmf/mr/backends/ConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/ConsumerFactory.java
new file mode 100644
index 0000000..0b684fe
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/ConsumerFactory.java
@@ -0,0 +1,119 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends;
+
+import java.util.Collection;
+import java.util.HashMap;
+
+import com.att.dmf.mr.CambriaApiException;
+
+/**
+ * This is the factory class to instantiate the consumer
+ *
+ * @author nilanjana.maity
+ *
+ */
+
+public interface ConsumerFactory {
+ public static final String kSetting_EnableCache = "cambria.consumer.cache.enabled";
+ public static boolean kDefault_IsCacheEnabled = true;
+
+ /**
+ * User defined exception for Unavailable Exception
+ *
+ * @author nilanjana.maity
+ *
+ */
+ public class UnavailableException extends Exception {
+ /**
+ * Unavailable Exception with message
+ *
+ * @param msg
+ */
+ public UnavailableException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Unavailable Exception with the throwable object
+ *
+ * @param t
+ */
+ public UnavailableException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ * Unavailable Exception with the message and cause
+ *
+ * @param msg
+ * @param cause
+ */
+ public UnavailableException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ * For admin use, drop all cached consumers.
+ */
+ public void dropCache();
+
+ /**
+ * Get or create a consumer for the given set of info (topic, group, id)
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param clientId
+ * @param timeoutMs
+ * @return
+ * @throws UnavailableException
+ */
+ //public Consumer getConsumerFor(String topic, String consumerGroupId,
+ // String clientId, int timeoutMs) throws UnavailableException;
+
+ /**
+	 * For factories that employ a caching mechanism, this allows callers to
+	 * explicitly destroy a consumer that resides in the factory's cache.
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param clientId
+ */
+ public void destroyConsumer(String topic, String consumerGroupId,
+ String clientId);
+
+ /**
+ * For admin/debug, we provide access to the consumers
+ *
+ * @return a collection of consumers
+ */
+ public Collection<? extends Consumer> getConsumers();
+
+ public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs, String remotehost) throws UnavailableException, CambriaApiException;
+ public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs, String remotehost) throws UnavailableException, CambriaApiException;
+
+
+
+}
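
A hedged usage sketch for the factory above; the wiring of the factory instance and all argument values are assumptions, not part of this commit:

    import com.att.dmf.mr.backends.Consumer;
    import com.att.dmf.mr.backends.ConsumerFactory;

    public class FactoryUsageExample {
        public static Consumer fetch(ConsumerFactory factory) {
            try {
                // the (topic, group, client id) triple identifies a cached consumer
                return factory.getConsumerFor("myTopic", "myGroup", "client-1", 5000, "remote-host");
            } catch (ConsumerFactory.UnavailableException e) {
                // cache or broker not ready; callers typically surface an error
                return null;
            } catch (Exception e) {
                // covers CambriaApiException from the signature above
                return null;
            }
        }
    }
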
diff --git a/src/main/java/com/att/dmf/mr/backends/MetricsSet.java b/src/main/java/com/att/dmf/mr/backends/MetricsSet.java
new file mode 100644
index 0000000..de665b8
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/MetricsSet.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends;
+
+import com.att.nsa.metrics.CdmMetricsRegistry;
+/**
+ * This interface helps generate metrics
+ * @author nilanjana.maity
+ *
+ */
+public interface MetricsSet extends CdmMetricsRegistry{
+
+	/**
+	 * Sets up the Cambria sender
+	 */
+	public void setupCambriaSender ();
+	/**
+	 * Called when a route completes
+	 * @param name
+	 * @param durationMs
+	 */
+	public void onRouteComplete ( String name, long durationMs );
+	/**
+	 * Called by the Kafka publisher when messages are published
+	 * @param amount
+	 */
+	public void publishTick ( int amount );
+	/**
+	 * Called by the Kafka consumer when messages are consumed
+	 * @param amount
+	 */
+	public void consumeTick ( int amount );
+	/**
+	 * Called on a Kafka consumer cache miss
+	 */
+	public void onKafkaConsumerCacheMiss ();
+	/**
+	 * Called on a Kafka consumer cache hit while publishing/consuming messages
+	 */
+	public void onKafkaConsumerCacheHit ();
+	/**
+	 * Called when a cached Kafka consumer is claimed by another node
+	 */
+	public void onKafkaConsumerClaimed ();
+	/**
+	 * Called when a Kafka consumer times out
+	 */
+	public void onKafkaConsumerTimeout ();
+
+
+
+}
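
A small illustrative sketch (not from this commit) of how a backend might drive the hooks above when publishing a batch; "publish" is an assumed route name:

    public class MetricsUsageExample {
        // metrics is any MetricsSet implementation
        static void onBatchPublished(com.att.dmf.mr.backends.MetricsSet metrics, int msgCount, long startMs) {
            metrics.publishTick(msgCount);                                            // count published messages
            metrics.onRouteComplete("publish", System.currentTimeMillis() - startMs); // record route latency
        }
    }
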
diff --git a/src/main/java/com/att/dmf/mr/backends/Publisher.java b/src/main/java/com/att/dmf/mr/backends/Publisher.java
new file mode 100644
index 0000000..8ff6ce9
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/Publisher.java
@@ -0,0 +1,101 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import com.att.dmf.mr.beans.LogDetails;
+
+//import kafka.producer.KeyedMessage;
+/**
+ * A publisher interface. Publishers receive messages and post them to a topic.
+ * @author peter
+ */
+public interface Publisher
+{
+ /**
+ * A message interface. The message has a key and a body.
+ * @author peter
+ */
+ public interface message
+ {
+ /**
+ * Get the key for this message. The key is used to partition messages
+ * into "sub-streams" that have guaranteed order. The key can be null,
+ * which means the message can be processed without any concern for order.
+ *
+ * @return a key, possibly null
+ */
+ String getKey();
+
+ /**
+ * Get the message body.
+ * @return a message body
+ */
+ String getMessage();
+		/**
+		 * Set the logging parameters for transaction-enabled logging
+		 * @param logDetails
+		 */
+ void setLogDetails (LogDetails logDetails);
+ /**
+ * Get the log details for transaction enabled logging
+ * @return LogDetails
+ */
+ LogDetails getLogDetails ();
+
+		/**
+		 * Whether transaction-enabled logging is on
+		 * @return true/false
+		 */
+ boolean isTransactionEnabled();
+		/**
+		 * Set the transaction-enabled flag from the property file or topic-based implementation
+		 * @param transactionEnabled
+		 */
+ void setTransactionEnabled(boolean transactionEnabled);
+ }
+
+ /**
+ * Send a single message to a topic. Equivalent to sendMessages with a list of size 1.
+ * @param topic
+ * @param msg
+ * @throws IOException
+ */
+ public void sendMessage ( String topic, message msg ) throws IOException;
+
+ /**
+ * Send messages to a topic.
+ * @param topic
+ * @param msgs
+ * @throws IOException
+ */
+ public void sendMessages ( String topic, List<? extends message> msgs ) throws IOException;
+
+ //public void sendBatchMessage(String topic ,ArrayList<KeyedMessage<String,String>> kms) throws IOException;
+ public void sendBatchMessageNew(String topic ,ArrayList<ProducerRecord<String,String>> kms) throws IOException;
+ public void sendMessagesNew( String topic, List<? extends message> msgs ) throws IOException;
+}
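
A minimal sketch of a Publisher.message implementation; hypothetical, shown only to make the interface contract concrete:

    import com.att.dmf.mr.backends.Publisher;
    import com.att.dmf.mr.beans.LogDetails;

    public class SimpleMessage implements Publisher.message {
        private final String key;   // null key => no ordering guarantee required
        private final String body;
        private LogDetails logDetails;
        private boolean transactionEnabled;

        public SimpleMessage(String key, String body) {
            this.key = key;
            this.body = body;
        }

        @Override public String getKey() { return key; }
        @Override public String getMessage() { return body; }
        @Override public void setLogDetails(LogDetails logDetails) { this.logDetails = logDetails; }
        @Override public LogDetails getLogDetails() { return logDetails; }
        @Override public boolean isTransactionEnabled() { return transactionEnabled; }
        @Override public void setTransactionEnabled(boolean transactionEnabled) { this.transactionEnabled = transactionEnabled; }
    }

A caller would then publish with something like publisher.sendMessage("myTopic", new SimpleMessage("key1", "payload")), handling the declared IOException.
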
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
new file mode 100644
index 0000000..f7f5ba7
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
@@ -0,0 +1,403 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+
+import com.att.dmf.mr.backends.Consumer;
+import com.att.dmf.mr.constants.CambriaConstants;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ * A consumer instance that's created per-request. These are stateless so that
+ * clients can connect to this service as a proxy.
+ *
+ * @author Ram
+ *
+ */
+public class Kafka011Consumer implements Consumer {
+ private enum State {
+ OPENED, CLOSED
+ }
+
+ // @Autowired
+ // KafkaLiveLockAvoider kafkaLiveLockAvoider;
+	/**
+	 * Kafka011Consumer() is the constructor. It takes the following parameters:
+	 *
+	 * @param topic
+	 * @param group
+	 * @param id
+	 * @param cc
+	 * @param klla
+	 *
+	 */
+
+ public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
+ KafkaLiveLockAvoider2 klla) throws Exception {
+ fTopic = topic;
+ fGroup = group;
+ fId = id;
+ fCreateTimeMs = System.currentTimeMillis();
+ fLastTouch = fCreateTimeMs;
+ fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
+ fLogTag = fGroup + "(" + fId + ")/" + fTopic;
+ offset = 0;
+ state = Kafka011Consumer.State.OPENED;
+ kConsumer = cc;
+ fKafkaLiveLockAvoider = klla;
+ synchronized (kConsumer) {
+ kConsumer.subscribe(Arrays.asList(topic));
+ }
+ }
+
+ private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
+ return new Consumer.Message() {
+ @Override
+ public long getOffset() {
+ offset = msg.offset();
+ return offset;
+ }
+
+ @Override
+ public String getMessage() {
+ return new String(msg.value());
+ }
+ };
+ }
+
+ @Override
+ public synchronized Consumer.Message nextMessage() {
+
+ try {
+ if (fPendingMsgs.size() > 0) {
+ return makeMessage(fPendingMsgs.take());
+ }
+ } catch (InterruptedException x) {
+ log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
+ x);
+ }
+
+ Callable<Boolean> run = new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ try {
+ ConsumerRecords<String, String> records;
+ synchronized (kConsumer) {
+ records = kConsumer.poll(500);
+ }
+ for (ConsumerRecord<String, String> record : records) {
+ // foundMsgs = true;
+ fPendingMsgs.offer(record);
+ }
+
+ } catch (KafkaException x) {
+ log.debug(fLogTag + ": KafkaException " + x.getMessage());
+
+ } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
+ log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
+ + x.getMessage());
+
+ }
+
+ // return null;
+ return true;
+ }
+ };
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ RunnableFuture future = new FutureTask(run);
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.execute(future);
+ try {
+			future.get(5, TimeUnit.SECONDS); // wait up to 5 seconds for the poll task
+ } catch (TimeoutException ex) {
+ // timed out. Try to stop the code if possible.
+ String apiNodeId = null;
+ try {
+ apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
+			} catch (UnknownHostException e1) {
+				log.error("Unable to determine local host name for live lock avoidance", e1);
+			}
+
+ try {
+ if (fKafkaLiveLockAvoider != null)
+ fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
+			} catch (Exception e) {
+				log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup + ") failed", e);
+			}
+
+ forcePollOnConsumer();
+ future.cancel(true);
+ } catch (Exception ex) {
+ // timed out. Try to stop the code if possible.
+ future.cancel(true);
+ }
+ service.shutdown();
+
+ return null;
+
+ }
+
+	/**
+	 * getName() returns the topic, group, and id concatenated into a single
+	 * string: fTopic : fGroup : fId
+	 *
+	 * @Override
+	 */
+ public String getName() {
+ return fTopic + " : " + fGroup + " : " + fId;
+ }
+
+	/**
+	 * getCreateTimeMs() returns the creation time in ms (fCreateTimeMs)
+	 *
+	 * @Override
+	 *
+	 */
+ public long getCreateTimeMs() {
+ return fCreateTimeMs;
+ }
+
+ public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
+ return kConsumer;
+ }
+
+	/**
+	 * getLastAccessMs() returns the last access time in ms (fLastTouch)
+	 *
+	 * @Override
+	 *
+	 */
+ public long getLastAccessMs() {
+ return fLastTouch;
+ }
+
+	/**
+	 * getOffset() returns the current offset
+	 *
+	 * @Override
+	 *
+	 */
+ public long getOffset() {
+ return offset;
+ }
+
+	/**
+	 * The previous commitOffsets() implementation, retained for reference:
+	 *
+	 * public void commitOffsets() { if (getState() ==
+	 * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
+	 * on closed KafkaConsumer " + getName()); return; }
+	 * fConnector.commitOffsets(); }
+	 */
+
+	/**
+	 * Updates fLastTouch with the current time in ms
+	 */
+ public void touch() {
+ fLastTouch = System.currentTimeMillis();
+ }
+
+	/**
+	 * getLastTouch() returns the last touch time in ms (fLastTouch)
+	 *
+	 */
+ public long getLastTouch() {
+ return fLastTouch;
+ }
+
+	/**
+	 * Sets the consumer state to closed and shuts down the Kafka connection
+	 */
+ // public synchronized boolean close() {
+ public boolean close() {
+ if (getState() == Kafka011Consumer.State.CLOSED) {
+
+ log.error("close() called on closed KafkaConsumer " + getName());
+ return true;
+ }
+
+ // fConnector.shutdown();
+ boolean retVal = kafkaConnectorshuttask();
+ return retVal;
+
+ }
+
+ /* time out if the kafka shutdown fails for some reason */
+
+ private boolean kafkaConnectorshuttask() {
+ Callable<Boolean> run = new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+
+ try {
+ // System.out.println("attempt to delete " + kConsumer);
+ kConsumer.close();
+
+				} catch (Exception e) {
+					log.info("Kafka stream shutdown error occurred " + getName() + " " + e);
+					throw new Exception("Kafka stream shutdown error occurred " + getName() + " " + e);
+					// return false;
+				}
+				log.info("Kafka connection closed by the executor task");
+
+ return true;
+ }
+ };
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ RunnableFuture future = new FutureTask(run);
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.execute(future);
+ try {
+			future.get(200, TimeUnit.SECONDS); // wait up to 200 seconds for the close task
+ } catch (TimeoutException ex) {
+ // timed out. Try to stop the code if possible.
+			log.info("Timeout occurred - Kafka connection not closed within 200 seconds by the executor task");
+ future.cancel(true);
+ setState(Kafka011Consumer.State.OPENED);
+ } catch (Exception ex) {
+ // timed out. Try to stop the code if possible.
+			log.error("Exception occurred during Kafka connection closure by the executor task "
+					+ ex);
+ future.cancel(true);
+ setState(Kafka011Consumer.State.OPENED);
+ return false;
+ }
+ service.shutdown();
+ setState(Kafka011Consumer.State.CLOSED);
+ return true;
+ }
+
+ public void forcePollOnConsumer() {
+ Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
+
+ }
+
+ /**
+ * getConsumerGroup() returns Consumer group
+ *
+ * @return
+ */
+ public String getConsumerGroup() {
+ return fGroup;
+ }
+
+ /**
+ * getConsumerId returns Consumer Id
+ *
+ * @return
+ */
+ public String getConsumerId() {
+ return fId;
+ }
+
+ /**
+ * getState returns kafkaconsumer state
+ *
+ * @return
+ */
+ private Kafka011Consumer.State getState() {
+ return this.state;
+ }
+
+ /**
+ * setState() sets the kafkaConsumer state
+ *
+ * @param state
+ */
+ private void setState(Kafka011Consumer.State state) {
+ this.state = state;
+ }
+
+ // private ConsumerConnector fConnector;
+ private final String fTopic;
+ private final String fGroup;
+ private final String fId;
+ private final String fLogTag;
+ // private final KafkaStream<byte[], byte[]> fStream;
+ private KafkaConsumer<String, String> kConsumer;
+ private long fCreateTimeMs;
+ private long fLastTouch;
+ private long offset;
+ private Kafka011Consumer.State state;
+ private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
+ private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
+ //private ArrayList<Kafka011Consumer> fconsumerList;
+ @Override
+ public void commitOffsets() {
+ if (getState() == Kafka011Consumer.State.CLOSED) {
+ log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
+ return;
+ }
+ kConsumer.commitSync();
+ // fConsumer.close();
+
+ }
+
+ @Override
+ public void setOffset(long offsetval) {
+ offset = offsetval;
+ }
+
+
+ public void setConsumerCache(KafkaConsumerCache cache) {
+ }
+
+ //@Override
+ //public Message nextMessage(ArrayList<?> l) {
+ // TODO Auto-generated method stub
+ //return null;
+ //}
+}
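
nextMessage() above bounds kConsumer.poll() with a FutureTask so a wedged poll cannot hold the request thread indefinitely. A distilled sketch of that pattern, with illustrative names and timeout:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedCallExample {
        // Run a potentially-wedged call with a hard deadline, as nextMessage() does.
        public static <T> T callWithTimeout(Callable<T> call, long timeoutSec) {
            ExecutorService service = Executors.newSingleThreadExecutor();
            FutureTask<T> future = new FutureTask<>(call);
            service.execute(future);
            try {
                return future.get(timeoutSec, TimeUnit.SECONDS);
            } catch (TimeoutException ex) {
                future.cancel(true); // interrupt the worker; the caller sees null
                return null;
            } catch (Exception ex) {
                future.cancel(true);
                return null;
            } finally {
                service.shutdown(); // one-shot executor, as in the consumer above
            }
        }
    }
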
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
new file mode 100644
index 0000000..ea9407b
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
@@ -0,0 +1,126 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+import java.util.ArrayList;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ * A consumer utility class that force-polls consumers when a rebalance issue is anticipated
+ *
+ * @author Ram
+ *
+ */
+public class Kafka011ConsumerUtil {
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011ConsumerUtil.class);
+
+	/**
+	 * @param fTopic
+	 * @param fGroup
+	 * @param fId
+	 * @return
+	 */
+ public static boolean forcePollOnConsumer(final String fTopic, final String fGroup, final String fId) {
+
+ Thread forcepollThread = new Thread(new Runnable() {
+ public void run() {
+ try {
+
+ ArrayList<Kafka011Consumer> kcsList = null;
+
+ kcsList = KafkaConsumerCache.getInstance().getConsumerListForCG(fTopic + "::" + fGroup + "::", fId);
+ if (null != kcsList) {
+ for (int counter = 0; counter < kcsList.size(); counter++) {
+
+ Kafka011Consumer kc1 = kcsList.get(counter);
+
+ try {
+ ConsumerRecords<String, String> recs = kc1.getConsumer().poll(0);
+ log.info("soft poll on " + kc1);
+						} catch (java.util.ConcurrentModificationException e) {
+							log.error("ConcurrentModificationException during force poll: " + e);
+						}
+
+ }
+
+ }
+
+				} catch (Exception e) {
+					log.error("Force poll failed for " + fGroup + " " + e.getMessage(), e);
+				}
+ }
+ });
+
+ forcepollThread.start();
+
+ return false;
+
+ }
+
+	/**
+	 * @param group
+	 * @return
+	 */
+ public static boolean forcePollOnConsumer(final String group) {
+
+ Thread forcepollThread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ ArrayList<Kafka011Consumer> kcsList = new ArrayList<Kafka011Consumer>();
+ kcsList = KafkaConsumerCache.getInstance().getConsumerListForCG(group);
+
+ if (null != kcsList) {
+
+ for (int counter = 0; counter < kcsList.size(); counter++) {
+
+ Kafka011Consumer kc1 = kcsList.get(counter);
+ log.info("soft poll on remote nodes " + kc1);
+ ConsumerRecords<String, String> recs = kc1.getConsumer().poll(0);
+ }
+
+ }
+
+				} catch (java.util.ConcurrentModificationException e) {
+					log.error("ConcurrentModificationException during force poll: " + e);
+				} catch (Exception e) {
+					log.error("Force poll failed for " + group + " " + e.getMessage(), e);
+				}
+ }
+ });
+
+ forcepollThread.start();
+ return false;
+
+ }
+
+}
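
Both helpers above hand the poll to a detached thread and immediately return false. A hedged sketch of the call sites; the topic, group, and id values, and the "topic::group::" key prefix, are assumptions based on how getConsumerListForCG is invoked here:

    // poll the other consumers in one topic/group, excluding client-1
    Kafka011ConsumerUtil.forcePollOnConsumer("myTopic", "myGroup", "client-1");

    // poll every cached consumer whose key starts with this prefix
    Kafka011ConsumerUtil.forcePollOnConsumer("myTopic::myGroup::");
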
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt
new file mode 100644
index 0000000..dd6259f
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt
@@ -0,0 +1,386 @@
+package com.att.dmf.mr.backends.kafka;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
+
+import com.att.dmf.mr.backends.Consumer;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ * A consumer instance that's created per-request. These are stateless so that
+ * clients can connect to this service as a proxy.
+ *
+ * @author peter
+ *
+ */
+public class KafkaConsumer implements Consumer {
+ private enum State {
+ OPENED, CLOSED
+ }
+
+	/**
+	 * KafkaConsumer() is the constructor. It takes the following parameters:
+	 *
+	 * @param topic
+	 * @param group
+	 * @param id
+	 * @param prop
+	 *
+	 */
+
+ public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception {
+ fTopic = topic;
+ fGroup = group;
+ fId = id;
+ // fConnector = cc;
+
+ fCreateTimeMs = System.currentTimeMillis();
+ fLastTouch = fCreateTimeMs;
+ fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String,String>> ();
+ fLogTag = fGroup + "(" + fId + ")/" + fTopic;
+ offset = 0;
+
+ state = KafkaConsumer.State.OPENED;
+
+ // final Map<String, Integer> topicCountMap = new HashMap<String,
+ // Integer>();
+ // topicCountMap.put(fTopic, 1);
+ // log.info(fLogTag +" kafka Consumer started at "
+ // +System.currentTimeMillis());
+ // final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+ // fConnector.createMessageStreams(topicCountMap);
+ // final List<KafkaStream<byte[], byte[]>> streams =
+ // consumerMap.get(fTopic);
+
+ kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
+ // System.out.println("I am in Consumer APP " + topic + "-- " +
+ // fConsumer);
+ kConsumer.subscribe(Arrays.asList(topic));
+ log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
+ System.out.println("-----id " +id);
+
+
+ try { ConsumerRecords<String, String> records =
+ kConsumer.poll(500); System.out.println("---" +
+ records.count());
+
+ for (ConsumerRecord<String, String> record : records) {
+ System.out.printf("offset = %d, key = %s, value = %s",
+ record.offset(), record.key(), record.value()); String t =
+ record.value();
+
+ }
+ }catch(Exception e){
+ System.out.println( e);
+ }
+ System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
+ kConsumer.commitSync();
+ // fConsumer.close();
+
+
+ /*
+ * ConsumerRecords<String, String> records = fConsumer.poll(500);
+ * System.out.println("---" + records.count());
+ *
+ * for (ConsumerRecord<String, String> record : records) {
+ * System.out.printf("offset = %d, key = %s, value = %s",
+ * record.offset(), record.key(), record.value()); String t =
+ * record.value();
+ *
+ * }
+ *
+ *
+ * fConsumer.commitSync(); fConsumer.close();
+ */
+
+ // fStream = streams.iterator().next();
+ }
+
+
+
+ private Consumer.Message makeMessage ( final ConsumerRecord<String,String> msg )
+ {
+ return new Consumer.Message()
+ {
+ @Override
+ public long getOffset ()
+ {
+ return msg.offset ();
+ }
+
+ @Override
+ public String getMessage ()
+ {
+ return new String ( msg.value () );
+ }
+ };
+ }
+
+ @Override
+ public synchronized Consumer.Message nextMessage ()
+ {
+
+ try
+ {
+ if ( fPendingMsgs.size () > 0 )
+ {
+ return makeMessage ( fPendingMsgs.take () );
+ }
+ }
+ catch ( InterruptedException x )
+ {
+ log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x );
+ }
+
+
+ try
+ {
+ boolean foundMsgs = false;
+			System.out.println("entering into polling");
+ final ConsumerRecords<String,String> records = kConsumer.poll ( 100 );
+			System.out.println("polling done....");
+ for ( ConsumerRecord<String,String> record : records )
+ {
+ foundMsgs = true;
+ fPendingMsgs.offer ( record );
+ }
+
+ }
+ catch ( KafkaException x )
+ {
+ log.debug ( fLogTag + ": KafkaException " + x.getMessage () );
+
+ }
+ catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x )
+ {
+ log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () );
+
+ }
+
+ return null;
+ }
+
+
+
+	/**
+	 * getName() returns the topic, group, and id concatenated into a single
+	 * string: fTopic : fGroup : fId
+	 *
+	 * @Override
+	 */
+ public String getName() {
+ return fTopic + " : " + fGroup + " : " + fId;
+ }
+
+	/**
+	 * getCreateTimeMs() returns the creation time in ms (fCreateTimeMs)
+	 *
+	 * @Override
+	 *
+	 */
+ public long getCreateTimeMs() {
+ return fCreateTimeMs;
+ }
+
+ public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() {
+ return kConsumer;
+ }
+
+	/**
+	 * getLastAccessMs() returns the last access time in ms (fLastTouch)
+	 *
+	 * @Override
+	 *
+	 */
+ public long getLastAccessMs() {
+ return fLastTouch;
+ }
+
+
+
+	/**
+	 * getOffset() returns the current offset
+	 *
+	 * @Override
+	 *
+	 */
+ public long getOffset() {
+ return offset;
+ }
+
+	/**
+	 * The previous commitOffsets() implementation, retained for reference:
+	 *
+	 * public void commitOffsets() { if (getState() ==
+	 * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
+	 * on closed KafkaConsumer " + getName()); return; }
+	 * fConnector.commitOffsets(); }
+	 */
+
+	/**
+	 * Updates fLastTouch with the current time in ms
+	 */
+ public void touch() {
+ fLastTouch = System.currentTimeMillis();
+ }
+
+	/**
+	 * getLastTouch() returns the last touch time in ms (fLastTouch)
+	 *
+	 */
+ public long getLastTouch() {
+ return fLastTouch;
+ }
+
+	/**
+	 * Sets the consumer state to closed and shuts down the Kafka connection
+	 */
+ public synchronized boolean close() {
+
+ if (getState() == KafkaConsumer.State.CLOSED) {
+
+ log.warn("close() called on closed KafkaConsumer " + getName());
+ return true;
+ }
+
+ setState(KafkaConsumer.State.CLOSED);
+ // fConnector.shutdown();
+ boolean retVal = kafkaConnectorshuttask();
+ return retVal;
+
+ }
+
+ /* time out if the kafka shutdown fails for some reason */
+
+ private boolean kafkaConnectorshuttask() {
+ Callable<Boolean> run = new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ // your code to be timed
+ System.out.println("consumer closing....." + kConsumer);
+ kConsumer.close();
+ } catch (Exception e) {
+					log.info("Kafka stream shutdown error occurred " + getName() + " " + e);
+ }
+				log.info("Kafka connection closed within 15 seconds by the executor task");
+ return true;
+ }
+ };
+
+ RunnableFuture future = new FutureTask(run);
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.execute(future);
+ Boolean result = null;
+ try {
+			result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait up to 15 seconds for the close task
+ } catch (TimeoutException ex) {
+ // timed out. Try to stop the code if possible.
+			log.info("Timeout occurred - Kafka connection not closed within 15 seconds by the executor task");
+ future.cancel(true);
+ } catch (Exception ex) {
+ // timed out. Try to stop the code if possible.
+			log.info("Exception occurred during Kafka connection closure by the executor task " + ex);
+ future.cancel(true);
+ return false;
+ }
+ service.shutdown();
+ return true;
+ }
+
+ /**
+ * getConsumerGroup() returns Consumer group
+ *
+ * @return
+ */
+ public String getConsumerGroup() {
+ return fGroup;
+ }
+
+ /**
+ * getConsumerId returns Consumer Id
+ *
+ * @return
+ */
+ public String getConsumerId() {
+ return fId;
+ }
+
+ /**
+ * getState returns kafkaconsumer state
+ *
+ * @return
+ */
+ private KafkaConsumer.State getState() {
+ return this.state;
+ }
+
+ /**
+ * setState() sets the kafkaConsumer state
+ *
+ * @param state
+ */
+ private void setState(KafkaConsumer.State state) {
+ this.state = state;
+ }
+
+ // private ConsumerConnector fConnector;
+ private final String fTopic;
+ private final String fGroup;
+ private final String fId;
+ private final String fLogTag;
+ // private final KafkaStream<byte[], byte[]> fStream;
+ private final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> kConsumer;
+ private long fCreateTimeMs;
+ private long fLastTouch;
+ private long offset;
+ private KafkaConsumer.State state;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class);
+ private final LinkedBlockingQueue<ConsumerRecord<String,String>> fPendingMsgs;
+ // private static final Logger log =
+ // LoggerFactory.getLogger(KafkaConsumer.class);
+
+ @Override
+ public void commitOffsets() {
+ if (getState() == KafkaConsumer.State.CLOSED) {
+ log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
+ return;
+ }
+ kConsumer.commitSync();
+ // fConsumer.close();
+
+ }
+
+
+
+ @Override
+ public void setOffset(long offsetval) {
+ // TODO Auto-generated method stub
+ offset = offsetval;
+ }
+}
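
The scratch implementation above builds the client directly from a Properties object. A minimal, hedged example of such a configuration (broker address and group id are illustrative; the deserializer settings are standard kafka-clients keys):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerConfigExample {
        public static KafkaConsumer<String, String> build() {
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "localhost:9092"); // illustrative broker
            prop.put("group.id", "myGroup");
            prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return new KafkaConsumer<>(prop);
        }
    }
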
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
new file mode 100644
index 0000000..4340cae
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -0,0 +1,749 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Resource;
+
+import org.I0Itec.zkclient.exception.ZkException;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.ComponentScan;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.dmf.mr.backends.Consumer;
+import com.att.dmf.mr.backends.MetricsSet;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.exception.DMaaPErrorMessages;
+import com.att.dmf.mr.utils.ConfigurationReader;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.metrics.CdmTimer;
+
+/**
+ * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
+ * must synchronize access to it
+ * @author peter
+ *
+ */
+@NotThreadSafe
+public class KafkaConsumerCache {
+
+ private static KafkaConsumerCache kafkaconscache = null;
+
+ public static KafkaConsumerCache getInstance() {
+ if (kafkaconscache == null)
+ kafkaconscache = new KafkaConsumerCache();
+
+ return kafkaconscache;
+ }
+
+ private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
+ private static final int kDefault_ConsumerHandoverWaitMs = 500;
+
+ private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
+ private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";
+
+ private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
+ private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";
+
+	// kafka defaults to timing out a client after 6 seconds of inactivity, but
+	// it heartbeats even when the client isn't fetching. Here, we don't
+	// want to prematurely rebalance the consumer group. Assuming clients are
+	// hitting the server at least every 30 seconds, timing out after 2 minutes
+	// should be okay.
+	// FIXME: consider allowing the client to specify its expected call rate?
+ private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;
+
+ // check for expirations pretty regularly
+ private static final long kDefault_SweepEverySeconds = 15;
+
+ private enum Status {
+ NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
+ }
+
+ // @Qualifier("kafkalockavoid")
+
+ // @Resource
+ // @Qualifier("kafkalockavoid")
+ // KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+
+ @Autowired
+ private DMaaPErrorMessages errorMessages;
+
+ // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider();
+ /**
+ * User defined exception class for kafka consumer cache
+ *
+ * @author nilanjana.maity
+ *
+ */
+ public class KafkaConsumerCacheException extends Exception {
+ /**
+ * To throw the exception
+ *
+ * @param t
+ */
+ KafkaConsumerCacheException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ *
+ * @param s
+ */
+ public KafkaConsumerCacheException(String s) {
+ super(s);
+ }
+
+ private static final long serialVersionUID = 1L;
+ }
+
+	/**
+	 * Creates a KafkaConsumerCache object. Before it is used, you must call
+	 * startCache()
+	 *
+	 */
+ public KafkaConsumerCache() {
+
+ String strkSetting_ZkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_ZkBasePath);
+ if (null == strkSetting_ZkBasePath)
+ strkSetting_ZkBasePath = kDefault_ZkBasePath;
+ fBaseZkPath = strkSetting_ZkBasePath;
+
+ fConsumers = new ConcurrentHashMap<String, Kafka011Consumer>();
+ fSweepScheduler = Executors.newScheduledThreadPool(1);
+
+ curatorConsumerCache = null;
+
+ status = Status.NOT_STARTED;
+ // Watcher for consumer rebalancing across nodes. Kafka011 rebalancing
+ // work around
+
+ listener = new ConnectionStateListener() {
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ if (newState == ConnectionState.LOST) {
+
+ log.info("ZooKeeper connection expired");
+ handleConnectionLoss();
+ } else if (newState == ConnectionState.READ_ONLY) {
+ log.warn("ZooKeeper connection set to read only mode.");
+ } else if (newState == ConnectionState.RECONNECTED) {
+ log.info("ZooKeeper connection re-established");
+ handleReconnection();
+ } else if (newState == ConnectionState.SUSPENDED) {
+ log.warn("ZooKeeper connection has been suspended.");
+ handleConnectionSuspended();
+ }
+ }
+ };
+ }
+
+ /**
+ * Start the cache service. This must be called before any get/put
+ * operations.
+ *
+ * @param mode
+ * DMAAP or cambria
+ * @param curator
+ * @throws IOException
+ * @throws KafkaConsumerCacheException
+ */
+ public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {
+
+ if (fApiId == null) {
+ throw new IllegalArgumentException("API Node ID must be specified.");
+ }
+
+ try {
+
+ if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
+ curator = getCuratorFramework(curator);
+ }
+ curator.getConnectionStateListenable().addListener(listener);
+ setStatus(Status.CONNECTED);
+ curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
+ curatorConsumerCache.start();
+ curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+ switch (event.getType()) {
+ case CHILD_ADDED: {
+ try {
+ final String apiId = new String(event.getData().getData());
+ final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+ log.info(apiId + " started consumer " + consumer);
+ } catch (Exception ex) {
+							log.info("Error occurred while adding child " + ex);
+ }
+ break;
+ }
+ case CHILD_UPDATED: {
+ final String apiId = new String(event.getData().getData());
+ final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+ if (fConsumers.containsKey(consumer)) {
+						log.info(apiId + " claimed consumer " + consumer + " from " + fApiId
+								+ " but won't hand over");
+						// Commented so that it doesn't give the connection
+						// until the active node is running for this client
+						// id.
+ dropClaimedConsumer(consumer);
+ }
+
+ break;
+ }
+ case CHILD_REMOVED: {
+ final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+ if (fConsumers.containsKey(consumer)) {
+ log.info("Someone wanted consumer " + consumer
+ + " gone; but not removing it from the cache");
+ dropConsumer(consumer, false);
+ }
+
+ break;
+ }
+
+ default:
+ break;
+ }
+ }
+ });
+
+ // initialize the ZK path
+ EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
+ ensurePath.ensure(curator.getZookeeperClient());
+
+ // final long freq = fSettings.getLong(kSetting_SweepEverySeconds,
+ // kDefault_SweepEverySeconds);
+ long freq = kDefault_SweepEverySeconds;
+ String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_SweepEverySeconds);
+ if (null != strkSetting_SweepEverySeconds) {
+ freq = Long.parseLong(strkSetting_SweepEverySeconds);
+ }
+
+ fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS);
+ log.info("KafkaConsumerCache started");
+ log.info("sweeping cached clients every " + freq + " seconds");
+		} catch (ZkException e) {
+			log.error("ZK exception occurred: " + e);
+			throw new KafkaConsumerCacheException(e);
+		} catch (Exception e) {
+			log.error("Exception occurred: " + e);
+			throw new KafkaConsumerCacheException(e);
+ }
+ }
+
+	/**
+	 * Gets the curator object and establishes the ZooKeeper connection
+	 *
+	 * @param curator
+	 * @return curator object
+	 */
+ public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
+ if (curator.getState() == CuratorFrameworkState.LATENT) {
+ curator.start();
+
+ try {
+ curator.blockUntilConnected();
+ } catch (InterruptedException e) {
+ // Ignore
+ log.error("error while setting curator framework :" + e.getMessage());
+ }
+ }
+
+ return curator;
+ }
+
+ /**
+ * Stop the cache service.
+ */
+ public void stopCache() {
+ setStatus(Status.DISCONNECTED);
+
+ final CuratorFramework curator = ConfigurationReader.getCurator();
+
+ if (curator != null) {
+ try {
+ curator.getConnectionStateListenable().removeListener(listener);
+ curatorConsumerCache.close();
+ log.info("Curator client closed");
+ } catch (ZkInterruptedException e) {
+ log.warn("Curator client close interrupted: " + e.getMessage());
+ } catch (IOException e) {
+ log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e.getMessage());
+ }
+
+ curatorConsumerCache = null;
+ }
+
+ if (fSweepScheduler != null) {
+ fSweepScheduler.shutdownNow();
+ log.info("cache sweeper stopped");
+ }
+
+ if (fConsumers != null) {
+ fConsumers.clear();
+ fConsumers = null;
+ }
+
+ setStatus(Status.NOT_STARTED);
+
+ log.info("Consumer cache service stopped");
+ }
+
+ /**
+ * Get a cached consumer by topic, group, and id, if it exists (and remains
+ * valid) In addition, this method waits for all other consumer caches in
+ * the cluster to release their ownership and delete their version of this
+ * consumer.
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param clientId
+ * @return a consumer, or null
+ */
+ public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId)
+ throws KafkaConsumerCacheException {
+ if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+
+ final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
+ final Kafka011Consumer kc = fConsumers.get(consumerKey);
+
+ if (kc != null) {
+ log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
+ kc.touch();
+ fMetrics.onKafkaConsumerCacheHit();
+ } else {
+ log.debug("Consumer cache miss for [" + consumerKey + "]");
+ fMetrics.onKafkaConsumerCacheMiss();
+ }
+
+ return kc;
+ }
+
+	/**
+	 * Get the cached consumers whose keys start with the given topic::group
+	 * prefix, excluding the given client id.
+	 *
+	 * @param topicgroup
+	 * @param clientId
+	 * @return a list of consumers
+	 */
+ public ArrayList<Kafka011Consumer> getConsumerListForCG(String topicgroup, String clientId)
+ throws KafkaConsumerCacheException {
+ if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+ ArrayList<Kafka011Consumer> kcl = new ArrayList<Kafka011Consumer>();
+ // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
+ // clientId);
+ Enumeration<String> strEnum = fConsumers.keys();
+ String consumerLocalKey = null;
+ while (strEnum.hasMoreElements()) {
+ consumerLocalKey = strEnum.nextElement();
+
+ if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
+
+ // System.out.println("consumer key returning from
+ // getConsumerListforCG +++++++++ " + consumerLocalKey
+ // + " " + fConsumers.get(consumerLocalKey));
+ kcl.add(fConsumers.get(consumerLocalKey));
+
+ }
+ }
+
+ return kcl;
+ }
+
+ public ArrayList<Kafka011Consumer> getConsumerListForCG(String group) throws KafkaConsumerCacheException {
+ if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+ ArrayList<Kafka011Consumer> kcl = new ArrayList<Kafka011Consumer>();
+ // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
+ // clientId);
+ Enumeration<String> strEnum = fConsumers.keys();
+ String consumerLocalKey = null;
+ while (strEnum.hasMoreElements()) {
+ consumerLocalKey = strEnum.nextElement();
+
+ if (consumerLocalKey.startsWith(group)) {
+
+ // System.out.println("consumer key returning from
+ // getConsumerListforCG +++++++++ " + consumerLocalKey
+ // + " " + fConsumers.get(consumerLocalKey));
+ kcl.add(fConsumers.get(consumerLocalKey));
+
+ }
+ }
+
+ return kcl;
+ }
+
+ /**
+ * Put a consumer into the cache by topic, group and ID
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param consumerId
+ * @param consumer
+ * @throws KafkaConsumerCacheException
+ */
+ public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer)
+ throws KafkaConsumerCacheException {
+ if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+
+ final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
+ fConsumers.put(consumerKey, consumer);
+
+ // String appId = "node-instance-"+i;
+
+		log.info("Consumer added to cache; consumer key " + consumerKey + " apiId " + fApiId);
+ }
+
+ public Collection<? extends Consumer> getConsumers() {
+ return new LinkedList<Kafka011Consumer>(fConsumers.values());
+ }
+
+ /**
+ * This method is to drop all the consumer
+ */
+ public void dropAllConsumers() {
+ for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
+ dropConsumer(entry.getKey(), true);
+ }
+
+ // consumers should be empty here
+ if (fConsumers.size() > 0) {
+ log.warn("During dropAllConsumers, the consumer map is not empty.");
+ fConsumers.clear();
+ }
+ }
+
+ /**
+ * Drop a consumer from our cache due to a timeout
+ *
+ * @param key
+ */
+ private void dropTimedOutConsumer(String key) {
+ fMetrics.onKafkaConsumerTimeout();
+
+ if (!fConsumers.containsKey(key)) {
+ log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
+ return;
+ }
+
+ // First, drop this consumer from our cache
+ boolean isdrop = dropConsumer(key, true);
+ if (!isdrop) {
+ return;
+ }
+ final CuratorFramework curator = ConfigurationReader.getCurator();
+
+ try {
+ curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
+ log.info(" ^ deleted " + fBaseZkPath + "/" + key);
+ } catch (NoNodeException e) {
+ log.warn("A consumer was deleted from " + fApiId
+ + "'s cache, but no Cambria API node had ownership of it in ZooKeeper");
+ } catch (Exception e) {
+ log.error("Unexpected exception while deleting consumer " + key + " from ZooKeeper", e);
+ }
+
+ try {
+ int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
+ String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_ConsumerHandoverWaitMs);
+ if (strkSetting_ConsumerHandoverWaitMs != null)
+ consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
+ Thread.sleep(consumerHandoverWaitMs);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so callers can observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+ log.info("Dropped " + key + " consumer due to timeout");
+ }
+
+ /**
+ * Drop a consumer from our cache due to another API node claiming it as
+ * their own.
+ *
+ * @param key
+ */
+ private void dropClaimedConsumer(String key) {
+ // if the consumer is still in our cache, it implies a claim.
+ if (fConsumers.containsKey(key)) {
+ fMetrics.onKafkaConsumerClaimed();
+ log.info("Consumer [" + key + "] claimed by another node.");
+ }
+ log.info("^dropping claimed Kafka consumer " + key);
+ dropConsumer(key, false);
+ }
+
+ /**
+ * Removes the consumer from the cache and closes its connection to the
+ * kafka broker(s).
+ *
+ * @param key
+ * @param dueToTimeout
+ */
+ private boolean dropConsumer(String key, boolean dueToTimeout) {
+ final Kafka011Consumer kc = fConsumers.get(key);
+ log.info("closing Kafka consumer " + key + " object " + kc);
+ if (kc != null) {
+ // Remove the cache entry only once the consumer has closed cleanly;
+ // otherwise keep it so the close can be retried.
+ if (kc.close()) {
+ fConsumers.remove(key);
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private MetricsSet fMetrics;
+ private final String fBaseZkPath;
+ private final ScheduledExecutorService fSweepScheduler;
+ private String fApiId;
+
+ public void setfMetrics(final MetricsSet metrics) {
+ this.fMetrics = metrics;
+ }
+
+ public void setfApiId(final String id) {
+ this.fApiId = id;
+ }
+
+ private final ConnectionStateListener listener;
+
+ private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
+ private PathChildrenCache curatorConsumerCache;
+
+ private volatile Status status;
+
+ private void handleReconnection() {
+
+ log.info("Reading current cache data from ZK and synchronizing local cache");
+ final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
+ // Remove all the consumers in this API node's cache that now belong to
+ // other API nodes.
+ for (ChildData cachedConsumer : cacheData) {
+ final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
+ final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
+ : "undefined";
+ if (!fApiId.equals(owningApiId)) {
+ fConsumers.remove(consumerId);
+ // Only the local cache entry is removed here; any ZooKeeper lock for
+ // this consumer still exists and is not cleaned up by the consumer factory.
+ log.info("Synchronizing local cache with ZK; owner " + owningApiId
+ + ", removing " + consumerId);
+ }
+ }
+
+ setStatus(Status.CONNECTED);
+ }
+
+ private void handleConnectionSuspended() {
+ log.info("Suspending cache until ZK connection is re-established");
+
+ setStatus(Status.SUSPENDED);
+ }
+
+ private void handleConnectionLoss() {
+ log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");
+
+ setStatus(Status.DISCONNECTED);
+
+ closeAllCachedConsumers();
+ fConsumers.clear();
+ }
+
+ private void closeAllCachedConsumers() {
+ for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
+ try {
+ entry.getValue().close();
+ } catch (Exception e) {
+ log.info("@@@@@@ Error occurd while closing Clearing All cache " + e);
+ }
+ }
+ }
+
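+ // Cache keys have the form "<topic>::<consumerGroupId>::<clientId>",
+ // e.g. "MY-TOPIC::cg1::client-7" (illustrative values only).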
+ private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
+ return topic + "::" + consumerGroupId + "::" + clientId;
+ }
+
+ /**
+ * Claims ownership of a consumer by writing this node's API ID to the
+ * consumer's znode in ZooKeeper.
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param consumerId
+ * @throws KafkaConsumerCacheException
+ */
+ public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
+ throws KafkaConsumerCacheException {
+ // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
+ final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
+ final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership");
+
+ try {
+ final String consumerPath = fBaseZkPath + "/" + consumerKey;
+ log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
+ final CuratorFramework curator = ConfigurationReader.getCurator();
+
+ try {
+ curator.setData().forPath(consumerPath, fApiId.getBytes());
+ } catch (KeeperException.NoNodeException e) {
+ curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
+ }
+ log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
+ timer.end();
+ } catch (Exception e) {
+ log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
+ throw new KafkaConsumerCacheException(e);
+ }
+
+ log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");
+
+ try {
+ int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
+ String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_ConsumerHandoverWaitMs);
+ if (strkSetting_ConsumerHandoverWaitMs != null)
+ consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
+ Thread.sleep(consumerHandoverWaitMs);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so callers can observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() {
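+ // Returns null in this implementation; callers must null-check before use.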
+ return null;
+ }
+
+ public void sweep() {
+ final LinkedList<String> removals = new LinkedList<String>();
+ long mustTouchEveryMs = kDefault_MustTouchEveryMs;
+ String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_TouchEveryMs);
+ if (null != strkSetting_TouchEveryMs) {
+ mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
+ }
+
+ final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
+
+ for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
+ final long lastTouchMs = e.getValue().getLastTouch();
+ log.debug("consumer #####1" + e.getKey() + " " + lastTouchMs + " < " + oldestAllowedTouchMs);
+
+ if (lastTouchMs < oldestAllowedTouchMs) {
+ log.info("consumer " + e.getKey() + " has expired");
+ removals.add(e.getKey());
+ }
+ }
+
+ for (String key : removals) {
+ dropTimedOutConsumer(key);
+ }
+ }
+
+ /**
+ * Runnable that invokes sweep(); intended to be scheduled periodically.
+ *
+ * @author nilanjana.maity
+ *
+ */
+ private class sweeper implements Runnable {
+ /**
+ * run method
+ */
+ public void run() {
+ sweep();
+ }
+ }
+
+ /**
+ * Drops the consumer identified by topic, consumer group, and client ID.
+ *
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ */
+ public void dropConsumer(String topic, String consumerGroup, String clientId) {
+ dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
+ }
+
+ private Status getStatus() {
+ return this.status;
+ }
+
+ private void setStatus(Status status) {
+ this.status = status;
+ }
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
+} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
new file mode 100644
index 0000000..805701a
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
@@ -0,0 +1,161 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+
+//@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka")
+@Component
+public class KafkaLiveLockAvoider2 {
+
+ public static final String ZNODE_ROOT = "/live-lock-avoid";
+ public static final String ZNODE_LOCKS = "/locks";
+ public static final String ZNODE_UNSTICK_TASKS ="/unstick-tasks";
+
+ private static String locksPath = ZNODE_ROOT+ZNODE_LOCKS;
+ private static String tasksPath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS;
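+ // Assumed znode layout, inferred from the paths used below:
+ //   /live-lock-avoid/locks/<groupName>                 - inter-process mutex per consumer group
+ //   /live-lock-avoid/unstick-tasks/<appId>/<groupName> - ephemeral "soft poll" task nodes per server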
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaLiveLockAvoider2.class.getName());
+
+ @Autowired
+ @Qualifier("curator")
+ private CuratorFramework curatorFramework;
+
+ @PostConstruct
+ public void init() {
+ System.out.println("Welcome......................................................................................");
+ try {
+ if (curatorFramework.checkExists().forPath(locksPath) == null) {
+ curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath);
+ }
+ if (curatorFramework.checkExists().forPath(tasksPath) == null) {
+ curatorFramework.create().creatingParentsIfNeeded().forPath(tasksPath);
+ }
+
+ } catch (Exception e) {
+ log.error("Error during creation of permanent znodes under " + ZNODE_ROOT, e);
+ }
+ }
+
+ public void unlockConsumerGroup(String appId, String groupName) throws Exception {
+
+ log.info("Signalling unlock to all conumsers of in group [{}] now, " , groupName);
+
+ String fullLockPath = String.format("%s/%s", locksPath, groupName );
+ String fullTasksPath = null;
+
+ try {
+
+ //Use the Curator recipe for a Mutex lock, only one process can be broadcasting unlock instructions for a group
+ InterProcessMutex lock = new InterProcessMutex(curatorFramework, fullLockPath);
+ if ( lock.acquire(100L, TimeUnit.MILLISECONDS) )
+ {
+ try
+ {
+ List<String> taskNodes = curatorFramework.getChildren().forPath(tasksPath);
+ for (String taskNodeName : taskNodes) {
+ if(!taskNodeName.equals(appId)) {
+
+ fullTasksPath = String.format("%s/%s/%s", tasksPath, taskNodeName, groupName);
+ log.info("Writing groupName {} to path {}",groupName, fullTasksPath);
+
+
+ if(curatorFramework.checkExists().forPath(fullTasksPath) != null) {
+ curatorFramework.delete().forPath(fullTasksPath);
+ }
+ curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(fullTasksPath);
+ }
+ }
+
+
+ }
+ finally
+ {
+ //Curator lock recipe requires an acquire() to be followed by a release()
+ lock.release();
+ }
+ }else {
+ log.info("Could not obtain the avoider lock, another process has the avoider lock? {}", !lock.isAcquiredInThisProcess() );
+ }
+
+
+ } catch (Exception e) {
+ log.error("Error setting up either lock ZNode {} or task ZNode {}",fullLockPath, fullTasksPath,e);
+ throw e;
+ }
+
+
+ }
+
+ /*
+ * Should be called once per MR server instance.
+ */
+ public void startNewWatcherForServer(String appId, LiveLockAvoidance avoidanceCallback) {
+ LockInstructionWatcher instructionWatcher = new LockInstructionWatcher(curatorFramework,avoidanceCallback,this);
+ assignNewProcessNode(appId, instructionWatcher);
+
+ }
+
+
+ protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) {
+
+ String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId;
+
+ try {
+
+ if(curatorFramework.checkExists().forPath(taskHolderZnodePath) != null) {
+ curatorFramework.delete().deletingChildrenIfNeeded().forPath(taskHolderZnodePath);
+
+ }
+ curatorFramework.create().forPath(taskHolderZnodePath);
+ //setup the watcher
+ curatorFramework.getChildren().usingWatcher(processNodeWatcher).inBackground().forPath(taskHolderZnodePath);
+ log.info("Done creating task holder and watcher for APP name: {}",appId);
+
+ } catch (Exception e) {
+ log.error("Could not add new processing node for name {}", appId, e);
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
new file mode 100644
index 0000000..30209f0
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -0,0 +1,235 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.att.dmf.mr.backends.Publisher;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+
+
+/**
+ * Sends raw JSON objects into Kafka.
+ *
+ * Could improve space: BSON rather than JSON?
+ *
+ * @author peter
+ *
+ */
+
+public class KafkaPublisher implements Publisher {
+ /**
+ * constructor initializing
+ *
+ * @param settings
+ * @throws rrNvReadable.missingReqdSetting
+ */
+ public KafkaPublisher(@Qualifier("propertyReader") rrNvReadable settings) throws rrNvReadable.missingReqdSetting {
+ //fSettings = settings;
+
+ final Properties props = new Properties();
+ /*transferSetting(fSettings, props, "metadata.broker.list", "localhost:9092");
+ transferSetting(fSettings, props, "request.required.acks", "1");
+ transferSetting(fSettings, props, "message.send.max.retries", "5");
+ transferSetting(fSettings, props, "retry.backoff.ms", "150"); */
+ String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list");
+ if(null==kafkaConnUrl){
+
+ kafkaConnUrl="localhost:9092";
+ }
+ //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf";
+ // props.put("bootstrap.servers", bootSever);
+ //System.setProperty("java.security.auth.login.config",jaaspath);
+
+ /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
+ transferSetting( props, "sasl.mechanism", "PLAIN");*/
+ transferSetting( props, "bootstrap.servers",kafkaConnUrl);
+ //transferSetting( props, "metadata.broker.list", kafkaConnUrl);
+ transferSetting( props, "request.required.acks", "1");
+ transferSetting( props, "message.send.max.retries", "5");
+ transferSetting(props, "retry.backoff.ms", "150");
+
+ //props.put("serializer.class", "kafka.serializer.StringEncoder");
+
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+ //fConfig = new ProducerConfig(props);
+ //fProducer = new Producer<String, String>(fConfig);
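+ // KafkaProducer is thread-safe; a single shared instance serves all sends.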
+ fProducer = new KafkaProducer<>(props);
+ }
+
+ /**
+ * Send a single message to the given topic.
+ *
+ * @param topic
+ * @param msg
+ * @throws IOException
+ */
+ @Override
+ public void sendMessage(String topic, message msg) throws IOException{
+ final List<message> msgs = new LinkedList<message>();
+ msgs.add(msg);
+ sendMessages(topic, msgs);
+ }
+
+ // The Kafka 0.8 sendBatchMessage(String, ArrayList<KeyedMessage<String, String>>)
+ // method was removed in the 0.8 -> 0.11 upgrade; see sendBatchMessageNew below.
+
+
+ /*
+ * Kafka 0.11 interface
+ * @see com.att.nsa.cambria.backends.Publisher#sendBatchMessageNew(java.lang.String, java.util.ArrayList)
+ */
+ public void sendBatchMessageNew(String topic, ArrayList <ProducerRecord<String,String>> kms) throws IOException {
+ try {
+ for (ProducerRecord<String,String> km : kms) {
+ fProducer.send(km);
+ }
+
+ } catch (Exception excp) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+ throw new IOException(excp.getMessage(), excp);
+ }
+
+ }
+
+ // The Kafka 0.8 sendMessages(String, List) implementation (KeyedMessage-based)
+ // was likewise removed in the 0.8 -> 0.11 upgrade; see sendMessagesNew below.
+ /**
+ * Send a set of messages to the given topic. Each message's key is used as
+ * the Kafka record key.
+ *
+ * @param topic
+ * @param msgs
+ * @throws IOException
+ */
+ @Override
+ public void sendMessagesNew(String topic, List<? extends message> msgs)
+ throws IOException {
+ log.info("sending " + msgs.size() + " events to [" + topic + "]");
+ for (message o : msgs) {
+ final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
+ try {
+ fProducer.send(data);
+ } catch (Exception excp) {
+ // Propagate the failure instead of silently swallowing it.
+ log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+ throw new IOException(excp.getMessage(), excp);
+ }
+ }
+ }
+
+ private final Producer<String, String> fProducer;
+
+ /**
+ * Copies a producer setting into the Properties, reading "kafka.<key>" from
+ * the AJSC property map and falling back to the supplied default.
+ *
+ * @param props
+ * @param key
+ * @param defVal
+ */
+ private void transferSetting(Properties props, String key, String defVal) {
+ String kafkaProp = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "kafka." + key);
+ if (null == kafkaProp) {
+ kafkaProp = defVal;
+ }
+ props.put(key, kafkaProp);
+ }
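+ // Example: transferSetting(props, "request.required.acks", "1") reads
+ // "kafka.request.required.acks" from the property file, defaulting to "1".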
+
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class);
+
+ @Override
+ public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
+ // No-op stub; the Kafka 0.11 path uses sendMessagesNew instead.
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java b/src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java
new file mode 100644
index 0000000..a13ecea
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+
+
+/**
+ * Live Lock Avoidance interface. To be implemented by the main message router client
+ *
+ */
+public interface LiveLockAvoidance {
+
+ /**
+ * Gets the unique id
+ * @return the unique id for the Message Router server instance
+ */
+ String getAppId();
+
+
+ /**
+ * Main callback to inform the local MR server instance that all consumers in a group need to soft poll
+ * @param groupName name of the Kafka consumer group that needs a soft poll
+ */
+ void handleRebalanceUnlock(String groupName);
+
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java b/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java
new file mode 100644
index 0000000..5d3bc47
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.kafka;
+
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ *
+ * LockInstructionWatcher
+ * A helper class used internally by the KafkaLiveLockAvoider.
+ *
+ * This class implements the ZooKeeper Watcher callback and listens for changes to child nodes.
+ * Each child node is a Kafka group name that needs to be soft polled; child nodes are deleted
+ * once soft-poll unlocking is finished.
+ *
+ */
+public class LockInstructionWatcher implements Watcher {
+
+ private CuratorFramework curatorFramework;
+ private LiveLockAvoidance avoidanceCallback;
+ private KafkaLiveLockAvoider2 avoider;
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(LockInstructionWatcher.class.getName());
+
+
+ public LockInstructionWatcher(CuratorFramework curatorFramework, LiveLockAvoidance avoidanceCallback,
+ KafkaLiveLockAvoider2 avoider) {
+ super();
+ this.curatorFramework = curatorFramework;
+ this.avoidanceCallback = avoidanceCallback;
+ this.avoider = avoider;
+ }
+
+
+ @Override
+ public void process(WatchedEvent event) {
+
+ switch (event.getType()) {
+ case NodeChildrenChanged:
+
+
+ try {
+
+ log.info("node children changed at path: {}", event.getPath());
+ List<String> children = curatorFramework.getChildren().forPath(event.getPath());
+
+ log.info("found children nodes prodcessing now");
+ for (String child : children) {
+ String childPath = String.format("%s/%s", event.getPath(), child);
+ log.info("Processing child task at node {}",childPath);
+ avoidanceCallback.handleRebalanceUnlock( child);
+ log.info("Deleting child task at node {}",childPath);
+ curatorFramework.delete().forPath(childPath);
+ }
+ // ZooKeeper watches are one-shot; re-register so future child changes are observed.
+ avoider.assignNewProcessNode(avoidanceCallback.getAppId(), this);
+
+
+ } catch (Exception e) {
+ log.error("Error manipulating ZNode data in watcher",e);
+ }
+
+ break;
+
+ default:
+ log.info("Listner fired on path: {}, with event: {}", event.getPath(), event.getType());
+ break;
+ }
+ }
+
+
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
new file mode 100644
index 0000000..0c34bfd
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
@@ -0,0 +1,182 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.memory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.backends.Consumer;
+import com.att.dmf.mr.backends.ConsumerFactory;
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public class MemoryConsumerFactory implements ConsumerFactory
+{
+ /**
+ *
+ * Initializing constructor
+ * @param q
+ */
+ public MemoryConsumerFactory ( MemoryQueue q )
+ {
+ fQueue = q;
+ }
+
+ /**
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param clientId
+ * @param timeoutMs
+ * @param remotehost
+ * @return Consumer
+ */
+ @Override
+ public Consumer getConsumerFor ( String topic, String consumerGroupId, String clientId, int timeoutMs, String remotehost )
+ {
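+ // clientId, timeoutMs, and remotehost are not used by the in-memory backend.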
+ return new MemoryConsumer ( topic, consumerGroupId );
+ }
+
+ private final MemoryQueue fQueue;
+
+ /**
+ *
+ * Define nested inner class
+ *
+ */
+ private class MemoryConsumer implements Consumer
+ {
+ /**
+ *
+ * Initializing MemoryConsumer constructor
+ * @param topic
+ * @param consumer
+ *
+ */
+ public MemoryConsumer ( String topic, String consumer )
+ {
+ fTopic = topic;
+ fConsumer = consumer;
+ fCreateMs = System.currentTimeMillis ();
+ fLastAccessMs = fCreateMs;
+ }
+
+ @Override
+ /**
+ * Return the next message for this consumer, or null if none is available.
+ */
+ public Message nextMessage ()
+ {
+ return fQueue.get ( fTopic, fConsumer );
+ }
+
+ private final String fTopic;
+ private final String fConsumer;
+ private final long fCreateMs;
+ private long fLastAccessMs;
+
+ @Override
+ public boolean close() {
+ //Nothing to close/clean up.
+ return true;
+ }
+ /**
+ *
+ */
+ public void commitOffsets()
+ {
+ // ignoring this aspect
+ }
+ /**
+ * get offset
+ */
+ public long getOffset()
+ {
+ return 0;
+ }
+
+ @Override
+ /**
+ * get the consumer name, in the form "topic/consumerGroup"
+ */
+ public String getName ()
+ {
+ return fTopic + "/" + fConsumer;
+ }
+
+ @Override
+ public long getCreateTimeMs ()
+ {
+ return fCreateMs;
+ }
+
+ @Override
+ public long getLastAccessMs ()
+ {
+ return fLastAccessMs;
+ }
+
+
+
+ @Override
+ public void setOffset(long offset) {
+ // TODO Auto-generated method stub
+
+ }
+
+
+ }
+
+ @Override
+ public void destroyConsumer(String topic, String consumerGroupId,
+ String clientId) {
+ //No cache for memory consumers, so NOOP
+ }
+
+ @Override
+ public void dropCache ()
+ {
+ // nothing to do - there's no cache here
+ }
+
+ @Override
+ /**
+ * @return an empty list; memory consumers are not tracked centrally
+ */
+ public Collection<? extends Consumer> getConsumers ()
+ {
+ return new ArrayList<MemoryConsumer> ();
+ }
+
+ @Override
+ public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
+ String remotehost) throws UnavailableException, CambriaApiException {
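+ // Not supported by the in-memory backend; callers receive null.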
+ return null;
+ }
+
+
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
new file mode 100644
index 0000000..22f0588
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
@@ -0,0 +1,198 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.memory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import com.att.dmf.mr.metabroker.Broker;
+import com.att.dmf.mr.metabroker.Topic;
+import com.att.nsa.configs.ConfigDb;
+import com.att.nsa.security.NsaAcl;
+import com.att.nsa.security.NsaApiKey;
+
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public class MemoryMetaBroker implements Broker {
+ /**
+ *
+ * @param mq
+ * @param configDb
+ */
+ public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) {
+ fQueue = mq;
+ fTopics = new HashMap<String, MemTopic>();
+ }
+
+ @Override
+ public List<Topic> getAllTopics() {
+ return new LinkedList<Topic>(fTopics.values());
+ }
+
+ @Override
+ public Topic getTopic(String topic) {
+ return fTopics.get(topic);
+ }
+
+ @Override
+ public Topic createTopic(String topic, String desc, String ownerApiId, int partitions, int replicas,
+ boolean transactionEnabled) throws TopicExistsException {
+ if (getTopic(topic) != null) {
+ throw new TopicExistsException(topic);
+ }
+ fQueue.createTopic(topic);
+ fTopics.put(topic, new MemTopic(topic, desc, ownerApiId, transactionEnabled));
+ return getTopic(topic);
+ }
+
+ @Override
+ public void deleteTopic(String topic) {
+ fTopics.remove(topic);
+ fQueue.removeTopic(topic);
+ }
+
+ private final MemoryQueue fQueue;
+ private final HashMap<String, MemTopic> fTopics;
+
+ private static class MemTopic implements Topic {
+ /**
+ * constructor initialization
+ *
+ * @param name
+ * @param desc
+ * @param owner
+ * @param transactionEnabled
+ */
+ public MemTopic(String name, String desc, String owner, boolean transactionEnabled) {
+ fName = name;
+ fDesc = desc;
+ fOwner = owner;
+ ftransactionEnabled = transactionEnabled;
+ fReaders = null;
+ fWriters = null;
+ }
+
+ @Override
+ public String getOwner() {
+ return fOwner;
+ }
+
+ @Override
+ public NsaAcl getReaderAcl() {
+ return fReaders;
+ }
+
+ @Override
+ public NsaAcl getWriterAcl() {
+ return fWriters;
+ }
+
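+ // A null ACL means the topic is open: any user may read or write.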
+ @Override
+ public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
+ if (fReaders != null && (user == null || !fReaders.canUser(user.getKey()))) {
+ throw new AccessDeniedException(user == null ? "" : user.getKey());
+ }
+ }
+
+ @Override
+ public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
+ if (fWriters != null && (user == null || !fWriters.canUser(user.getKey()))) {
+ throw new AccessDeniedException(user == null ? "" : user.getKey());
+ }
+ }
+
+ @Override
+ public String getName() {
+ return fName;
+ }
+
+ @Override
+ public String getDescription() {
+ return fDesc;
+ }
+
+ @Override
+ public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
+ if (!fOwner.equals(asUser.getKey())) {
+ throw new AccessDeniedException("User does not own this topic " + fName);
+ }
+ if (fWriters == null) {
+ fWriters = new NsaAcl();
+ }
+ fWriters.add(publisherId);
+ }
+
+ @Override
+ public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
+ if (!fOwner.equals(asUser.getKey())) {
+ throw new AccessDeniedException("User does not own this topic " + fName);
+ }
+ if (fWriters != null) {
+ fWriters.remove(publisherId);
+ }
+ }
+
+ @Override
+ public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
+ if (!fOwner.equals(asUser.getKey())) {
+ throw new AccessDeniedException("User does not own this topic " + fName);
+ }
+ if (fReaders == null) {
+ fReaders = new NsaAcl();
+ }
+ fReaders.add(consumerId);
+ }
+
+ @Override
+ public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
+ if (!fOwner.equals(asUser.getKey())) {
+ throw new AccessDeniedException("User does not own this topic " + fName);
+ }
+ if (fReaders != null) {
+ fReaders.remove(consumerId);
+ }
+ }
+
+ private final String fName;
+ private final String fDesc;
+ private final String fOwner;
+ private NsaAcl fReaders;
+ private NsaAcl fWriters;
+ private boolean ftransactionEnabled;
+
+ @Override
+ public boolean isTransactionEnabled() {
+ return ftransactionEnabled;
+ }
+
+ @Override
+ public Set<String> getOwners() {
+ final TreeSet<String> set = new TreeSet<String> ();
+ set.add ( fOwner );
+ return set;
+ }
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
new file mode 100644
index 0000000..0629972
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
@@ -0,0 +1,207 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.memory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import com.att.dmf.mr.backends.Consumer;
+import com.att.dmf.mr.backends.Publisher.message;
+
+/**
+ * When the broker type is memory, this class handles all topic-related
+ * operations.
+ *
+ * @author anowarul.islam
+ *
+ */
+public class MemoryQueue {
+ // map from topic to list of msgs
+ private HashMap<String, LogBuffer> fQueue;
+ private HashMap<String, HashMap<String, Integer>> fOffsets;
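+ // map from consumer name to (topic -> next offset to read)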
+
+ /**
+ * constructor initializing the queue and offset maps
+ */
+ public MemoryQueue() {
+ fQueue = new HashMap<String, LogBuffer>();
+ fOffsets = new HashMap<String, HashMap<String, Integer>>();
+ }
+
+ /**
+ * method used to create topic
+ *
+ * @param topic
+ */
+ public synchronized void createTopic(String topic) {
+ LogBuffer q = fQueue.get(topic);
+ if (q == null) {
+ q = new LogBuffer(1024 * 1024);
+ fQueue.put(topic, q);
+ }
+ }
+
+ /**
+ * method used to remove topic
+ *
+ * @param topic
+ */
+ public synchronized void removeTopic(String topic) {
+ LogBuffer q = fQueue.get(topic);
+ if (q != null) {
+ fQueue.remove(topic);
+ }
+ }
+
+ /**
+ * method to write message on topic
+ *
+ * @param topic
+ * @param m
+ */
+ public synchronized void put(String topic, message m) {
+ LogBuffer q = fQueue.get(topic);
+ if (q == null) {
+ createTopic(topic);
+ q = fQueue.get(topic);
+ }
+ q.push(m.getMessage());
+ }
+
+ /**
+ * method to read the next consumer message
+ *
+ * @param topic
+ * @param consumerName
+ * @return the next message, or null if none is available
+ */
+ public synchronized Consumer.Message get(String topic, String consumerName) {
+ final LogBuffer q = fQueue.get(topic);
+ if (q == null) {
+ return null;
+ }
+
+ HashMap<String, Integer> offsetMap = fOffsets.get(consumerName);
+ if (offsetMap == null) {
+ offsetMap = new HashMap<String, Integer>();
+ fOffsets.put(consumerName, offsetMap);
+ }
+ Integer offset = offsetMap.get(topic);
+ if (offset == null) {
+ offset = 0;
+ }
+
+ final msgInfo result = q.read(offset);
+ if (result != null && result.msg != null) {
+ offsetMap.put(topic, result.offset + 1);
+ }
+ return result;
+ }
+
+ /**
+ * static inner class holding details about a consumed message
+ *
+ * @author anowarul.islam
+ *
+ */
+ private static class msgInfo implements Consumer.Message {
+ /**
+ * published message which is consumed
+ */
+ public String msg;
+ /**
+ * offset associated with message
+ */
+ public int offset;
+
+ /**
+ * get offset of messages
+ */
+ @Override
+ public long getOffset() {
+ return offset;
+ }
+
+ /**
+ * get consumed message
+ */
+ @Override
+ public String getMessage() {
+ return msg;
+ }
+ }
+
+ /**
+ *
+ * @author sneha.d.desai
+ *
+ * private LogBuffer class with synchronized push and read methods
+ */
+ private class LogBuffer {
+ private int fBaseOffset;
+ private final int fMaxSize;
+ private final ArrayList<String> fList;
+
+ /**
+ * constructor initializing the offset, maxsize and list
+ *
+ * @param maxSize
+ */
+ public LogBuffer(int maxSize) {
+ fBaseOffset = 0;
+ fMaxSize = maxSize;
+ fList = new ArrayList<String>();
+ }
+
+ /**
+ * pushing a message; the oldest entries are evicted once capacity is
+ * exceeded, advancing the base offset so reader offsets stay stable
+ *
+ * @param msg
+ */
+ public synchronized void push(String msg) {
+ fList.add(msg);
+ while (fList.size() > fMaxSize) {
+ fList.remove(0);
+ fBaseOffset++;
+ }
+ }
+
+ /**
+ * reading the message at the given offset
+ *
+ * @param offset
+ * @return the message info, or null if the offset is past the end
+ */
+ public synchronized msgInfo read(int offset) {
+ final int actual = Math.max(0, offset - fBaseOffset);
+
+ final msgInfo mi = new msgInfo();
+ mi.msg = (actual >= fList.size()) ? null : fList.get(actual);
+ if (mi.msg == null)
+ return null;
+
+ mi.offset = actual + fBaseOffset;
+ return mi;
+ }
+
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java
new file mode 100644
index 0000000..2b43ed3
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.memory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import com.att.dmf.mr.backends.Publisher;
+import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
+
+
+
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public class MemoryQueuePublisher implements Publisher {
+ /**
+ *
+ * @param q
+ * @param b
+ */
+ public MemoryQueuePublisher(MemoryQueue q, MemoryMetaBroker b) {
+ fBroker = b;
+ fQueue = q;
+ }
+
+
+ /**
+ *
+ * @param topic
+ * @param msg
+ * @throws IOException
+ */
+ @Override
+ public void sendMessage(String topic, message msg) throws IOException {
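+ // Topics are auto-created on first publish, with the defaults used below
+ // (8 partitions, 3 replicas, transactions disabled).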
+ if (null == fBroker.getTopic(topic)) {
+ try {
+ fBroker.createTopic(topic, topic, null, 8, 3, false);
+ } catch (TopicExistsException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ fQueue.put(topic, msg);
+ }
+
+ @Override
+ /**
+ * @param topic
+ * @param kms
+ * @throws IOException
+ */
+ public void sendBatchMessageNew(String topic, ArrayList<ProducerRecord<String, String>> kms) throws IOException {
+ // No-op for the in-memory backend.
+ }
+
+ public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
+ // No-op for the in-memory backend.
+ }
+
+ public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
+ for (message m : msgs) {
+ sendMessage(topic, m);
+ }
+ }
+
+ private final MemoryMetaBroker fBroker;
+ private final MemoryQueue fQueue;
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java b/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java
new file mode 100644
index 0000000..8e41c9f
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.memory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import com.att.dmf.mr.backends.Publisher;
+
+
+/**
+ * Publisher implementation that appends messages to a log file; useful for debugging.
+ *
+ * @author anowarul.islam
+ *
+ */
+public class MessageLogger implements Publisher {
+ public MessageLogger() {
+ }
+
+ public void setFile(File f) throws FileNotFoundException {
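+ // Open in append mode so existing log content is preserved.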
+ fStream = new FileOutputStream(f, true);
+ }
+
+ /**
+ *
+ * @param topic
+ * @param msg
+ * @throws IOException
+ */
+ @Override
+ public void sendMessage(String topic, message msg) throws IOException {
+ logMsg(msg);
+ }
+
+ /**
+ * @param topic
+ * @param msgs
+ * @throws IOException
+ */
+ @Override
+ public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
+ for (message m : msgs) {
+ logMsg(m);
+ }
+ }
+
+ // The Kafka 0.8 sendBatchMessage(String, ArrayList<KeyedMessage<String, String>>)
+ // stub was removed in the 0.8 -> 0.11 upgrade.
+ private FileOutputStream fStream;
+
+ /**
+ *
+ * @param msg
+ * @throws IOException
+ */
+ private void logMsg(message msg) throws IOException {
+ String key = msg.getKey();
+ if (key == null)
+ key = "<none>";
+
+ fStream.write('[');
+ fStream.write(key.getBytes());
+ fStream.write("] ".getBytes());
+ fStream.write(msg.getMessage().getBytes());
+ fStream.write('\n');
+ }
+ public void sendBatchMessageNew(String topic, ArrayList<ProducerRecord<String, String>> kms) throws IOException {
+ // No-op; batch publishing is not logged by this class.
+ }
+
+ public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
+ // No-op; the Kafka 0.11 list-send path is not logged by this class.
+ }
+}