summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java28
1 files changed, 10 insertions, 18 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 6fc0838..e4e09c8 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
@@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
*/
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
- // private static final Logger log = LoggerFactory
- // .getLogger(DMaaPKafkaConsumerFactory.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
- // @Autowired
- // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new
- // KafkaLiveLockAvoider();
+
/**
* constructor initialization
@@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final boolean isCacheEnabled = kSetting_EnableCache;
- // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
- // metrics) : null;
+
fCache = null;
if (isCacheEnabled) {
fCache = KafkaConsumerCache.getInstance();
@@ -195,8 +191,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
- kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj()
- // );
+ kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
if (fCache != null) {
@@ -265,10 +260,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
private void transferSettingIfProvided(Properties target, String key, String prefix) {
String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
- // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
+
if (null != keyVal) {
- // final String val = fSettings
- // .getString(makeLongKey(key, prefix), "");
+
log.info("Setting [" + key + "] to " + keyVal + ".");
target.put(key, keyVal);
}
@@ -294,10 +288,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
props.put("group.id", fakeGroupName);
props.put("enable.auto.commit", "false"); // 0.11
props.put("bootstrap.servers", fkafkaBrokers);
- /*props.put("sasl.jaas.config",
- "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
+
props.put("client.id", consumerId);
// additional settings: start with our defaults, then pull in configured