summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java15
1 files changed, 8 insertions, 7 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
index 2bf2fb2..83c08ec 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -101,7 +101,7 @@ public class KafkaConsumerCache {
// the server at least every 30 seconds, timing out after 2 minutes should
// be okay.
// FIXME: consider allowing the client to specify its expected call rate?
- private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;
+ private static final long kDefault_MustTouchEveryMs = 1000L*60*2;
// check for expirations pretty regularly
private static final long kDefault_SweepEverySeconds = 15;
@@ -298,8 +298,8 @@ public class KafkaConsumerCache {
try {
curator.blockUntilConnected();
} catch (InterruptedException e) {
- // Ignore
- log.error("error while setting curator framework :" + e.getMessage());
+ log.error("error while setting curator framework :",e);
+ Thread.currentThread().interrupt();
}
}
@@ -511,7 +511,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in dropTimedOutConsumer",e);
+ Thread.currentThread().interrupt();
}
log.info("Dropped " + key + " consumer due to timeout");
}
@@ -638,9 +639,8 @@ public class KafkaConsumerCache {
throws KafkaConsumerCacheException {
// get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
- final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership");
- try {
+ try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
final String consumerPath = fBaseZkPath + "/" + consumerKey;
log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
final CuratorFramework curator = ConfigurationReader.getCurator();
@@ -667,7 +667,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in signalOwnership",e);
+ Thread.currentThread().interrupt();
}
}