diff options
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java | 44 |
1 files changed, 18 insertions, 26 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java index 126711a..2bf2fb2 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java @@ -61,8 +61,8 @@ import com.att.dmf.mr.backends.MetricsSet; import com.att.dmf.mr.constants.CambriaConstants; import com.att.dmf.mr.exception.DMaaPErrorMessages; import com.att.dmf.mr.utils.ConfigurationReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.metrics.CdmTimer; @@ -110,16 +110,13 @@ public class KafkaConsumerCache { NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED } - // @Qualifier("kafkalockavoid") - - // @Resource - // @Qualifier("kafkalockavoid") - // KafkaLiveLockAvoider2 kafkaLiveLockAvoider; + + @Autowired private DMaaPErrorMessages errorMessages; - // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider(); + /** * User defined exception class for kafka consumer cache * @@ -267,8 +264,8 @@ public class KafkaConsumerCache { EnsurePath ensurePath = new EnsurePath(fBaseZkPath); ensurePath.ensure(curator.getZookeeperClient()); - // final long freq = fSettings.getLong(kSetting_SweepEverySeconds, - // kDefault_SweepEverySeconds); + + long freq = kDefault_SweepEverySeconds; String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_SweepEverySeconds); @@ -393,8 +390,8 @@ public class KafkaConsumerCache { if (getStatus() != KafkaConsumerCache.Status.CONNECTED) throw new KafkaConsumerCacheException("The cache service is unavailable."); ArrayList<Kafka011Consumer> kcl = new ArrayList<>(); - // final String consumerKey = makeConsumerKey(topic, consumerGroupId, - // clientId); + + Enumeration<String> strEnum = fConsumers.keys(); String consumerLocalKey = null; while (strEnum.hasMoreElements()) { @@ -402,9 +399,9 @@ public class KafkaConsumerCache { if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) { - // System.out.println("consumer key returning from - // getConsumerListforCG +++++++++ " + consumerLocalKey - // + " " + fConsumers.get(consumerLocalKey)); + + + kcl.add(fConsumers.get(consumerLocalKey)); } @@ -417,8 +414,7 @@ public class KafkaConsumerCache { if (getStatus() != KafkaConsumerCache.Status.CONNECTED) throw new KafkaConsumerCacheException("The cache service is unavailable."); ArrayList<Kafka011Consumer> kcl = new ArrayList<>(); - // final String consumerKey = makeConsumerKey(topic, consumerGroupId, - // clientId); + Enumeration<String> strEnum = fConsumers.keys(); String consumerLocalKey = null; while (strEnum.hasMoreElements()) { @@ -426,9 +422,7 @@ public class KafkaConsumerCache { if (consumerLocalKey.startsWith(group)) { - // System.out.println("consumer key returning from - // getConsumerListforCG +++++++++ " + consumerLocalKey - // + " " + fConsumers.get(consumerLocalKey)); + kcl.add(fConsumers.get(consumerLocalKey)); } @@ -454,7 +448,7 @@ public class KafkaConsumerCache { final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); fConsumers.put(consumerKey, consumer); - // String appId = "node-instance-"+i; + log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId); } @@ -549,7 +543,7 @@ public class KafkaConsumerCache { final Kafka011Consumer kc = fConsumers.get(key); log.info("closing Kafka consumer " + key + " object " + kc); if (kc != null) { - // log.info("closing Kafka consumer " + key); + if (kc.close()) { fConsumers.remove(key); @@ -690,8 +684,7 @@ public class KafkaConsumerCache { mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs); } - // final long mustTouchEveryMs = - // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs); + final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs; for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) { @@ -744,6 +737,5 @@ public class KafkaConsumerCache { } private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class); - // private static final Logger log = - // LoggerFactory.getLogger(KafkaConsumerCache.class); + }
\ No newline at end of file |