diff options
Diffstat (limited to 'src/main/java/com')
9 files changed, 39 insertions, 52 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java index dc4bcd5..735e372 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java @@ -42,10 +42,6 @@ import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable; -//import kafka.javaapi.producer.Producer; -//import kafka.producer.KeyedMessage; -//import kafka.producer.ProducerConfig; -//import kafka.producer.KeyedMessage; /** * Sends raw JSON objects into Kafka. @@ -77,8 +73,7 @@ public class KafkaPublisher implements Publisher { kafkaConnUrl="localhost:9092"; } - // props.put("bootstrap.servers", bootSever); - //System.setProperty("java.security.auth.login.config",jaaspath); + transferSetting( props, "bootstrap.servers",kafkaConnUrl); @@ -93,7 +88,7 @@ public class KafkaPublisher implements Publisher { props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - //fProducer = new Producer<String, String>(fConfig); + fProducer = new KafkaProducer<>(props); } @@ -178,10 +173,10 @@ public class KafkaPublisher implements Publisher { throws IOException { log.info("sending " + msgs.size() + " events to [" + topic + "]"); try{ - final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size()); + final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size()); for (message o : msgs) { - final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString()); + final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString()); try { diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java index 22f0588..8c841d4 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java @@ -46,9 +46,9 @@ public class MemoryMetaBroker implements Broker { * @param settings */ public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) { - //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) { + fQueue = mq; - fTopics = new HashMap<String, MemTopic>(); + fTopics = new HashMap<>(); } @Override @@ -190,7 +190,7 @@ public class MemoryMetaBroker implements Broker { @Override public Set<String> getOwners() { - final TreeSet<String> set = new TreeSet<String> (); + final TreeSet<String> set = new TreeSet<> (); set.add ( fOwner ); return set; } diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java index 5f28367..f0bb982 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java @@ -65,7 +65,7 @@ public class DMaaPCambriaLimiter { public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) throws missingReqdSetting, invalidSettingValue { fRateInfo = new HashMap<String, RateInfo>(); - fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, CambriaConstants.kDefault_MaxEmptyPollsPerMinute); fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute, @@ -112,7 +112,7 @@ public class DMaaPCambriaLimiter { */ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) { fRateInfo = new HashMap<String, RateInfo>(); - fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute); fWindowLengthMins = windowLengthMins; diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java index 6fc0838..e4e09c8 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; import com.att.dmf.mr.backends.kafka.LiveLockAvoidance; import com.att.dmf.mr.constants.CambriaConstants; import com.att.dmf.mr.utils.ConfigurationReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; @@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; */ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { - // private static final Logger log = LoggerFactory - // .getLogger(DMaaPKafkaConsumerFactory.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); - // @Autowired - // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new - // KafkaLiveLockAvoider(); + /** * constructor initialization @@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { final boolean isCacheEnabled = kSetting_EnableCache; - // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, - // metrics) : null; + fCache = null; if (isCacheEnabled) { fCache = KafkaConsumerCache.getInstance(); @@ -195,8 +191,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId); long fCreateTimeMs = System.currentTimeMillis(); KafkaConsumer<String, String> cc = new KafkaConsumer<>(props); - kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj() - // ); + kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider); log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); if (fCache != null) { @@ -265,10 +260,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { private void transferSettingIfProvided(Properties target, String key, String prefix) { String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix)); - // if (fSettings.hasValueFor(makeLongKey(key, prefix))) { + if (null != keyVal) { - // final String val = fSettings - // .getString(makeLongKey(key, prefix), ""); + log.info("Setting [" + key + "] to " + keyVal + "."); target.put(key, keyVal); } @@ -294,10 +288,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { props.put("group.id", fakeGroupName); props.put("enable.auto.commit", "false"); // 0.11 props.put("bootstrap.servers", fkafkaBrokers); - /*props.put("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put("security.protocol", "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + + props.put("client.id", consumerId); // additional settings: start with our defaults, then pull in configured diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java index e29403f..963ff2d 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java @@ -23,7 +23,7 @@ package com.att.dmf.mr.beans; import java.security.Key; -//import org.apache.log4-j.Logger; + import org.springframework.beans.factory.annotation.Autowired; import com.att.dmf.mr.constants.CambriaConstants; @@ -48,11 +48,11 @@ import com.att.nsa.util.rrConvertor; */ public class DMaaPNsaApiDb { - //private rrNvReadable settings; + private DMaaPZkConfigDb cdb; //private static final Logger log = Logger - // .getLogger(DMaaPNsaApiDb.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class); /** @@ -63,7 +63,7 @@ public class DMaaPNsaApiDb { */ @Autowired public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) { - //this.setSettings(settings); + this.setCdb(cdb); } /** @@ -79,16 +79,16 @@ public class DMaaPNsaApiDb { missingReqdSetting { // Cambria uses an encrypted api key db - //final String keyBase64 = settings.getString("cambria.secureConfig.key", null); + final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key"); - // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null); + final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv"); // if neither value was provided, don't encrypt api key db if (keyBase64 == null && initVectorBase64 == null) { log.info("This server is configured to use an unencrypted API key database. See the settings documentation."); - return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb, + return new BaseNsaApiDbImpl<>(cdb, new NsaSimpleApiKeyFactory()); } else if (keyBase64 == null) { // neither or both, otherwise something's goofed @@ -100,7 +100,7 @@ public class DMaaPNsaApiDb { log.info("This server is configured to use an encrypted API key database."); final Key key = EncryptingLayer.readSecretKey(keyBase64); final byte[] iv = rrConvertor.base64Decode(initVectorBase64); - return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb, + return new EncryptingApiDbImpl<>(cdb, new NsaSimpleApiKeyFactory(), key, iv); } } @@ -109,17 +109,17 @@ public class DMaaPNsaApiDb { * @return * returns settings */ -/* public rrNvReadable getSettings() { - return settings; - }*/ + + + /** * @param settings * set settings */ - /*public void setSettings(rrNvReadable settings) { - this.settings = settings; - }*/ + + + /** * @return diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java index d543721..5aa25fa 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java @@ -26,7 +26,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import com.att.dmf.mr.utils.ConfigurationReader; import com.att.nsa.configs.confimpl.ZkConfigDb; import com.att.nsa.drumlin.till.nv.rrNvReadable; -//import com.att.nsa.configs.confimpl.ZkConfigDb; + /** * Provide the zookeeper config db connection * @author nilanjana.maity @@ -42,7 +42,7 @@ public class DMaaPZkConfigDb extends ZkConfigDb { public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk, @Qualifier("propertyReader") rrNvReadable settings) { - //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)); + super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot()); } diff --git a/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java b/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java index 7f27798..f61b6ea 100644 --- a/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java +++ b/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java @@ -51,7 +51,7 @@ public class DME2EndPointLoader { private String protocol; private String serviceURL; private static DME2EndPointLoader loader = new DME2EndPointLoader(); -// private static final Logger LOG = LoggerFactory.getLogger(EventsServiceImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class); private DME2EndPointLoader() { } diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java index 0993aa6..4b219b1 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java @@ -21,11 +21,11 @@ *******************************************************************************/ package com.att.dmf.mr.metrics.publisher; -//import org.slf4j.Logger; + // import com.att.eelf.configuration.EELFLogger; -//import com.att.eelf.configuration.EELFManager; + /** * diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java index 1510c32..46dfa99 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java @@ -95,7 +95,7 @@ public class CambriaPublisherUtility */ public static List<HttpHost> createHostsList(Collection<String> hosts) { - final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> (); + final ArrayList<HttpHost> convertedHosts = new ArrayList<>(); for ( String host : hosts ) { if ( host.length () == 0 ) continue; |