summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java')
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java59
1 files changed, 26 insertions, 33 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
index 126711a..83c08ec 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -61,8 +61,8 @@ import com.att.dmf.mr.backends.MetricsSet;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.exception.DMaaPErrorMessages;
import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.metrics.CdmTimer;
@@ -101,7 +101,7 @@ public class KafkaConsumerCache {
// the server at least every 30 seconds, timing out after 2 minutes should
// be okay.
// FIXME: consider allowing the client to specify its expected call rate?
- private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;
+ private static final long kDefault_MustTouchEveryMs = 1000L*60*2;
// check for expirations pretty regularly
private static final long kDefault_SweepEverySeconds = 15;
@@ -110,16 +110,13 @@ public class KafkaConsumerCache {
NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
}
- // @Qualifier("kafkalockavoid")
-
- // @Resource
- // @Qualifier("kafkalockavoid")
- // KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+
+
@Autowired
private DMaaPErrorMessages errorMessages;
- // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider();
+
/**
* User defined exception class for kafka consumer cache
*
@@ -267,8 +264,8 @@ public class KafkaConsumerCache {
EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
ensurePath.ensure(curator.getZookeeperClient());
- // final long freq = fSettings.getLong(kSetting_SweepEverySeconds,
- // kDefault_SweepEverySeconds);
+
+
long freq = kDefault_SweepEverySeconds;
String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
kSetting_SweepEverySeconds);
@@ -301,8 +298,8 @@ public class KafkaConsumerCache {
try {
curator.blockUntilConnected();
} catch (InterruptedException e) {
- // Ignore
- log.error("error while setting curator framework :" + e.getMessage());
+ log.error("error while setting curator framework :",e);
+ Thread.currentThread().interrupt();
}
}
@@ -393,8 +390,8 @@ public class KafkaConsumerCache {
if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
throw new KafkaConsumerCacheException("The cache service is unavailable.");
ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
- // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
- // clientId);
+
+
Enumeration<String> strEnum = fConsumers.keys();
String consumerLocalKey = null;
while (strEnum.hasMoreElements()) {
@@ -402,9 +399,9 @@ public class KafkaConsumerCache {
if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
- // System.out.println("consumer key returning from
- // getConsumerListforCG +++++++++ " + consumerLocalKey
- // + " " + fConsumers.get(consumerLocalKey));
+
+
+
kcl.add(fConsumers.get(consumerLocalKey));
}
@@ -417,8 +414,7 @@ public class KafkaConsumerCache {
if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
throw new KafkaConsumerCacheException("The cache service is unavailable.");
ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
- // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
- // clientId);
+
Enumeration<String> strEnum = fConsumers.keys();
String consumerLocalKey = null;
while (strEnum.hasMoreElements()) {
@@ -426,9 +422,7 @@ public class KafkaConsumerCache {
if (consumerLocalKey.startsWith(group)) {
- // System.out.println("consumer key returning from
- // getConsumerListforCG +++++++++ " + consumerLocalKey
- // + " " + fConsumers.get(consumerLocalKey));
+
kcl.add(fConsumers.get(consumerLocalKey));
}
@@ -454,7 +448,7 @@ public class KafkaConsumerCache {
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
fConsumers.put(consumerKey, consumer);
- // String appId = "node-instance-"+i;
+
log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId);
}
@@ -517,7 +511,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in dropTimedOutConsumer",e);
+ Thread.currentThread().interrupt();
}
log.info("Dropped " + key + " consumer due to timeout");
}
@@ -549,7 +544,7 @@ public class KafkaConsumerCache {
final Kafka011Consumer kc = fConsumers.get(key);
log.info("closing Kafka consumer " + key + " object " + kc);
if (kc != null) {
- // log.info("closing Kafka consumer " + key);
+
if (kc.close()) {
fConsumers.remove(key);
@@ -644,9 +639,8 @@ public class KafkaConsumerCache {
throws KafkaConsumerCacheException {
// get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
- final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership");
- try {
+ try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
final String consumerPath = fBaseZkPath + "/" + consumerKey;
log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
final CuratorFramework curator = ConfigurationReader.getCurator();
@@ -673,7 +667,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in signalOwnership",e);
+ Thread.currentThread().interrupt();
}
}
@@ -690,8 +685,7 @@ public class KafkaConsumerCache {
mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
}
- // final long mustTouchEveryMs =
- // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs);
+
final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
@@ -744,6 +738,5 @@ public class KafkaConsumerCache {
}
private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
- // private static final Logger log =
- // LoggerFactory.getLogger(KafkaConsumerCache.class);
+
} \ No newline at end of file