Diffstat (limited to 'src/main/java/com')
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java             | 13
-rw-r--r--  src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java          |  6
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java                 |  4
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java           | 28
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java                       | 28
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java                     |  4
-rw-r--r--  src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java               |  2
-rw-r--r--  src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java           |  4
-rw-r--r--  src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java |  2
9 files changed, 39 insertions, 52 deletions
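
The changes below follow two recurring patterns: blocks of commented-out dead code (old producer/consumer wiring, disabled SASL settings, retired logger declarations) are deleted outright, and constructor calls that repeat their generic type arguments are switched to the Java 7 diamond operator. A minimal illustration of the diamond change, using hypothetical field names rather than code from the patch:

import java.util.HashMap;
import java.util.Map;

public class DiamondExample {
    // Before: the type arguments are spelled out twice.
    private final Map<String, Integer> countsOld = new HashMap<String, Integer>();

    // After: the compiler infers the arguments from the declared type.
    private final Map<String, Integer> counts = new HashMap<>();
}

Both declarations compile to the same thing; the diamond form only removes the redundancy.
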
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
index dc4bcd5..735e372 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -42,10 +42,6 @@ import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
-//import kafka.javaapi.producer.Producer;
-//import kafka.producer.KeyedMessage;
-//import kafka.producer.ProducerConfig;
-//import kafka.producer.KeyedMessage;
/**
* Sends raw JSON objects into Kafka.
@@ -77,8 +73,7 @@ public class KafkaPublisher implements Publisher {
kafkaConnUrl="localhost:9092";
}
- // props.put("bootstrap.servers", bootSever);
- //System.setProperty("java.security.auth.login.config",jaaspath);
+
transferSetting( props, "bootstrap.servers",kafkaConnUrl);
@@ -93,7 +88,7 @@ public class KafkaPublisher implements Publisher {
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- //fProducer = new Producer<String, String>(fConfig);
+
fProducer = new KafkaProducer<>(props);
}
@@ -178,10 +173,10 @@ public class KafkaPublisher implements Publisher {
throws IOException {
log.info("sending " + msgs.size() + " events to [" + topic + "]");
try{
- final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size());
+ final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size());
for (message o : msgs) {
- final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString());
+ final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
try {
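
For context on the KafkaPublisher hunks above: the class configures a plain org.apache.kafka.clients.producer.KafkaProducer and wraps each outgoing message in a ProducerRecord keyed by the message key. A self-contained sketch of that pattern follows; the class name, the sendBatch signature, and the localhost broker address are assumptions for illustration, not code from the patch.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleBatchPublisher {

    private final KafkaProducer<String, String> producer;

    public SimpleBatchPublisher(String bootstrapServers) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);  // e.g. "localhost:9092"
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    // Send every payload to the topic as its own record, all under the same key.
    public void sendBatch(String topic, String key, List<String> payloads) {
        for (String payload : payloads) {
            producer.send(new ProducerRecord<>(topic, key, payload));
        }
        producer.flush();
    }
}
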
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
index 22f0588..8c841d4 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
@@ -46,9 +46,9 @@ public class MemoryMetaBroker implements Broker {
* @param settings
*/
public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) {
- //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) {
+
fQueue = mq;
- fTopics = new HashMap<String, MemTopic>();
+ fTopics = new HashMap<>();
}
@Override
@@ -190,7 +190,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public Set<String> getOwners() {
- final TreeSet<String> set = new TreeSet<String> ();
+ final TreeSet<String> set = new TreeSet<> ();
set.add ( fOwner );
return set;
}
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
index 5f28367..f0bb982 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
@@ -65,7 +65,7 @@ public class DMaaPCambriaLimiter {
public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
throws missingReqdSetting, invalidSettingValue {
fRateInfo = new HashMap<String, RateInfo>();
- fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+ fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
@@ -112,7 +112,7 @@ public class DMaaPCambriaLimiter {
*/
public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
fRateInfo = new HashMap<String, RateInfo>();
- fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+ fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
fWindowLengthMins = windowLengthMins;
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 6fc0838..e4e09c8 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
@@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
*/
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
- // private static final Logger log = LoggerFactory
- // .getLogger(DMaaPKafkaConsumerFactory.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
- // @Autowired
- // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new
- // KafkaLiveLockAvoider();
+
/**
* constructor initialization
@@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final boolean isCacheEnabled = kSetting_EnableCache;
- // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
- // metrics) : null;
+
fCache = null;
if (isCacheEnabled) {
fCache = KafkaConsumerCache.getInstance();
@@ -195,8 +191,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
- kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj()
- // );
+ kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
if (fCache != null) {
@@ -265,10 +260,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
private void transferSettingIfProvided(Properties target, String key, String prefix) {
String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
- // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
+
if (null != keyVal) {
- // final String val = fSettings
- // .getString(makeLongKey(key, prefix), "");
+
log.info("Setting [" + key + "] to " + keyVal + ".");
target.put(key, keyVal);
}
@@ -294,10 +288,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
props.put("group.id", fakeGroupName);
props.put("enable.auto.commit", "false"); // 0.11
props.put("bootstrap.servers", fkafkaBrokers);
- /*props.put("sasl.jaas.config",
- "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
+
props.put("client.id", consumerId);
// additional settings: start with our defaults, then pull in configured
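
For context on the DMaaPKafkaConsumerFactory hunks above: createConsumerConfig assembles a Properties object (group.id, client.id, bootstrap.servers, enable.auto.commit=false, plus any configured overrides pulled in by transferSettingIfProvided), and the factory then builds a KafkaConsumer from it. A minimal sketch of that configuration step; the method name and the String deserializer entries here are assumptions added so the example stands on its own.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerConfigSketch {

    // Build a consumer with the minimal settings the factory relies on.
    static KafkaConsumer<String, String> createConsumer(String brokers, String groupName, String consumerId) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupName);
        props.put("client.id", consumerId);
        props.put("enable.auto.commit", "false"); // offsets are committed explicitly, not on a timer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }
}
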
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
index e29403f..963ff2d 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
@@ -23,7 +23,7 @@ package com.att.dmf.mr.beans;
import java.security.Key;
-//import org.apache.log4j.Logger;
+
import org.springframework.beans.factory.annotation.Autowired;
import com.att.dmf.mr.constants.CambriaConstants;
@@ -48,11 +48,11 @@ import com.att.nsa.util.rrConvertor;
*/
public class DMaaPNsaApiDb {
- //private rrNvReadable settings;
+
private DMaaPZkConfigDb cdb;
//private static final Logger log = Logger
- // .getLogger(DMaaPNsaApiDb.class.toString());
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class);
/**
@@ -63,7 +63,7 @@ public class DMaaPNsaApiDb {
*/
@Autowired
public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) {
- //this.setSettings(settings);
+
this.setCdb(cdb);
}
/**
@@ -79,16 +79,16 @@ public class DMaaPNsaApiDb {
missingReqdSetting {
// Cambria uses an encrypted api key db
- //final String keyBase64 = settings.getString("cambria.secureConfig.key", null);
+
final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key");
- // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null);
+
final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv");
// if neither value was provided, don't encrypt api key db
if (keyBase64 == null && initVectorBase64 == null) {
log.info("This server is configured to use an unencrypted API key database. See the settings documentation.");
- return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb,
+ return new BaseNsaApiDbImpl<>(cdb,
new NsaSimpleApiKeyFactory());
} else if (keyBase64 == null) {
// neither or both, otherwise something's goofed
@@ -100,7 +100,7 @@ public class DMaaPNsaApiDb {
log.info("This server is configured to use an encrypted API key database.");
final Key key = EncryptingLayer.readSecretKey(keyBase64);
final byte[] iv = rrConvertor.base64Decode(initVectorBase64);
- return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb,
+ return new EncryptingApiDbImpl<>(cdb,
new NsaSimpleApiKeyFactory(), key, iv);
}
}
@@ -109,17 +109,17 @@ public class DMaaPNsaApiDb {
* @return
* returns settings
*/
-/* public rrNvReadable getSettings() {
- return settings;
- }*/
+
+
+
/**
* @param settings
* set settings
*/
- /*public void setSettings(rrNvReadable settings) {
- this.settings = settings;
- }*/
+
+
+
/**
* @return
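
For context on the DMaaPNsaApiDb hunks above: the API key database is built unencrypted only when both cambria.secureConfig.key and cambria.secureConfig.iv are absent; when both are present the key and IV are decoded and the encrypting implementation is used, and supplying only one of the two is treated as a configuration error. A small sketch of just that validation rule, as a hypothetical helper that is not part of the patch:

public class SecureConfigCheck {

    // Returns true when the API key database should be encrypted.
    // The key and IV must be configured together or not at all.
    static boolean useEncryption(String keyBase64, String initVectorBase64) {
        if (keyBase64 == null && initVectorBase64 == null) {
            return false; // unencrypted API key database
        }
        if (keyBase64 == null || initVectorBase64 == null) {
            throw new IllegalArgumentException(
                "cambria.secureConfig.key and cambria.secureConfig.iv must be provided together");
        }
        return true; // encrypted API key database
    }
}
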
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
index d543721..5aa25fa 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
@@ -26,7 +26,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import com.att.dmf.mr.utils.ConfigurationReader;
import com.att.nsa.configs.confimpl.ZkConfigDb;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
-//import com.att.nsa.configs.confimpl.ZkConfigDb;
+
/**
* Provide the zookeeper config db connection
* @author nilanjana.maity
@@ -42,7 +42,7 @@ public class DMaaPZkConfigDb extends ZkConfigDb {
public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk,
@Qualifier("propertyReader") rrNvReadable settings) {
- //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot));
+
super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot());
}
diff --git a/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java b/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java
index 7f27798..f61b6ea 100644
--- a/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java
+++ b/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java
@@ -51,7 +51,7 @@ public class DME2EndPointLoader {
private String protocol;
private String serviceURL;
private static DME2EndPointLoader loader = new DME2EndPointLoader();
-// private static final Logger LOG = LoggerFactory.getLogger(EventsServiceImpl.class);
+
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
private DME2EndPointLoader() {
}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
index 0993aa6..4b219b1 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
@@ -21,11 +21,11 @@
*******************************************************************************/
package com.att.dmf.mr.metrics.publisher;
-//import org.slf4j.Logger;
+
//
import com.att.eelf.configuration.EELFLogger;
-//import com.att.eelf.configuration.EELFManager;
+
/**
*
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
index 1510c32..46dfa99 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
@@ -95,7 +95,7 @@ public class CambriaPublisherUtility
*/
public static List<HttpHost> createHostsList(Collection<String> hosts)
{
- final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> ();
+ final ArrayList<HttpHost> convertedHosts = new ArrayList<>();
for ( String host : hosts )
{
if ( host.length () == 0 ) continue;
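
For context on the CambriaPublisherUtility hunk above: createHostsList turns a collection of host strings into org.apache.http.HttpHost instances, skipping empty entries. A standalone sketch of that conversion; the default port constant and the host:port parsing shown here are assumptions, since the rest of the method is not part of the patch.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.http.HttpHost;

public class HostListSketch {

    private static final int kDefaultPort = 3904; // assumed default; the real value lives in the utility class

    // Convert "host" or "host:port" strings into HttpHost instances, skipping blanks.
    public static List<HttpHost> createHostsList(Collection<String> hosts) {
        final ArrayList<HttpHost> converted = new ArrayList<>();
        for (String host : hosts) {
            if (host.length() == 0) {
                continue;
            }
            final int colon = host.indexOf(':');
            if (colon < 0) {
                converted.add(new HttpHost(host, kDefaultPort));
            } else {
                converted.add(new HttpHost(host.substring(0, colon),
                        Integer.parseInt(host.substring(colon + 1))));
            }
        }
        return converted;
    }
}
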