summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java44
1 files changed, 18 insertions, 26 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
index 126711a..2bf2fb2 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -61,8 +61,8 @@ import com.att.dmf.mr.backends.MetricsSet;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.exception.DMaaPErrorMessages;
import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.metrics.CdmTimer;
@@ -110,16 +110,13 @@ public class KafkaConsumerCache {
NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
}
- // @Qualifier("kafkalockavoid")
-
- // @Resource
- // @Qualifier("kafkalockavoid")
- // KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+
+
@Autowired
private DMaaPErrorMessages errorMessages;
- // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider();
+
/**
* User defined exception class for kafka consumer cache
*
@@ -267,8 +264,8 @@ public class KafkaConsumerCache {
EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
ensurePath.ensure(curator.getZookeeperClient());
- // final long freq = fSettings.getLong(kSetting_SweepEverySeconds,
- // kDefault_SweepEverySeconds);
+
+
long freq = kDefault_SweepEverySeconds;
String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
kSetting_SweepEverySeconds);
@@ -393,8 +390,8 @@ public class KafkaConsumerCache {
if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
throw new KafkaConsumerCacheException("The cache service is unavailable.");
ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
- // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
- // clientId);
+
+
Enumeration<String> strEnum = fConsumers.keys();
String consumerLocalKey = null;
while (strEnum.hasMoreElements()) {
@@ -402,9 +399,9 @@ public class KafkaConsumerCache {
if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
- // System.out.println("consumer key returning from
- // getConsumerListforCG +++++++++ " + consumerLocalKey
- // + " " + fConsumers.get(consumerLocalKey));
+
+
+
kcl.add(fConsumers.get(consumerLocalKey));
}
@@ -417,8 +414,7 @@ public class KafkaConsumerCache {
if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
throw new KafkaConsumerCacheException("The cache service is unavailable.");
ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
- // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
- // clientId);
+
Enumeration<String> strEnum = fConsumers.keys();
String consumerLocalKey = null;
while (strEnum.hasMoreElements()) {
@@ -426,9 +422,7 @@ public class KafkaConsumerCache {
if (consumerLocalKey.startsWith(group)) {
- // System.out.println("consumer key returning from
- // getConsumerListforCG +++++++++ " + consumerLocalKey
- // + " " + fConsumers.get(consumerLocalKey));
+
kcl.add(fConsumers.get(consumerLocalKey));
}
@@ -454,7 +448,7 @@ public class KafkaConsumerCache {
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
fConsumers.put(consumerKey, consumer);
- // String appId = "node-instance-"+i;
+
log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId);
}
@@ -549,7 +543,7 @@ public class KafkaConsumerCache {
final Kafka011Consumer kc = fConsumers.get(key);
log.info("closing Kafka consumer " + key + " object " + kc);
if (kc != null) {
- // log.info("closing Kafka consumer " + key);
+
if (kc.close()) {
fConsumers.remove(key);
@@ -690,8 +684,7 @@ public class KafkaConsumerCache {
mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
}
- // final long mustTouchEveryMs =
- // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs);
+
final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
@@ -744,6 +737,5 @@ public class KafkaConsumerCache {
}
private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
- // private static final Logger log =
- // LoggerFactory.getLogger(KafkaConsumerCache.class);
+
} \ No newline at end of file