diff options
-rw-r--r-- | src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java | 15 |
1 files changed, 8 insertions, 7 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java index 2bf2fb2..83c08ec 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java @@ -101,7 +101,7 @@ public class KafkaConsumerCache { // the server at least every 30 seconds, timing out after 2 minutes should // be okay. // FIXME: consider allowing the client to specify its expected call rate? - private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2; + private static final long kDefault_MustTouchEveryMs = 1000L*60*2; // check for expirations pretty regularly private static final long kDefault_SweepEverySeconds = 15; @@ -298,8 +298,8 @@ public class KafkaConsumerCache { try { curator.blockUntilConnected(); } catch (InterruptedException e) { - // Ignore - log.error("error while setting curator framework :" + e.getMessage()); + log.error("error while setting curator framework :",e); + Thread.currentThread().interrupt(); } } @@ -511,7 +511,8 @@ public class KafkaConsumerCache { consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); Thread.sleep(consumerHandoverWaitMs); } catch (InterruptedException e) { - // Ignore + log.error("InterruptedException in dropTimedOutConsumer",e); + Thread.currentThread().interrupt(); } log.info("Dropped " + key + " consumer due to timeout"); } @@ -638,9 +639,8 @@ public class KafkaConsumerCache { throws KafkaConsumerCacheException { // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId> final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); - final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership"); - try { + try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) { final String consumerPath = fBaseZkPath + "/" + consumerKey; log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey); final CuratorFramework curator = ConfigurationReader.getCurator(); @@ -667,7 +667,8 @@ public class KafkaConsumerCache { consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); Thread.sleep(consumerHandoverWaitMs); } catch (InterruptedException e) { - // Ignore + log.error("InterruptedException in signalOwnership",e); + Thread.currentThread().interrupt(); } } |