diff options
Diffstat (limited to 'src/main/java')
3 files changed, 14 insertions, 22 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java index dc4bcd5..9543828 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java @@ -178,10 +178,10 @@ public class KafkaPublisher implements Publisher { throws IOException { log.info("sending " + msgs.size() + " events to [" + topic + "]"); try{ - final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size()); + final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size()); for (message o : msgs) { - final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString()); + final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString()); try { diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java index 5f28367..f0bb982 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java @@ -65,7 +65,7 @@ public class DMaaPCambriaLimiter { public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) throws missingReqdSetting, invalidSettingValue { fRateInfo = new HashMap<String, RateInfo>(); - fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, CambriaConstants.kDefault_MaxEmptyPollsPerMinute); fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute, @@ -112,7 +112,7 @@ public class DMaaPCambriaLimiter { */ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) { fRateInfo = new HashMap<String, RateInfo>(); - fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute); fWindowLengthMins = windowLengthMins; diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java index 6fc0838..e4e09c8 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; import com.att.dmf.mr.backends.kafka.LiveLockAvoidance; import com.att.dmf.mr.constants.CambriaConstants; import com.att.dmf.mr.utils.ConfigurationReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; @@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; */ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { - // private static final Logger log = LoggerFactory - // .getLogger(DMaaPKafkaConsumerFactory.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); - // @Autowired - // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new - // KafkaLiveLockAvoider(); + /** * constructor initialization @@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { final boolean isCacheEnabled = kSetting_EnableCache; - // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, - // metrics) : null; + fCache = null; if (isCacheEnabled) { fCache = KafkaConsumerCache.getInstance(); @@ -195,8 +191,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId); long fCreateTimeMs = System.currentTimeMillis(); KafkaConsumer<String, String> cc = new KafkaConsumer<>(props); - kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj() - // ); + kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider); log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); if (fCache != null) { @@ -265,10 +260,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { private void transferSettingIfProvided(Properties target, String key, String prefix) { String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix)); - // if (fSettings.hasValueFor(makeLongKey(key, prefix))) { + if (null != keyVal) { - // final String val = fSettings - // .getString(makeLongKey(key, prefix), ""); + log.info("Setting [" + key + "] to " + keyVal + "."); target.put(key, keyVal); } @@ -294,10 +288,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { props.put("group.id", fakeGroupName); props.put("enable.auto.commit", "false"); // 0.11 props.put("bootstrap.servers", fkafkaBrokers); - /*props.put("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put("security.protocol", "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + + props.put("client.id", consumerId); // additional settings: start with our defaults, then pull in configured |