diff options
Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java')
-rw-r--r-- | src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java | 59 |
1 files changed, 26 insertions, 33 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java index 126711a..83c08ec 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java @@ -61,8 +61,8 @@ import com.att.dmf.mr.backends.MetricsSet; import com.att.dmf.mr.constants.CambriaConstants; import com.att.dmf.mr.exception.DMaaPErrorMessages; import com.att.dmf.mr.utils.ConfigurationReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.metrics.CdmTimer; @@ -101,7 +101,7 @@ public class KafkaConsumerCache { // the server at least every 30 seconds, timing out after 2 minutes should // be okay. // FIXME: consider allowing the client to specify its expected call rate? - private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2; + private static final long kDefault_MustTouchEveryMs = 1000L*60*2; // check for expirations pretty regularly private static final long kDefault_SweepEverySeconds = 15; @@ -110,16 +110,13 @@ public class KafkaConsumerCache { NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED } - // @Qualifier("kafkalockavoid") - - // @Resource - // @Qualifier("kafkalockavoid") - // KafkaLiveLockAvoider2 kafkaLiveLockAvoider; + + @Autowired private DMaaPErrorMessages errorMessages; - // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider(); + /** * User defined exception class for kafka consumer cache * @@ -267,8 +264,8 @@ public class KafkaConsumerCache { EnsurePath ensurePath = new EnsurePath(fBaseZkPath); ensurePath.ensure(curator.getZookeeperClient()); - // final long freq = fSettings.getLong(kSetting_SweepEverySeconds, - // kDefault_SweepEverySeconds); + + long freq = kDefault_SweepEverySeconds; String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_SweepEverySeconds); @@ -301,8 +298,8 @@ public class KafkaConsumerCache { try { curator.blockUntilConnected(); } catch (InterruptedException e) { - // Ignore - log.error("error while setting curator framework :" + e.getMessage()); + log.error("error while setting curator framework :",e); + Thread.currentThread().interrupt(); } } @@ -393,8 +390,8 @@ public class KafkaConsumerCache { if (getStatus() != KafkaConsumerCache.Status.CONNECTED) throw new KafkaConsumerCacheException("The cache service is unavailable."); ArrayList<Kafka011Consumer> kcl = new ArrayList<>(); - // final String consumerKey = makeConsumerKey(topic, consumerGroupId, - // clientId); + + Enumeration<String> strEnum = fConsumers.keys(); String consumerLocalKey = null; while (strEnum.hasMoreElements()) { @@ -402,9 +399,9 @@ public class KafkaConsumerCache { if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) { - // System.out.println("consumer key returning from - // getConsumerListforCG +++++++++ " + consumerLocalKey - // + " " + fConsumers.get(consumerLocalKey)); + + + kcl.add(fConsumers.get(consumerLocalKey)); } @@ -417,8 +414,7 @@ public class KafkaConsumerCache { if (getStatus() != KafkaConsumerCache.Status.CONNECTED) throw new KafkaConsumerCacheException("The cache service is unavailable."); ArrayList<Kafka011Consumer> kcl = new ArrayList<>(); - // final String consumerKey = makeConsumerKey(topic, consumerGroupId, - // clientId); + Enumeration<String> strEnum = fConsumers.keys(); String consumerLocalKey = null; while (strEnum.hasMoreElements()) { @@ -426,9 +422,7 @@ public class KafkaConsumerCache { if (consumerLocalKey.startsWith(group)) { - // System.out.println("consumer key returning from - // getConsumerListforCG +++++++++ " + consumerLocalKey - // + " " + fConsumers.get(consumerLocalKey)); + kcl.add(fConsumers.get(consumerLocalKey)); } @@ -454,7 +448,7 @@ public class KafkaConsumerCache { final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); fConsumers.put(consumerKey, consumer); - // String appId = "node-instance-"+i; + log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId); } @@ -517,7 +511,8 @@ public class KafkaConsumerCache { consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); Thread.sleep(consumerHandoverWaitMs); } catch (InterruptedException e) { - // Ignore + log.error("InterruptedException in dropTimedOutConsumer",e); + Thread.currentThread().interrupt(); } log.info("Dropped " + key + " consumer due to timeout"); } @@ -549,7 +544,7 @@ public class KafkaConsumerCache { final Kafka011Consumer kc = fConsumers.get(key); log.info("closing Kafka consumer " + key + " object " + kc); if (kc != null) { - // log.info("closing Kafka consumer " + key); + if (kc.close()) { fConsumers.remove(key); @@ -644,9 +639,8 @@ public class KafkaConsumerCache { throws KafkaConsumerCacheException { // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId> final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); - final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership"); - try { + try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) { final String consumerPath = fBaseZkPath + "/" + consumerKey; log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey); final CuratorFramework curator = ConfigurationReader.getCurator(); @@ -673,7 +667,8 @@ public class KafkaConsumerCache { consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); Thread.sleep(consumerHandoverWaitMs); } catch (InterruptedException e) { - // Ignore + log.error("InterruptedException in signalOwnership",e); + Thread.currentThread().interrupt(); } } @@ -690,8 +685,7 @@ public class KafkaConsumerCache { mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs); } - // final long mustTouchEveryMs = - // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs); + final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs; for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) { @@ -744,6 +738,5 @@ public class KafkaConsumerCache { } private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class); - // private static final Logger log = - // LoggerFactory.getLogger(KafkaConsumerCache.class); + }
\ No newline at end of file |