summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java4
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java4
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java28
3 files changed, 14 insertions, 22 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
index dc4bcd5..9543828 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -178,10 +178,10 @@ public class KafkaPublisher implements Publisher {
throws IOException {
log.info("sending " + msgs.size() + " events to [" + topic + "]");
try{
- final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size());
+ final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size());
for (message o : msgs) {
- final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString());
+ final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
try {
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
index 5f28367..f0bb982 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
@@ -65,7 +65,7 @@ public class DMaaPCambriaLimiter {
public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
throws missingReqdSetting, invalidSettingValue {
fRateInfo = new HashMap<String, RateInfo>();
- fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+ fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
@@ -112,7 +112,7 @@ public class DMaaPCambriaLimiter {
*/
public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
fRateInfo = new HashMap<String, RateInfo>();
- fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+ fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
fWindowLengthMins = windowLengthMins;
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 6fc0838..e4e09c8 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
@@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
*/
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
- // private static final Logger log = LoggerFactory
- // .getLogger(DMaaPKafkaConsumerFactory.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
- // @Autowired
- // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new
- // KafkaLiveLockAvoider();
+
/**
* constructor initialization
@@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final boolean isCacheEnabled = kSetting_EnableCache;
- // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
- // metrics) : null;
+
fCache = null;
if (isCacheEnabled) {
fCache = KafkaConsumerCache.getInstance();
@@ -195,8 +191,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
- kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj()
- // );
+ kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
if (fCache != null) {
@@ -265,10 +260,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
private void transferSettingIfProvided(Properties target, String key, String prefix) {
String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
- // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
+
if (null != keyVal) {
- // final String val = fSettings
- // .getString(makeLongKey(key, prefix), "");
+
log.info("Setting [" + key + "] to " + keyVal + ".");
target.put(key, keyVal);
}
@@ -294,10 +288,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
props.put("group.id", fakeGroupName);
props.put("enable.auto.commit", "false"); // 0.11
props.put("bootstrap.servers", fkafkaBrokers);
- /*props.put("sasl.jaas.config",
- "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
+
props.put("client.id", consumerId);
// additional settings: start with our defaults, then pull in configured