summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java729
1 files changed, 729 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
new file mode 100644
index 0000000..ac68a11
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -0,0 +1,729 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.dmf.mr.backends.kafka;
+
+import com.att.aft.dme2.internal.springframework.beans.factory.annotation.Autowired;
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.metrics.CdmTimer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.onap.dmaap.dmf.mr.backends.Consumer;
+import org.onap.dmaap.dmf.mr.backends.MetricsSet;
+import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
+import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
+import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
+
+/**
+ * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
+ * must be
+ * @author peter
+ *
+ */
+@NotThreadSafe
+public class KafkaConsumerCache {
+
+ private static KafkaConsumerCache kafkaconscache = null;
+
+ public static KafkaConsumerCache getInstance() {
+ if (kafkaconscache == null)
+ kafkaconscache = new KafkaConsumerCache();
+
+ return kafkaconscache;
+ }
+
+ private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
+ private static final int kDefault_ConsumerHandoverWaitMs = 500;
+
+ private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
+ private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";
+
+ private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
+ private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";
+
+ // kafka defaults to timing out a client after 6 seconds of inactivity, but
+ // it heartbeats even when the client isn't fetching. Here, we don't
+ // want to prematurely rebalance the consumer group. Assuming clients are
+ // hitting
+ // the server at least every 30 seconds, timing out after 2 minutes should
+ // be okay.
+ // FIXME: consider allowing the client to specify its expected call rate?
+ private static final long kDefault_MustTouchEveryMs = 1000L*60*2;
+
+ // check for expirations pretty regularly
+ private static final long kDefault_SweepEverySeconds = 15;
+
+ private enum Status {
+ NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
+ }
+
+
+
+
+ @Autowired
+ private DMaaPErrorMessages errorMessages;
+
+
+ /**
+ * User defined exception class for kafka consumer cache
+ *
+ * @author nilanjana.maity
+ *
+ */
+ public class KafkaConsumerCacheException extends Exception {
+ /**
+ * To throw the exception
+ *
+ * @param t
+ */
+ KafkaConsumerCacheException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ *
+ * @param s
+ */
+ public KafkaConsumerCacheException(String s) {
+ super(s);
+ }
+
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ * Creates a KafkaConsumerCache object. Before it is used, you must call
+ * startCache()
+ *
+ */
+ public KafkaConsumerCache() {
+
+ String strkSetting_ZkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_ZkBasePath);
+ if (null == strkSetting_ZkBasePath)
+ strkSetting_ZkBasePath = kDefault_ZkBasePath;
+ fBaseZkPath = strkSetting_ZkBasePath;
+
+ fConsumers = new ConcurrentHashMap<>();
+ fSweepScheduler = Executors.newScheduledThreadPool(1);
+
+ curatorConsumerCache = null;
+
+ status = Status.NOT_STARTED;
+ // Watcher for consumer rebalancing across nodes. Kafka011 rebalancing
+ // work around
+
+ listener = new ConnectionStateListener() {
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ if (newState == ConnectionState.LOST) {
+
+ log.info("ZooKeeper connection expired");
+ handleConnectionLoss();
+ } else if (newState == ConnectionState.READ_ONLY) {
+ log.warn("ZooKeeper connection set to read only mode.");
+ } else if (newState == ConnectionState.RECONNECTED) {
+ log.info("ZooKeeper connection re-established");
+ handleReconnection();
+ } else if (newState == ConnectionState.SUSPENDED) {
+ log.warn("ZooKeeper connection has been suspended.");
+ handleConnectionSuspended();
+ }
+ }
+ };
+ }
+
+ /**
+ * Start the cache service. This must be called before any get/put
+ * operations.
+ *
+ * @param mode
+ * DMAAP or cambria
+ * @param curator
+ * @throws IOException
+ * @throws KafkaConsumerCacheException
+ */
+ public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {
+
+ if (fApiId == null) {
+ throw new IllegalArgumentException("API Node ID must be specified.");
+ }
+
+ try {
+
+ if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
+ curator = getCuratorFramework(curator);
+ }
+ curator.getConnectionStateListenable().addListener(listener);
+ setStatus(Status.CONNECTED);
+ curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
+ curatorConsumerCache.start();
+ curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+ switch (event.getType()) {
+ case CHILD_ADDED: {
+ try {
+ final String apiId = new String(event.getData().getData());
+ final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+ log.info(apiId + " started consumer " + consumer);
+ } catch (Exception ex) {
+ log.info("#Error Occured during Adding child" + ex);
+ }
+ break;
+ }
+ case CHILD_UPDATED: {
+ final String apiId = new String(event.getData().getData());
+ final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+ if (fConsumers.containsKey(consumer)) {
+ log.info(apiId + " claimed consumer " + consumer + " from " + fApiId
+ + " but wont hand over");
+ // Commented so that it dont give the connection
+ // until the active node is running for this client
+ // id.
+ dropClaimedConsumer(consumer);
+ }
+
+ break;
+ }
+ case CHILD_REMOVED: {
+ final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+ if (fConsumers.containsKey(consumer)) {
+ log.info("Someone wanted consumer " + consumer
+ + " gone; but not removing it from the cache");
+ dropConsumer(consumer, false);
+ }
+
+ break;
+ }
+
+ default:
+ break;
+ }
+ }
+ });
+
+ // initialize the ZK path
+ EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
+ ensurePath.ensure(curator.getZookeeperClient());
+
+
+
+ long freq = kDefault_SweepEverySeconds;
+ String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_SweepEverySeconds);
+ if (null != strkSetting_SweepEverySeconds) {
+ freq = Long.parseLong(strkSetting_SweepEverySeconds);
+ }
+
+ fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS);
+ log.info("KafkaConsumerCache started");
+ log.info("sweeping cached clients every " + freq + " seconds");
+ } catch (ZkException e) {
+ log.error("@@@@@@ ZK Exception occured for " + e);
+ throw new KafkaConsumerCacheException(e);
+ } catch (Exception e) {
+ log.error("@@@@@@ Exception occured for " + e);
+ throw new KafkaConsumerCacheException(e);
+ }
+ }
+
+ /**
+ * Getting the curator oject to start the zookeeper connection estabished
+ *
+ * @param curator
+ * @return curator object
+ */
+ public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
+ if (curator.getState() == CuratorFrameworkState.LATENT) {
+ curator.start();
+
+ try {
+ curator.blockUntilConnected();
+ } catch (InterruptedException e) {
+ log.error("error while setting curator framework :",e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ return curator;
+ }
+
+ /**
+ * Stop the cache service.
+ */
+ public void stopCache() {
+ setStatus(Status.DISCONNECTED);
+
+ final CuratorFramework curator = ConfigurationReader.getCurator();
+
+ if (curator != null) {
+ try {
+ curator.getConnectionStateListenable().removeListener(listener);
+ curatorConsumerCache.close();
+ log.info("Curator client closed");
+ } catch (ZkInterruptedException e) {
+ log.warn("Curator client close interrupted: ", e);
+ } catch (IOException e) {
+ log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e);
+ }
+
+ curatorConsumerCache = null;
+ }
+
+ if (fSweepScheduler != null) {
+ fSweepScheduler.shutdownNow();
+ log.info("cache sweeper stopped");
+ }
+
+ if (fConsumers != null) {
+ fConsumers.clear();
+ fConsumers = null;
+ }
+
+ setStatus(Status.NOT_STARTED);
+
+ log.info("Consumer cache service stopped");
+ }
+
+ /**
+ * Get a cached consumer by topic, group, and id, if it exists (and remains
+ * valid) In addition, this method waits for all other consumer caches in
+ * the cluster to release their ownership and delete their version of this
+ * consumer.
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param clientId
+ * @return a consumer, or null
+ */
+ public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId)
+ throws KafkaConsumerCacheException {
+ if (getStatus() != Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+
+ final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
+ final Kafka011Consumer kc = fConsumers.get(consumerKey);
+
+ if (kc != null) {
+ log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
+ kc.touch();
+ fMetrics.onKafkaConsumerCacheHit();
+ } else {
+ log.debug("Consumer cache miss for [" + consumerKey + "]");
+ fMetrics.onKafkaConsumerCacheMiss();
+ }
+
+ return kc;
+ }
+
+ /**
+ * Get a cached consumer by topic, group, and id, if it exists (and remains
+ * valid) In addition, this method waits for all other consumer caches in
+ * the cluster to release their ownership and delete their version of this
+ * consumer.
+ *
+ * @param topicgroup
+ * @param clientId
+ * @return a consumer, or null
+ */
+ public ArrayList<Kafka011Consumer> getConsumerListForCG(String topicgroup, String clientId)
+ throws KafkaConsumerCacheException {
+ if (getStatus() != Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+ ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
+
+
+ Enumeration<String> strEnum = fConsumers.keys();
+ String consumerLocalKey = null;
+ while (strEnum.hasMoreElements()) {
+ consumerLocalKey = strEnum.nextElement();
+
+ if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
+
+
+
+
+ kcl.add(fConsumers.get(consumerLocalKey));
+
+ }
+ }
+
+ return kcl;
+ }
+
+ public ArrayList<Kafka011Consumer> getConsumerListForCG(String group) throws KafkaConsumerCacheException {
+ if (getStatus() != Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+ ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
+
+ Enumeration<String> strEnum = fConsumers.keys();
+ String consumerLocalKey = null;
+ while (strEnum.hasMoreElements()) {
+ consumerLocalKey = strEnum.nextElement();
+
+ if (consumerLocalKey.startsWith(group)) {
+
+
+ kcl.add(fConsumers.get(consumerLocalKey));
+
+ }
+ }
+
+ return kcl;
+ }
+
+ /**
+ * Put a consumer into the cache by topic, group and ID
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param consumerId
+ * @param consumer
+ * @throws KafkaConsumerCacheException
+ */
+ public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer)
+ throws KafkaConsumerCacheException {
+ if (getStatus() != Status.CONNECTED)
+ throw new KafkaConsumerCacheException("The cache service is unavailable.");
+
+ final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
+ fConsumers.put(consumerKey, consumer);
+
+
+
+ log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId);
+ }
+
+ public Collection<? extends Consumer> getConsumers() {
+ return new LinkedList<>(fConsumers.values());
+ }
+
+ /**
+ * This method is to drop all the consumer
+ */
+ public void dropAllConsumers() {
+ for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
+ dropConsumer(entry.getKey(), true);
+ }
+
+ // consumers should be empty here
+ if (fConsumers.size() > 0) {
+ log.warn("During dropAllConsumers, the consumer map is not empty.");
+ fConsumers.clear();
+ }
+ }
+
+ /**
+ * Drop a consumer from our cache due to a timeout
+ *
+ * @param key
+ */
+ private void dropTimedOutConsumer(String key) {
+ fMetrics.onKafkaConsumerTimeout();
+
+ if (!fConsumers.containsKey(key)) {
+ log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
+ return;
+ }
+
+ // First, drop this consumer from our cache
+ boolean isdrop = dropConsumer(key, true);
+ if (!isdrop) {
+ return;
+ }
+ final CuratorFramework curator = ConfigurationReader.getCurator();
+
+ try {
+ curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
+ log.info(" ^ deleted " + fBaseZkPath + "/" + key);
+ } catch (NoNodeException e) {
+ log.warn("A consumer was deleted from " + fApiId
+ + "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
+ } catch (Exception e) {
+ log.debug("Unexpected exception while deleting consumer: ", e);
+ log.info(" %%%%%%@# Unexpected exception while deleting consumer: ", e);
+ }
+
+ try {
+ int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
+ String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_ConsumerHandoverWaitMs);
+ if (strkSetting_ConsumerHandoverWaitMs != null)
+ consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
+ Thread.sleep(consumerHandoverWaitMs);
+ } catch (InterruptedException e) {
+ log.error("InterruptedException in dropTimedOutConsumer",e);
+ Thread.currentThread().interrupt();
+ }
+ log.info("Dropped " + key + " consumer due to timeout");
+ }
+
+ /**
+ * Drop a consumer from our cache due to another API node claiming it as
+ * their own.
+ *
+ * @param key
+ */
+ private void dropClaimedConsumer(String key) {
+ // if the consumer is still in our cache, it implies a claim.
+ if (fConsumers.containsKey(key)) {
+ fMetrics.onKafkaConsumerClaimed();
+ log.info("Consumer [" + key + "] claimed by another node.");
+ }
+ log.info("^dropping claimed Kafka consumer " + key);
+ dropConsumer(key, false);
+ }
+
+ /**
+ * Removes the consumer from the cache and closes its connection to the
+ * kafka broker(s).
+ *
+ * @param key
+ * @param dueToTimeout
+ */
+ private boolean dropConsumer(String key, boolean dueToTimeout) {
+ final Kafka011Consumer kc = fConsumers.get(key);
+ log.info("closing Kafka consumer " + key + " object " + kc);
+ if (kc != null) {
+
+ if (kc.close()) {
+ fConsumers.remove(key);
+
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // private final rrNvReadable fSettings;
+ private MetricsSet fMetrics;
+ private final String fBaseZkPath;
+ private final ScheduledExecutorService fSweepScheduler;
+ private String fApiId;
+
+ public void setfMetrics(final MetricsSet metrics) {
+ this.fMetrics = metrics;
+ }
+
+ public void setfApiId(final String id) {
+ this.fApiId = id;
+ }
+
+ private final ConnectionStateListener listener;
+
+ private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
+ private PathChildrenCache curatorConsumerCache;
+
+ private volatile Status status;
+
+ private void handleReconnection() {
+
+ log.info("Reading current cache data from ZK and synchronizing local cache");
+ final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
+ // Remove all the consumers in this API nodes cache that now belong to
+ // other API nodes.
+ for (ChildData cachedConsumer : cacheData) {
+ final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
+ final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
+ : "undefined";
+ if (!fApiId.equals(owningApiId)) {
+ fConsumers.remove(consumerId); // Commented to avoid removing
+ // the value cache hashmap but the lock still exists.
+ // This is not considered in kafka consumer Factory
+ log.info("@@@ Validating current cache data from ZK and synchronizing local cache" + owningApiId
+ + " removing " + consumerId);
+ }
+ }
+
+ setStatus(Status.CONNECTED);
+ }
+
+ private void handleConnectionSuspended() {
+ log.info("Suspending cache until ZK connection is re-established");
+
+ setStatus(Status.SUSPENDED);
+ }
+
+ private void handleConnectionLoss() {
+ log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");
+
+ setStatus(Status.DISCONNECTED);
+
+ closeAllCachedConsumers();
+ fConsumers.clear();
+ }
+
+ private void closeAllCachedConsumers() {
+ for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
+ try {
+ entry.getValue().close();
+ } catch (Exception e) {
+ log.info("@@@@@@ Error occurd while closing Clearing All cache " + e);
+ }
+ }
+ }
+
+ private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
+ return topic + "::" + consumerGroupId + "::" + clientId;
+ }
+
+ /**
+ * This method is to get a lock
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param consumerId
+ * @throws KafkaConsumerCacheException
+ */
+ public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
+ throws KafkaConsumerCacheException {
+ // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
+ final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
+
+ try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
+ final String consumerPath = fBaseZkPath + "/" + consumerKey;
+ log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
+ final CuratorFramework curator = ConfigurationReader.getCurator();
+
+ try {
+ curator.setData().forPath(consumerPath, fApiId.getBytes());
+ } catch (NoNodeException e) {
+ log.info("KeeperException.NoNodeException occured", e);
+ curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
+ }
+ log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
+ timer.end();
+ } catch (Exception e) {
+ log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
+ throw new KafkaConsumerCacheException(e);
+ }
+
+ log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");
+
+ try {
+ int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
+ String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_ConsumerHandoverWaitMs);
+ if (strkSetting_ConsumerHandoverWaitMs != null)
+ consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
+ Thread.sleep(consumerHandoverWaitMs);
+ } catch (InterruptedException e) {
+ log.error("InterruptedException in signalOwnership",e);
+ //Thread.currentThread().interrupt();
+ }
+ }
+
+ public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() {
+ return null;
+ }
+
+ public void sweep() {
+ final LinkedList<String> removals = new LinkedList<String>();
+ long mustTouchEveryMs = kDefault_MustTouchEveryMs;
+ String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ kSetting_TouchEveryMs);
+ if (null != strkSetting_TouchEveryMs) {
+ mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
+ }
+
+
+ final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
+
+ for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
+ final long lastTouchMs = e.getValue().getLastTouch();
+ log.debug("consumer #####1" + e.getKey() + " " + lastTouchMs + " < " + oldestAllowedTouchMs);
+
+ if (lastTouchMs < oldestAllowedTouchMs) {
+ log.info("consumer " + e.getKey() + " has expired");
+ removals.add(e.getKey());
+ }
+ }
+
+ for (String key : removals) {
+ dropTimedOutConsumer(key);
+ }
+ }
+
+ /**
+ * Creating a thread to run the sweep method
+ *
+ * @author nilanjana.maity
+ *
+ */
+ private class sweeper implements Runnable {
+ /**
+ * run method
+ */
+ public void run() {
+ sweep();
+ }
+ }
+
+ /**
+ * This method is to drop consumer
+ *
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ */
+ public void dropConsumer(String topic, String consumerGroup, String clientId) {
+ dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
+ }
+
+ private Status getStatus() {
+ return this.status;
+ }
+
+ private void setStatus(Status status) {
+ this.status = status;
+ }
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
+
+} \ No newline at end of file