summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorParshad Patel <pars.patel@samsung.com>2018-08-31 16:32:14 +0900
committerParshad Patel <pars.patel@samsung.com>2018-08-31 18:10:19 +0900
commitef4d3e40c2ce25e19531223dd36e61168a57919a (patch)
treeefc7bf7b121dd9758a414928f121b6e82f9c1687 /src
parent3e093cf65a30e3d4bacc959af52df5c9d16a5932 (diff)
Fix sonar issues in KafkaConsumerCache.java
Fix use try-with-resources, InterruptedException sonar issues Issue-ID: DMAAP-687 Change-Id: Ia667169912df87cc77dc6bd78a6de454e6516881 Signed-off-by: Parshad Patel <pars.patel@samsung.com>
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java15
1 files changed, 8 insertions, 7 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
index 2bf2fb2..83c08ec 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -101,7 +101,7 @@ public class KafkaConsumerCache {
// the server at least every 30 seconds, timing out after 2 minutes should
// be okay.
// FIXME: consider allowing the client to specify its expected call rate?
- private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;
+ private static final long kDefault_MustTouchEveryMs = 1000L*60*2;
// check for expirations pretty regularly
private static final long kDefault_SweepEverySeconds = 15;
@@ -298,8 +298,8 @@ public class KafkaConsumerCache {
try {
curator.blockUntilConnected();
} catch (InterruptedException e) {
- // Ignore
- log.error("error while setting curator framework :" + e.getMessage());
+ log.error("error while setting curator framework :",e);
+ Thread.currentThread().interrupt();
}
}
@@ -511,7 +511,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in dropTimedOutConsumer",e);
+ Thread.currentThread().interrupt();
}
log.info("Dropped " + key + " consumer due to timeout");
}
@@ -638,9 +639,8 @@ public class KafkaConsumerCache {
throws KafkaConsumerCacheException {
// get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
- final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership");
- try {
+ try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
final String consumerPath = fBaseZkPath + "/" + consumerKey;
log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
final CuratorFramework curator = ConfigurationReader.getCurator();
@@ -667,7 +667,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in signalOwnership",e);
+ Thread.currentThread().interrupt();
}
}