summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/att/dmf/mr/CambriaApiException.java10
-rw-r--r--src/main/java/com/att/dmf/mr/backends/Consumer.java1
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java59
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java8
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java37
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java18
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java37
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java6
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java111
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java36
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java31
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java4
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java28
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java4
-rw-r--r--src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java5
-rw-r--r--src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java2
-rw-r--r--src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java2
-rw-r--r--src/main/java/com/att/dmf/mr/metabroker/Topic.java6
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java4
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java2
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java14
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java10
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java29
-rw-r--r--src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java7
-rw-r--r--src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java2
-rw-r--r--src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java11
-rw-r--r--src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java4
-rw-r--r--src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java58
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java2
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java10
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java22
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java2
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java127
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java4
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java4
-rw-r--r--src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java16
-rw-r--r--src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java41
-rw-r--r--src/main/java/com/att/dmf/mr/utils/PropertyReader.java72
-rw-r--r--src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java4
-rw-r--r--src/main/java/com/att/mr/filter/ContentLengthFilter.java4
-rw-r--r--src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java75
41 files changed, 445 insertions, 484 deletions
diff --git a/src/main/java/com/att/dmf/mr/CambriaApiException.java b/src/main/java/com/att/dmf/mr/CambriaApiException.java
index 84dd32c..cdf95ab 100644
--- a/src/main/java/com/att/dmf/mr/CambriaApiException.java
+++ b/src/main/java/com/att/dmf/mr/CambriaApiException.java
@@ -28,8 +28,12 @@ import com.att.nsa.apiServer.NsaAppException;
public class CambriaApiException extends NsaAppException
{
+ /*
+ * defined long type constant serialVersionUID
+ */
+ private static final long serialVersionUID = 1L;
- private ErrorResponse errRes;
+ private transient ErrorResponse errRes;
/**
* Implements constructor CambriaApiException
* @param jsonObject
@@ -66,10 +70,6 @@ public class CambriaApiException extends NsaAppException
this.errRes = errRes;
}
- /*
- * defined long type constant serialVersionUID
- */
- private static final long serialVersionUID = 1L;
public ErrorResponse getErrRes() {
return errRes;
}
diff --git a/src/main/java/com/att/dmf/mr/backends/Consumer.java b/src/main/java/com/att/dmf/mr/backends/Consumer.java
index 2743fc3..f4a9a80 100644
--- a/src/main/java/com/att/dmf/mr/backends/Consumer.java
+++ b/src/main/java/com/att/dmf/mr/backends/Consumer.java
@@ -21,7 +21,6 @@
*******************************************************************************/
package com.att.dmf.mr.backends;
-import java.util.ArrayList;
/**
* A consumer interface. Consumers pull the next message from a given topic.
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
index 126711a..83c08ec 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -61,8 +61,8 @@ import com.att.dmf.mr.backends.MetricsSet;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.exception.DMaaPErrorMessages;
import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.metrics.CdmTimer;
@@ -101,7 +101,7 @@ public class KafkaConsumerCache {
// the server at least every 30 seconds, timing out after 2 minutes should
// be okay.
// FIXME: consider allowing the client to specify its expected call rate?
- private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;
+ private static final long kDefault_MustTouchEveryMs = 1000L*60*2;
// check for expirations pretty regularly
private static final long kDefault_SweepEverySeconds = 15;
@@ -110,16 +110,13 @@ public class KafkaConsumerCache {
NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
}
- // @Qualifier("kafkalockavoid")
-
- // @Resource
- // @Qualifier("kafkalockavoid")
- // KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+
+
@Autowired
private DMaaPErrorMessages errorMessages;
- // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider();
+
/**
* User defined exception class for kafka consumer cache
*
@@ -267,8 +264,8 @@ public class KafkaConsumerCache {
EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
ensurePath.ensure(curator.getZookeeperClient());
- // final long freq = fSettings.getLong(kSetting_SweepEverySeconds,
- // kDefault_SweepEverySeconds);
+
+
long freq = kDefault_SweepEverySeconds;
String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
kSetting_SweepEverySeconds);
@@ -301,8 +298,8 @@ public class KafkaConsumerCache {
try {
curator.blockUntilConnected();
} catch (InterruptedException e) {
- // Ignore
- log.error("error while setting curator framework :" + e.getMessage());
+ log.error("error while setting curator framework :",e);
+ Thread.currentThread().interrupt();
}
}
@@ -393,8 +390,8 @@ public class KafkaConsumerCache {
if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
throw new KafkaConsumerCacheException("The cache service is unavailable.");
ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
- // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
- // clientId);
+
+
Enumeration<String> strEnum = fConsumers.keys();
String consumerLocalKey = null;
while (strEnum.hasMoreElements()) {
@@ -402,9 +399,9 @@ public class KafkaConsumerCache {
if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
- // System.out.println("consumer key returning from
- // getConsumerListforCG +++++++++ " + consumerLocalKey
- // + " " + fConsumers.get(consumerLocalKey));
+
+
+
kcl.add(fConsumers.get(consumerLocalKey));
}
@@ -417,8 +414,7 @@ public class KafkaConsumerCache {
if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
throw new KafkaConsumerCacheException("The cache service is unavailable.");
ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
- // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
- // clientId);
+
Enumeration<String> strEnum = fConsumers.keys();
String consumerLocalKey = null;
while (strEnum.hasMoreElements()) {
@@ -426,9 +422,7 @@ public class KafkaConsumerCache {
if (consumerLocalKey.startsWith(group)) {
- // System.out.println("consumer key returning from
- // getConsumerListforCG +++++++++ " + consumerLocalKey
- // + " " + fConsumers.get(consumerLocalKey));
+
kcl.add(fConsumers.get(consumerLocalKey));
}
@@ -454,7 +448,7 @@ public class KafkaConsumerCache {
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
fConsumers.put(consumerKey, consumer);
- // String appId = "node-instance-"+i;
+
log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId);
}
@@ -517,7 +511,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in dropTimedOutConsumer",e);
+ Thread.currentThread().interrupt();
}
log.info("Dropped " + key + " consumer due to timeout");
}
@@ -549,7 +544,7 @@ public class KafkaConsumerCache {
final Kafka011Consumer kc = fConsumers.get(key);
log.info("closing Kafka consumer " + key + " object " + kc);
if (kc != null) {
- // log.info("closing Kafka consumer " + key);
+
if (kc.close()) {
fConsumers.remove(key);
@@ -644,9 +639,8 @@ public class KafkaConsumerCache {
throws KafkaConsumerCacheException {
// get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
- final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership");
- try {
+ try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
final String consumerPath = fBaseZkPath + "/" + consumerKey;
log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
final CuratorFramework curator = ConfigurationReader.getCurator();
@@ -673,7 +667,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in signalOwnership",e);
+ Thread.currentThread().interrupt();
}
}
@@ -690,8 +685,7 @@ public class KafkaConsumerCache {
mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
}
- // final long mustTouchEveryMs =
- // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs);
+
final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
@@ -744,6 +738,5 @@ public class KafkaConsumerCache {
}
private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
- // private static final Logger log =
- // LoggerFactory.getLogger(KafkaConsumerCache.class);
+
} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
index 805701a..f521b41 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
@@ -35,9 +35,7 @@ import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
//@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka")
@Component
@@ -57,7 +55,7 @@ public class KafkaLiveLockAvoider2 {
@PostConstruct
public void init() {
- System.out.println("Welcome......................................................................................");
+ log.info("Welcome......................................................................................");
try {
if (curatorFramework.checkExists().forPath(locksPath) == null) {
curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath);
@@ -67,7 +65,7 @@ public class KafkaLiveLockAvoider2 {
}
} catch (Exception e) {
- //e.printStackTrace();
+
log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e);
}
@@ -138,7 +136,7 @@ public class KafkaLiveLockAvoider2 {
protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) {
String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId;
- //Watcher processNodeWatcher = createWatcher();
+
try {
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
index 30209f0..735e372 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -41,11 +41,7 @@ import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
-//import kafka.FailedToSendMessageException;
-//import kafka.javaapi.producer.Producer;
-//import kafka.producer.KeyedMessage;
-//import kafka.producer.ProducerConfig;
-//import kafka.producer.KeyedMessage;
+
/**
* Sends raw JSON objects into Kafka.
@@ -76,26 +72,23 @@ public class KafkaPublisher implements Publisher {
kafkaConnUrl="localhost:9092";
}
- //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf";
- // props.put("bootstrap.servers", bootSever);
- //System.setProperty("java.security.auth.login.config",jaaspath);
+
- /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
- transferSetting( props, "sasl.mechanism", "PLAIN");*/
+
+
transferSetting( props, "bootstrap.servers",kafkaConnUrl);
- //transferSetting( props, "metadata.broker.list", kafkaConnUrl);
+
transferSetting( props, "request.required.acks", "1");
transferSetting( props, "message.send.max.retries", "5");
transferSetting(props, "retry.backoff.ms", "150");
- //props.put("serializer.class", "kafka.serializer.StringEncoder");
+
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- //fConfig = new ProducerConfig(props);
- //fProducer = new Producer<String, String>(fConfig);
+
+
fProducer = new KafkaProducer<>(props);
}
@@ -180,11 +173,11 @@ public class KafkaPublisher implements Publisher {
throws IOException {
log.info("sending " + msgs.size() + " events to [" + topic + "]");
try{
- final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size());
+ final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size());
for (message o : msgs) {
- final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString());
- //kms.add(data);
+ final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
+
try {
@@ -200,7 +193,7 @@ try{
}
//private final rrNvReadable fSettings;
- //private ProducerConfig fConfig;
+
private Producer<String, String> fProducer;
/**
@@ -227,9 +220,5 @@ try{
}
- //@Override
- //public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
- // TODO Auto-generated method stub
-
- //}
+
} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
index 0c34bfd..237cac8 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
@@ -35,6 +35,9 @@ import com.att.dmf.mr.backends.ConsumerFactory;
*/
public class MemoryConsumerFactory implements ConsumerFactory
{
+
+ private final MemoryQueue fQueue;
+
/**
*
* Initializing constructor
@@ -59,8 +62,6 @@ public class MemoryConsumerFactory implements ConsumerFactory
return new MemoryConsumer ( topic, consumerGroupId );
}
- private final MemoryQueue fQueue;
-
/**
*
* Define nested inner class
@@ -68,6 +69,12 @@ public class MemoryConsumerFactory implements ConsumerFactory
*/
private class MemoryConsumer implements Consumer
{
+
+ private final String fTopic;
+ private final String fConsumer;
+ private final long fCreateMs;
+ private long fLastAccessMs;
+
/**
*
* Initializing MemoryConsumer constructor
@@ -93,11 +100,6 @@ public class MemoryConsumerFactory implements ConsumerFactory
return fQueue.get ( fTopic, fConsumer );
}
- private final String fTopic;
- private final String fConsumer;
- private final long fCreateMs;
- private long fLastAccessMs;
-
@Override
public boolean close() {
//Nothing to close/clean up.
@@ -168,7 +170,7 @@ public class MemoryConsumerFactory implements ConsumerFactory
*/
public Collection<? extends Consumer> getConsumers ()
{
- return new ArrayList<MemoryConsumer> ();
+ return new ArrayList<> ();
}
@Override
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
index 22f0588..e0c80bd 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
@@ -39,6 +39,10 @@ import com.att.nsa.security.NsaApiKey;
*
*/
public class MemoryMetaBroker implements Broker {
+
+ private final MemoryQueue fQueue;
+ private final HashMap<String, MemTopic> fTopics;
+
/**
*
* @param mq
@@ -46,9 +50,9 @@ public class MemoryMetaBroker implements Broker {
* @param settings
*/
public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) {
- //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) {
+
fQueue = mq;
- fTopics = new HashMap<String, MemTopic>();
+ fTopics = new HashMap<>();
}
@Override
@@ -78,10 +82,16 @@ public class MemoryMetaBroker implements Broker {
fQueue.removeTopic(topic);
}
- private final MemoryQueue fQueue;
- private final HashMap<String, MemTopic> fTopics;
-
private static class MemTopic implements Topic {
+
+ private final String fName;
+ private final String fDesc;
+ private final String fOwner;
+ private NsaAcl fReaders;
+ private NsaAcl fWriters;
+ private boolean ftransactionEnabled;
+ private String accessDenied = "User does not own this topic ";
+
/**
* constructor initialization
*
@@ -141,7 +151,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
if (fWriters == null) {
fWriters = new NsaAcl();
@@ -152,7 +162,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
fWriters.remove(publisherId);
}
@@ -160,7 +170,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
if (fReaders == null) {
fReaders = new NsaAcl();
@@ -171,18 +181,11 @@ public class MemoryMetaBroker implements Broker {
@Override
public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
fReaders.remove(consumerId);
}
- private final String fName;
- private final String fDesc;
- private final String fOwner;
- private NsaAcl fReaders;
- private NsaAcl fWriters;
- private boolean ftransactionEnabled;
-
@Override
public boolean isTransactionEnabled() {
return ftransactionEnabled;
@@ -190,7 +193,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public Set<String> getOwners() {
- final TreeSet<String> set = new TreeSet<String> ();
+ final TreeSet<String> set = new TreeSet<> ();
set.add ( fOwner );
return set;
}
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
index 0629972..8ab4619 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
@@ -43,7 +43,7 @@ public class MemoryQueue {
* constructor storing hashMap objects in Queue and Offsets object
*/
public MemoryQueue() {
- fQueue = new HashMap<String, LogBuffer>();
+ fQueue = new HashMap<>();
fOffsets = new HashMap<String, HashMap<String, Integer>>();
}
@@ -102,7 +102,7 @@ public class MemoryQueue {
HashMap<String, Integer> offsetMap = fOffsets.get(consumerName);
if (offsetMap == null) {
- offsetMap = new HashMap<String, Integer>();
+ offsetMap = new HashMap<>();
fOffsets.put(consumerName, offsetMap);
}
Integer offset = offsetMap.get(topic);
@@ -169,7 +169,7 @@ public class MemoryQueue {
public LogBuffer(int maxSize) {
fBaseOffset = 0;
fMaxSize = maxSize;
- fList = new ArrayList<String>();
+ fList = new ArrayList<>();
}
/**
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
index 5f28367..8cbf64f 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
@@ -35,9 +35,6 @@ import com.att.dmf.mr.exception.DMaaPResponseCode;
import com.att.dmf.mr.exception.ErrorResponse;
import com.att.dmf.mr.utils.Utils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
@@ -54,6 +51,15 @@ import com.att.nsa.metrics.impl.CdmRateTicker;
*/
@Component
public class DMaaPCambriaLimiter {
+ private final HashMap<String, RateInfo> fRateInfo;
+ private final HashMap<String, RateInfoCheck> fRateInfoCheck;
+ private final double fMaxEmptyPollsPerMinute;
+ private final double fMaxPollsPerMinute;
+ private final int fWindowLengthMins;
+ private final long fSleepMs;
+ private final long fSleepMs1;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
/**
* constructor initializes
*
@@ -62,10 +68,9 @@ public class DMaaPCambriaLimiter {
* @throws invalidSettingValue
*/
@Autowired
- public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
- throws missingReqdSetting, invalidSettingValue {
- fRateInfo = new HashMap<String, RateInfo>();
- fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+ public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
+ fRateInfo = new HashMap<>();
+ fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
@@ -78,19 +83,7 @@ public class DMaaPCambriaLimiter {
5000);
}
-
- /**
- * static method provide the sleep time
- *
- * @param ratePerMinute
- * @return
- */
- public static long getSleepMsForRate(double ratePerMinute) {
- if (ratePerMinute <= 0.0)
- return 0;
- return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
- }
-
+
/**
* Construct a rate limiter.
*
@@ -111,8 +104,8 @@ public class DMaaPCambriaLimiter {
* @param windowLengthMins
*/
public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
- fRateInfo = new HashMap<String, RateInfo>();
- fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+ fRateInfo = new HashMap<>();
+ fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
fWindowLengthMins = windowLengthMins;
@@ -121,6 +114,18 @@ public class DMaaPCambriaLimiter {
}
/**
+ * static method provide the sleep time
+ *
+ * @param ratePerMinute
+ * @return
+ */
+ public static long getSleepMsForRate(double ratePerMinute) {
+ if (ratePerMinute <= 0.0)
+ return 0;
+ return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
+ }
+
+ /**
* Tell the rate limiter about a call to a topic/group/id. If the rate is
* too high, this call delays its return and throws an exception.
*
@@ -151,6 +156,7 @@ public class DMaaPCambriaLimiter {
log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
}
} catch (InterruptedException e) {
+ log.error("Exception "+ e);
// ignore
}
@@ -163,37 +169,7 @@ public class DMaaPCambriaLimiter {
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
- /*if (fMaxPollsPerMinute <= 0) {
- return;
- }
- final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
- final double ratevalue = ric.onCall();
- if (ratevalue > fMaxPollsPerMinute) {
- try {
- log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
- + ".");
- if (fSleepMs1 > fMaxPollsPerMinute) {
- log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
- + " ms sleep, then responding in error.");
- Thread.sleep(fSleepMs1);
- ric.reset();
- } else {
- log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
- }
- } catch (InterruptedException e) {
- // ignore
- }
-
-
- ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
- DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
- "This client is making too many requests "
- + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
-
- log.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }*/
}
@@ -213,6 +189,8 @@ public class DMaaPCambriaLimiter {
}
private static class RateInfo {
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
/**
* constructor initialzes
*
@@ -244,14 +222,14 @@ public class DMaaPCambriaLimiter {
fCallRateSinceLastMsgSend.tick();
return fCallRateSinceLastMsgSend.getRate();
}
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
}
private static class RateInfoCheck {
+
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
/**
* constructor initialzes
*
@@ -283,21 +261,10 @@ public class DMaaPCambriaLimiter {
fCallRateSinceLastMsgSend.tick();
return fCallRateSinceLastMsgSend.getRate();
}
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
}
- private final HashMap<String, RateInfo> fRateInfo;
- private final HashMap<String, RateInfoCheck> fRateInfoCheck;
- private final double fMaxEmptyPollsPerMinute;
- private final double fMaxPollsPerMinute;
- private final int fWindowLengthMins;
- private final long fSleepMs;
- private final long fSleepMs1;
- //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
final String key = makeKey(topic, consumerGroup, clientId);
@@ -310,15 +277,7 @@ public class DMaaPCambriaLimiter {
}
- private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
- final String key = makeKey(topic, consumerGroup, clientId);
- RateInfoCheck ri = fRateInfoCheck.get(key);
- if (ri == null) {
- ri = new RateInfoCheck(key, 1);
- fRateInfoCheck.put(key, ri);
- }
- return ri;
- }
+
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 6fc0838..f60fd53 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
@@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
*/
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
- // private static final Logger log = LoggerFactory
- // .getLogger(DMaaPKafkaConsumerFactory.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
- // @Autowired
- // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new
- // KafkaLiveLockAvoider();
+
/**
* constructor initialization
@@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final boolean isCacheEnabled = kSetting_EnableCache;
- // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
- // metrics) : null;
+
fCache = null;
if (isCacheEnabled) {
fCache = KafkaConsumerCache.getInstance();
@@ -189,14 +185,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
+ "], on topic [" + topic + "].");
-
- fCache.signalOwnership(topic, consumerGroupName, consumerId);
-
+
+ if (fCache != null) {
+ fCache.signalOwnership(topic, consumerGroupName, consumerId);
+ }
+
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
- kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj()
- // );
+ kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
if (fCache != null) {
@@ -265,10 +262,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
private void transferSettingIfProvided(Properties target, String key, String prefix) {
String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
- // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
+
if (null != keyVal) {
- // final String val = fSettings
- // .getString(makeLongKey(key, prefix), "");
+
log.info("Setting [" + key + "] to " + keyVal + ".");
target.put(key, keyVal);
}
@@ -294,10 +290,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
props.put("group.id", fakeGroupName);
props.put("enable.auto.commit", "false"); // 0.11
props.put("bootstrap.servers", fkafkaBrokers);
- /*props.put("sasl.jaas.config",
- "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
+
props.put("client.id", consumerId);
// additional settings: start with our defaults, then pull in configured
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 643eae9..4bef985 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader;
//import org.apache.log4-j.Logger;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil;
+
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
@@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
fKafkaAdminClient=AdminClient.create ( props );
- // fKafkaAdminClient = null;
+
}
//private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
@@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
fKafkaAdminClient=AdminClient.create ( props );
- // fKafkaAdminClient = null;
+
}
public DMaaPKafkaMetaBroker( rrNvReadable settings,
ZkClient zk, ConfigDb configDb,AdminClient client) {
- //fSettings = settings;
+
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
fKafkaAdminClient= client;
- // fKafkaAdminClient = null;
+
}
@@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
catch ( InterruptedException e )
{
- //timer.fail ( "Timeout" );
+
log.warn ( "Execution of describeTopics timed out." );
throw new ConfigDbException ( e );
}
catch ( ExecutionException e )
{
- //timer.fail ( "ExecutionError" );
+
log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
throw new ConfigDbException ( e.getCause () );
}
@@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
log.info("Loading zookeeper client for topic deletion.");
// topic creation. (Otherwise, the topic is only partially created
// in ZK.)
- /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
- ZKStringSerializer$.MODULE$);
- String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
- if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
- ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false);
- */
+
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
- //AdminUtils.deleteTopic(zkutils, topic);
+
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
index 9942837..4c9532b 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
@@ -84,9 +84,9 @@ public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSe
*
* @param cs
*/
- //public DMaaPMetricsSet() {
+
public DMaaPMetricsSet(rrNvReadable cs) {
- //fSettings = cs;
+
fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
super.putItem("version", fVersion);
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
index e29403f..963ff2d 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
@@ -23,7 +23,7 @@ package com.att.dmf.mr.beans;
import java.security.Key;
-//import org.apache.log4-j.Logger;
+
import org.springframework.beans.factory.annotation.Autowired;
import com.att.dmf.mr.constants.CambriaConstants;
@@ -48,11 +48,11 @@ import com.att.nsa.util.rrConvertor;
*/
public class DMaaPNsaApiDb {
- //private rrNvReadable settings;
+
private DMaaPZkConfigDb cdb;
//private static final Logger log = Logger
- // .getLogger(DMaaPNsaApiDb.class.toString());
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class);
/**
@@ -63,7 +63,7 @@ public class DMaaPNsaApiDb {
*/
@Autowired
public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) {
- //this.setSettings(settings);
+
this.setCdb(cdb);
}
/**
@@ -79,16 +79,16 @@ public class DMaaPNsaApiDb {
missingReqdSetting {
// Cambria uses an encrypted api key db
- //final String keyBase64 = settings.getString("cambria.secureConfig.key", null);
+
final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key");
- // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null);
+
final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv");
// if neither value was provided, don't encrypt api key db
if (keyBase64 == null && initVectorBase64 == null) {
log.info("This server is configured to use an unencrypted API key database. See the settings documentation.");
- return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb,
+ return new BaseNsaApiDbImpl<>(cdb,
new NsaSimpleApiKeyFactory());
} else if (keyBase64 == null) {
// neither or both, otherwise something's goofed
@@ -100,7 +100,7 @@ public class DMaaPNsaApiDb {
log.info("This server is configured to use an encrypted API key database.");
final Key key = EncryptingLayer.readSecretKey(keyBase64);
final byte[] iv = rrConvertor.base64Decode(initVectorBase64);
- return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb,
+ return new EncryptingApiDbImpl<>(cdb,
new NsaSimpleApiKeyFactory(), key, iv);
}
}
@@ -109,17 +109,17 @@ public class DMaaPNsaApiDb {
* @return
* returns settings
*/
-/* public rrNvReadable getSettings() {
- return settings;
- }*/
+
+
+
/**
* @param settings
* set settings
*/
- /*public void setSettings(rrNvReadable settings) {
- this.settings = settings;
- }*/
+
+
+
/**
* @return
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
index d543721..5aa25fa 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
@@ -26,7 +26,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import com.att.dmf.mr.utils.ConfigurationReader;
import com.att.nsa.configs.confimpl.ZkConfigDb;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
-//import com.att.nsa.configs.confimpl.ZkConfigDb;
+
/**
* Provide the zookeeper config db connection
* @author nilanjana.maity
@@ -42,7 +42,7 @@ public class DMaaPZkConfigDb extends ZkConfigDb {
public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk,
@Qualifier("propertyReader") rrNvReadable settings) {
- //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot));
+
super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot());
}
diff --git a/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java b/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java
index a971c3f..db691bd 100644
--- a/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java
+++ b/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java
@@ -35,7 +35,7 @@ import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import org.apache.http.HttpStatus;
-//import org.apache.log-4j.Logger;
+
import org.springframework.beans.factory.annotation.Autowired;
import com.att.eelf.configuration.EELFLogger;
@@ -51,8 +51,7 @@ import com.att.eelf.configuration.EELFManager;
@Singleton
public class DMaaPWebExceptionMapper implements ExceptionMapper<WebApplicationException>{
- //private static final Logger LOGGER = Logger
- // .getLogger(DMaaPWebExceptionMapper.class);
+
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(DMaaPWebExceptionMapper.class);
private ErrorResponse errRes;
diff --git a/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java b/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java
index 6022b91..64b20e8 100644
--- a/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java
+++ b/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java
@@ -35,7 +35,7 @@ import com.att.eelf.configuration.EELFManager;
public class CambriaServletContextListener implements ServletContextListener {
DME2EndPointLoader loader = DME2EndPointLoader.getInstance();
-// private static Logger log = Logger.getLogger(CambriaServletContextListener.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaServletContextListener.class);
diff --git a/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java b/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java
index 7f27798..f61b6ea 100644
--- a/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java
+++ b/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java
@@ -51,7 +51,7 @@ public class DME2EndPointLoader {
private String protocol;
private String serviceURL;
private static DME2EndPointLoader loader = new DME2EndPointLoader();
-// private static final Logger LOG = LoggerFactory.getLogger(EventsServiceImpl.class);
+
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
private DME2EndPointLoader() {
}
diff --git a/src/main/java/com/att/dmf/mr/metabroker/Topic.java b/src/main/java/com/att/dmf/mr/metabroker/Topic.java
index 422a2cc..d191070 100644
--- a/src/main/java/com/att/dmf/mr/metabroker/Topic.java
+++ b/src/main/java/com/att/dmf/mr/metabroker/Topic.java
@@ -39,16 +39,16 @@ public interface Topic extends ReadWriteSecuredResource
*
*//*
public class AccessDeniedException extends Exception
- {
+
*//**
* AccessDenied Description
*//*
- public AccessDeniedException () { super ( "Access denied." ); }
+
*//**
* AccessDenied Exception for the user while authenticating the user request
* @param user
*//*
- public AccessDeniedException ( String user ) { super ( "Access denied for " + user ); }
+
private static final long serialVersionUID = 1L;
}*/
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
index 0993aa6..4b219b1 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
@@ -21,11 +21,11 @@
*******************************************************************************/
package com.att.dmf.mr.metrics.publisher;
-//import org.slf4j.Logger;
+
//
import com.att.eelf.configuration.EELFLogger;
-//import com.att.eelf.configuration.EELFManager;
+
/**
*
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
index 1510c32..46dfa99 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
@@ -95,7 +95,7 @@ public class CambriaPublisherUtility
*/
public static List<HttpHost> createHostsList(Collection<String> hosts)
{
- final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> ();
+ final ArrayList<HttpHost> convertedHosts = new ArrayList<>();
for ( String host : hosts )
{
if ( host.length () == 0 ) continue;
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
index d02438f..9158c96 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
@@ -386,12 +386,7 @@ public class DMaaPCambriaClientFactory {
* Your API secret
* @return an identity manager
*/
- /*
- * public static CambriaIdentityManager createIdentityManager (
- * Collection<String> hostSet, String apiKey, String apiSecret ) { final
- * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet );
- * cim.setApiCredentials ( apiKey, apiSecret ); return cim; }
- */
+
/**
* Create a topic manager for working with topics.
@@ -405,12 +400,7 @@ public class DMaaPCambriaClientFactory {
* Your API secret
* @return a topic manager
*/
- /*
- * public static CambriaTopicManager createTopicManager ( Collection<String>
- * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi
- * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey,
- * apiSecret ); return tmi; }
- */
+
/**
* Inject a consumer. Used to support unit tests.
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
index 08b2fd1..ebdf3ed 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
@@ -31,7 +31,7 @@ import org.json.JSONArray;
import org.json.JSONException;
import com.att.dmf.mr.constants.CambriaConstants;
-//import org.slf4j.Logger;
+
//import org.slf4j.LoggerFactory;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
@@ -52,12 +52,12 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr
public CambriaBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException
{
- /*super ( hosts, CambriaConstants.kStdCambriaServicePort, clientSignature,
- CacheUse.NONE, 1, 1, TimeUnit.MILLISECONDS );*/
+
+
super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000);
- //fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+
fLog = EELFManager.getInstance().getLogger(this.getClass().getName());
//( this.getClass().getName () );
}
@@ -85,7 +85,7 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr
{
fLog = log;
- //replaceLogger ( log );
+
}
public EELFLogger getLog ()
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
index d8d8799..dee9e57 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
@@ -186,7 +186,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
public void close() {
try {
final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (remains.size() > 0) {
+ if (remains.isEmpty()) {
getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+ "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
}
@@ -251,7 +251,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
*/
private synchronized boolean shouldSendNow() {
boolean shouldSend = false;
- if (fPending.size() > 0) {
+ if (fPending.isEmpty()) {
final long nowMs = Clock.now();
shouldSend = (fPending.size() >= fMaxBatchSize);
@@ -273,7 +273,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
private synchronized boolean sendBatch() {
// it's possible for this call to be made with an empty list. in this
// case, just return.
- if (fPending.size() < 1) {
+ if (fPending.isEmpty()) {
return true;
}
@@ -305,8 +305,8 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
// code from REST Client Starts
- // final String serverCalculatedSignature = sha1HmacSigner.sign
- // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV");
+
+
Client client = ClientBuilder.newClient();
String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
@@ -323,32 +323,19 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
Response response = target.request().post(data);
- // header("X-CambriaAuth",
- // "2OH46YIWa329QpEF:"+serverCalculatedSignature).
- // header("X-CambriaDate", "2015-09-21T11:38:19-0700").
- // post(Entity.json(baseStream.toByteArray()));
-
+
getLog().info("Response received :: " + response.getStatus());
getLog().info("Response received :: " + response.toString());
// code from REST Client Ends
- /*
- * final JSONObject result = post ( url, contentType,
- * baseStream.toByteArray(), true ); final String logLine =
- * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" +
- * result.toString (); getLog().info ( logLine );
- */
+
fPending.clear();
return true;
} catch (IllegalArgumentException x) {
getLog().warn(x.getMessage(), x);
}
- /*
- * catch ( HttpObjectNotFoundException x ) { getLog().warn (
- * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn (
- * x.getMessage(), x ); }
- */
+
catch (IOException x) {
getLog().warn(x.getMessage(), x);
}
diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java
index 98ddb50..7a67c92 100644
--- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java
+++ b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java
@@ -81,10 +81,7 @@ public class CambriaJsonStreamReader implements reader {
final int c = fTokens.next();
- /*if (c ==','){
- fCloseCount++;
- System.out.println("fCloseCount=" + fCloseCount +" fCount "+fCount);
- }*/
+
if (fIsList) {
if (c == ']' || (fCount > 0 && c == 10))
return null;
@@ -125,7 +122,7 @@ public class CambriaJsonStreamReader implements reader {
*
* @param o
*/
- //public msg(JSONObject o){}
+
public msg(JSONObject o) {
diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java
index 376d140..f64c0de 100644
--- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java
+++ b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java
@@ -137,5 +137,5 @@ public class CambriaRawStreamReader implements reader
private final InputStream fStream;
private final String fDefPart;
private boolean fClosed;
- //private String transactionId;
+
}
diff --git a/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java b/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
index b550373..ed0893d 100644
--- a/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
+++ b/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
@@ -46,7 +46,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
auth = true;
}
- //System.out.println("role " +role +" user: "+ req.getRemoteUser() +" : auth="+auth);
+
return auth;
}
@@ -57,7 +57,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
String permission = "";
String nameSpace ="";
if(topicName.contains(".") && topicName.contains("com.att")) {
- //String topic = topicName.substring(topicName.lastIndexOf(".")+1);
+
nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
}
else {
@@ -67,12 +67,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
if(null==nameSpace)nameSpace="com.att.dmaap.mr.ueb";
- /*ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.TOPIC_NOT_IN_AAF.getResponseCode(), "Topic does not exist in AAF"
- , null, Utils.getFormattedDate(new Date()), topicName,
- null, null, null, null);
-
- throw new CambriaApiException(errRes);*/
+
}
permission = nameSpace+".mr.topic|:topic."+topicName+"|"+action;
diff --git a/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java b/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java
index e9f28ae..64dbc14 100644
--- a/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java
+++ b/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java
@@ -25,7 +25,7 @@ import javax.servlet.http.HttpServletRequest;
import com.att.dmf.mr.beans.DMaaPContext;
import com.att.dmf.mr.security.DMaaPAuthenticator;
-//import com.att.nsa.security.db.NsaApiDb;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.security.NsaApiKey;
@@ -65,7 +65,7 @@ public class DMaaPMechIdAuthenticator <K extends NsaApiKey> implements DMaaPAuth
log.info ( "AUTH-LOG(" + remoteAddr + "): " + msg );
}
-// private final NsaApiDb<K> fDb;
+
//private static final Logger log = Logger.getLogger( MechIdAuthenticator.class.toString());
private static final EELFLogger log = EELFManager.getInstance().getLogger(MechIdAuthenticator.class);
/**
diff --git a/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java b/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java
index a26c9e7..b1e28e7 100644
--- a/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java
+++ b/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java
@@ -54,9 +54,9 @@ public class DMaaPOriginalUebAuthenticator<K extends NsaApiKey> implements DMaaP
public DMaaPOriginalUebAuthenticator(NsaApiDb<K> db, long requestTimeWindowMs) {
fDb = db;
fRequestTimeWindowMs = requestTimeWindowMs;
- //fAuthenticators = new LinkedList<DMaaPAuthenticator<K>>();
+
- //fAuthenticators.add(new DMaaPOriginalUebAuthenticator<K>(db, requestTimeWindowMs));
+
}
@@ -243,51 +243,51 @@ public class DMaaPOriginalUebAuthenticator<K extends NsaApiKey> implements DMaaP
"EEEE, dd-MMM-yy HH:mm:ss zzz",
};
- /*private static final String kDateFormats[] = {
- // W3C date format (RFC 3339).
- "yyyy-MM-dd'T'HH:mm:ssz",
+
+
+
- // Preferred HTTP date format (RFC 1123).
- "EEE, dd MMM yyyy HH:mm:ss zzz",
+
+
- // simple unix command line 'date' format
- "EEE MMM dd HH:mm:ss z yyyy",
+
+
- // Common date format (RFC 822).
- "EEE, dd MMM yy HH:mm:ss z", "EEE, dd MMM yy HH:mm z", "dd MMM yy HH:mm:ss z", "dd MMM yy HH:mm z",
+
+
- // Obsoleted HTTP date format (ANSI C asctime() format).
- "EEE MMM dd HH:mm:ss yyyy",
+
+
- // Obsoleted HTTP date format (RFC 1036).
- "EEEE, dd-MMM-yy HH:mm:ss zzz", }; */
+
+
// logger declaration
- //private static final Logger log = Logger.getLogger(DMaaPOriginalUebAuthenticator.class.toString());
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPOriginalUebAuthenticator.class);
@Override
-// public K authenticate(DMaaPContext ctx) {
+
// TODO Auto-generated method stub
- //return null;
+
//}
public K authenticate(DMaaPContext ctx) {
- /*final HttpServletRequest req = ctx.getRequest();
- for (DMaaPAuthenticator<K> a : fAuthenticators) {
- if (a.qualify(req)) {
- final K k = a.isAuthentic(req);
- if (k != null)
- return k;
- }
- // else: this request doesn't look right to the authenticator
- }*/
+
+
+
+
+
+
+
+
+
return null;
}
public void addAuthenticator ( DMaaPAuthenticator<K> a )
{
- //this.fAuthenticators.add(a);
+
}
- //private final LinkedList<DMaaPAuthenticator<K>> fAuthenticators;
+
} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java
index 110970f..f7c48de 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java
@@ -42,7 +42,7 @@ import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.limits.Blacklist;
import com.att.nsa.security.NsaApiKey;
import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-//import com.att.sa.highlandPark.util.HpJsonUtil;
+
/**
* @author muzainulhaque.qazi
diff --git a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
index c818f88..b0e8a86 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
@@ -54,7 +54,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey;
@Service
public class ApiKeysServiceImpl implements ApiKeysService {
- //private Logger log = Logger.getLogger(ApiKeysServiceImpl.class.toString());
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysServiceImpl.class.toString());
/**
* This method will provide all the ApiKeys present in kafka server.
@@ -139,7 +139,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
String kSetting_AllowAnonymousKeys= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"apiKeys.allowAnonymous");
if(null==kSetting_AllowAnonymousKeys) kSetting_AllowAnonymousKeys ="false";
- // if ((contactEmail == null) || (contactEmail.length() == 0))
+
if ( kSetting_AllowAnonymousKeys.equalsIgnoreCase("true") && !emailProvided )
{
DMaaPResponseBuilder.respondWithErrorInJson(dmaapContext, 400, "You must provide an email address.");
@@ -165,7 +165,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
log.debug("=======ApiKeysServiceImpl: createApiKey : saving api key : "
+ key.toString() + "=====");
apiKeyDb.saveApiKey(key);
- // System.out.println("here4");
+
// email out the secret to validate the email address
if ( emailProvided )
{
@@ -196,9 +196,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
);
DMaaPResponseBuilder.respondOk(dmaapContext,
o);
- /*o.put("secret", "Emailed to " + contactEmail + ".");
- DMaaPResponseBuilder.respondOk(dmaapContext,
- o); */
+
return;
} else {
log.debug("=======ApiKeysServiceImpl: createApiKey : Error in creating API Key.=====");
diff --git a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
index 4ca6446..22b60fe 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
@@ -94,7 +94,7 @@ import com.att.nsa.util.rrConvertor;
@Service
public class EventsServiceImpl implements EventsService {
// private static final Logger LOG =
- // Logger.getLogger(EventsServiceImpl.class);
+
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
private static final String BATCH_LENGTH = "event.batch.length";
@@ -103,10 +103,10 @@ public class EventsServiceImpl implements EventsService {
private DMaaPErrorMessages errorMessages;
//@Autowired
- //KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+
// @Value("${metrics.send.cambria.topic}")
- // private String metricsTopic;
+
public DMaaPErrorMessages getErrorMessages() {
return errorMessages;
@@ -133,7 +133,7 @@ public class EventsServiceImpl implements EventsService {
CambriaApiException, IOException, DMaaPAccessDeniedException {
final long startTime = System.currentTimeMillis();
final HttpServletRequest req = ctx.getRequest();
- //System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"+kafkaLiveLockAvoider);
+
boolean isAAFTopic = false;
// was this host blacklisted?
final String remoteAddr = Utils.getRemoteAddress(ctx);
@@ -158,7 +158,7 @@ public class EventsServiceImpl implements EventsService {
if (strtimeoutMS != null)
timeoutMs = Integer.parseInt(strtimeoutMS);
// int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
- // CambriaConstants.kNoTimeout);
+
if (req.getParameter("timeout") != null) {
timeoutMs = Integer.parseInt(req.getParameter("timeout"));
}
@@ -214,7 +214,7 @@ public class EventsServiceImpl implements EventsService {
// if headers are not provided then user will be null
if (user == null && null != ctx.getRequest().getHeader("Authorization")) {
// the topic name will be sent by the client
- // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub";
+
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
String permission = aaf.aafPermissionString(topic, "sub");
if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
@@ -234,7 +234,7 @@ public class EventsServiceImpl implements EventsService {
logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
+ " " + clientId);
Consumer c = null;
- // String localclientId = clientId;
+
String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"clusterhostid");
if (null == lhostId) {
@@ -481,16 +481,16 @@ public class EventsServiceImpl implements EventsService {
// start processing, building a batch to push to the backend
final long startMs = System.currentTimeMillis();
long count = 0;
- long maxEventBatch = 1024 * 16;
+ long maxEventBatch = 1024L* 16;
String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
if (null != batchlen)
maxEventBatch = Long.parseLong(batchlen);
// long maxEventBatch =
// ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
- final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
+ final LinkedList<Publisher.message> batch = new LinkedList<>();
// final ArrayList<KeyedMessage<String, String>> kms = new
// ArrayList<KeyedMessage<String, String>>();
- final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
+ final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
try {
// for each message...
Publisher.message m = null;
@@ -592,7 +592,7 @@ public class EventsServiceImpl implements EventsService {
// start processing, building a batch to push to the backend
final long startMs = System.currentTimeMillis();
long count = 0;
- long maxEventBatch = 1024 * 16;
+ long maxEventBatch = 1024L * 16;
String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
if (null != evenlen)
maxEventBatch = Long.parseLong(evenlen);
diff --git a/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java
index 83b3770..d867ea8 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java
@@ -49,7 +49,7 @@ import com.att.nsa.metrics.CdmMeasuredItem;
@Component
public class MetricsServiceImpl implements MetricsService {
- //private static final Logger LOG = Logger.getLogger(MetricsService.class.toString());
+
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MetricsService.class);
/**
*
diff --git a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
index 01ed1cc..7e9d783 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
@@ -45,7 +45,7 @@ import com.att.dmf.mr.exception.DMaaPResponseCode;
import com.att.dmf.mr.exception.ErrorResponse;
import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
import com.att.dmf.mr.metabroker.Broker1;
-//import com.att.dmf.mr.metabroker.Broker1;
+
import com.att.dmf.mr.metabroker.Topic;
import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
@@ -67,13 +67,13 @@ import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
public class TopicServiceImpl implements TopicService {
// private static final Logger LOGGER =
- // Logger.getLogger(TopicServiceImpl.class);
+
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
@Autowired
private DMaaPErrorMessages errorMessages;
// @Value("${msgRtr.topicfactory.aaf}")
- // private String mrFactory;
+
public DMaaPErrorMessages getErrorMessages() {
return errorMessages;
@@ -125,7 +125,7 @@ public class TopicServiceImpl implements TopicService {
for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
JSONObject obj = new JSONObject();
obj.put("topicName", topic.getName());
- // obj.put("description", topic.getDescription());
+
obj.put("owner", topic.getOwner());
obj.put("txenabled", topic.isTransactionEnabled());
topicsList.put(obj);
@@ -193,7 +193,7 @@ public class TopicServiceImpl implements TopicService {
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
String key = null;
- //String appName = dmaapContext.getRequest().getHeader("AppName");
+
String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
"enforced.topic.name.AAF");
@@ -209,55 +209,55 @@ public class TopicServiceImpl implements TopicService {
"Failed to create topic: Access Denied.User does not have permission to perform create topic");
LOGGER.info(errRes.toString());
- // throw new DMaaPAccessDeniedException(errRes);
+
}
}
- // else if (user==null &&
+
// (null==dmaapContext.getRequest().getHeader("Authorization") && null
- // == dmaapContext.getRequest().getHeader("cookie")) ) {
- /*else if (user == null && null == dmaapContext.getRequest().getHeader("Authorization")
- ) {
- LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
+
+
+
+
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Failed to create topic: Access Denied.User does not have permission to perform create topic");
+
+
+
- LOGGER.info(errRes.toString());
- // throw new DMaaPAccessDeniedException(errRes);
- }*/
+
+
+
if (user == null /*&& (null != dmaapContext.getRequest().getHeader("Authorization")
)*/) {
- // if (user == null &&
+
// (null!=dmaapContext.getRequest().getHeader("Authorization") ||
- // null != dmaapContext.getRequest().getHeader("cookie"))) {
+
// ACL authentication is not provided so we will use the aaf
// authentication
- /*LOGGER.info("Authorization the topic");
+
- String permission = "";
- String nameSpace = "";
- if (topicBean.getTopicName().indexOf(".") > 1)
- nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
+
+
+
+
- String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "msgRtr.topicfactory.aaf");
+
+
- // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
+
- permission = mrFactoryVal + nameSpace + "|create";
- DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();*/
+
+
- //if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
+
if (false) {
LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
"Failed to create topic: Access Denied.User does not have permission to create topic with perm "
- //+ permission);
+
+ "permission");
@@ -267,14 +267,14 @@ public class TopicServiceImpl implements TopicService {
} else {
// if user is null and aaf authentication is ok then key should
// be ""
- // key = "";
+
/**
* Added as part of AAF user it should return username
*/
- //key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
- key="admin";
- LOGGER.info("key ==================== " + key);
+
+
+ //LOGGER.info("key ==================== " + key);
}
}
@@ -283,7 +283,7 @@ public class TopicServiceImpl implements TopicService {
final String topicName = topicBean.getTopicName();
final String desc = topicBean.getTopicDescription();
int partition = topicBean.getPartitionCount();
- // int replica = topicBean.getReplicationCount();
+
if (partition == 0) {
partition = 8;
}
@@ -291,7 +291,7 @@ public class TopicServiceImpl implements TopicService {
int replica = topicBean.getReplicationCount();
if (replica == 0) {
- //replica = 3;
+
replica = 1;
}
final int replicas = replica;
@@ -319,7 +319,7 @@ public class TopicServiceImpl implements TopicService {
throw new CambriaApiException(errRes);
} catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) {
// TODO Auto-generated catch block
- e.printStackTrace();
+ LOGGER.error("Exception is at createTopic( ) ", e);
}
}
@@ -503,25 +503,25 @@ public class TopicServiceImpl implements TopicService {
LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
- // if (user == null) {
+
//
// LOGGER.info("Authenticating the user, as ACL authentication is not
- // provided");
+
//// String permission =
- // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+
//
- // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- // String permission = aaf.aafPermissionString(topicName, "manage");
+
+
// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
// {
// LOGGER.error("Failed to permit write access to producer [" +
// producerId + "] for topic " + topicName
- // + ". Authentication failed.");
+
// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
// errorMessages.getNotPermitted1()+" <Grant publish permissions>
- // "+errorMessages.getNotPermitted2()+ topicName);
- // LOGGER.info(errRes);
+
+
// throw new DMaaPAccessDeniedException(errRes);
// }
// }
@@ -561,25 +561,25 @@ public class TopicServiceImpl implements TopicService {
LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
- // if (user == null) {
+
//
//// String permission =
- // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+
// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
// String permission = aaf.aafPermissionString(topicName, "manage");
// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
// {
// LOGGER.error("Failed to revoke write access to producer [" +
// producerId + "] for topic " + topicName
- // + ". Authentication failed.");
+
// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
// errorMessages.getNotPermitted1()+" <Revoke publish permissions>
- // "+errorMessages.getNotPermitted2()+ topicName);
- // LOGGER.info(errRes);
+
+
// throw new DMaaPAccessDeniedException(errRes);
//
- // }
+
// }
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
@@ -612,22 +612,22 @@ public class TopicServiceImpl implements TopicService {
LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
- // if (user == null) {
+
//
//// String permission =
- // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
- // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+
+
// String permission = aaf.aafPermissionString(topicName, "manage");
// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
// {
// LOGGER.error("Failed to permit read access to consumer [" +
// consumerId + "] for topic " + topicName
- // + ". Authentication failed.");
+
// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
// errorMessages.getNotPermitted1()+" <Grant consume permissions>
- // "+errorMessages.getNotPermitted2()+ topicName);
- // LOGGER.info(errRes);
+
+
// throw new DMaaPAccessDeniedException(errRes);
// }
// }
@@ -662,27 +662,26 @@ public class TopicServiceImpl implements TopicService {
LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
- // if (user == null) {
+
//// String permission =
- // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+
// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
// String permission = aaf.aafPermissionString(topicName, "manage");
// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
// {
// LOGGER.error("Failed to revoke read access to consumer [" +
// consumerId + "] for topic " + topicName
- // + ". Authentication failed.");
+
// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
// errorMessages.getNotPermitted1()+" <Grant consume permissions>
- // "+errorMessages.getNotPermitted2()+ topicName);
+
// LOGGER.info(errRes);
// throw new DMaaPAccessDeniedException(errRes);
// }
//
//
- // }
-
+
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
if (null == topic) {
diff --git a/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java
index ae2d863..3065928 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java
@@ -52,7 +52,7 @@ public class TransactionServiceImpl implements TransactionService {
throws ConfigDbException, IOException {
/*
- * ConfigurationReader configReader = dmaapContext.getConfigReader();
+
*
* LOG.info("configReader : "+configReader.toString());
*
@@ -77,7 +77,7 @@ public class TransactionServiceImpl implements TransactionService {
IOException {
/*
- * if (null != transactionId) {
+
*
* ConfigurationReader configReader = dmaapContext.getConfigReader();
*
diff --git a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
index 33bc2f4..c8bb073 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
@@ -27,7 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-//import kafka.common.TopicExistsException;
+
import org.apache.kafka.common.errors.TopicExistsException;
import org.json.JSONArray;
import org.json.JSONObject;
@@ -50,7 +50,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey;
@Service
public class UIServiceImpl implements UIService {
- //private static final Logger LOGGER = Logger.getLogger(UIServiceImpl.class);
+
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(UIServiceImpl.class);
/**
* Returning template of hello page
diff --git a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
index dd1e4eb..fdf2d28 100644
--- a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
+++ b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
@@ -55,7 +55,7 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import com.att.nsa.limits.Blacklist;
import com.att.nsa.security.NsaAuthenticatorService;
-//import com.att.nsa.security.authenticators.OriginalUebAuthenticator;
+
import com.att.nsa.security.db.BaseNsaApiDbImpl;
import com.att.nsa.security.db.NsaApiDb;
import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
@@ -70,7 +70,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
@Component
public class ConfigurationReader {
-// private rrNvReadable settings;
+
private Broker1 fMetaBroker;
private ConsumerFactory fConsumerFactory;
private Publisher fPublisher;
@@ -78,7 +78,7 @@ public class ConfigurationReader {
@Autowired
private DMaaPCambriaLimiter fRateLimiter;
private NsaApiDb<NsaSimpleApiKey> fApiKeyDb;
- /* private DMaaPTransactionObjDB<DMaaPTransactionObj> fTranDb; */
+
private DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager;
private NsaAuthenticatorService<NsaSimpleApiKey> nsaSecurityManager;
private static CuratorFramework curator;
@@ -90,7 +90,7 @@ public class ConfigurationReader {
private Emailer fEmailer;
private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
- //private static final Logger log = Logger.getLogger(ConfigurationReader.class.toString());
+
/**
* constructor to initialize all the values
@@ -129,7 +129,7 @@ public class ConfigurationReader {
@Qualifier("dMaaPAuthenticatorImpl") DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager
)
throws missingReqdSetting, invalidSettingValue, ServletException, KafkaConsumerCacheException, ConfigDbException {
- //this.settings = settings;
+
this.fMetrics = fMetrics;
this.zk = zk;
this.fConfigDb = fConfigDb;
@@ -137,18 +137,18 @@ public class ConfigurationReader {
ConfigurationReader.curator = curator;
this.fConsumerFactory = fConsumerFactory;
this.fMetaBroker = fMetaBroker;
- //System.out.println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSs " + fMetaBroker);
+
this.q = q;
this.mmb = mmb;
this.fApiKeyDb = fApiKeyDb;
- /* this.fTranDb = fTranDb; */
+
this.fSecurityManager = fSecurityManager;
long allowedtimeSkewMs=600000L;
String strallowedTimeSkewM= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"authentication.allowedTimeSkewMs");
if(null!=strallowedTimeSkewM)allowedtimeSkewMs= Long.parseLong(strallowedTimeSkewM);
- // boolean requireSecureChannel = true;
+
//String strrequireSecureChannel= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"aauthentication.requireSecureChannel");
//if(strrequireSecureChannel!=null)requireSecureChannel=Boolean.parseBoolean(strrequireSecureChannel);
//this.nsaSecurityManager = new NsaAuthenticatorService<NsaSimpleApiKey>(this.fApiKeyDb, settings.getLong("authentication.allowedTimeSkewMs", 600000L), settings.getBoolean("authentication.requireSecureChannel", true));
diff --git a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
index 4c38d57..214aac8 100644
--- a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
+++ b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
@@ -130,10 +130,10 @@ public class DMaaPResponseBuilder {
*/
public static void respondOkWithStream(DMaaPContext ctx, String mediaType, StreamWriter writer) throws IOException {
ctx.getResponse().setStatus(200);
- OutputStream os = getStreamForBinaryResponse(ctx, mediaType);
- writer.write(os);
- os.close();
-
+ try(OutputStream os = getStreamForBinaryResponse(ctx, mediaType)) {
+ writer.write(os);
+ }
+
}
@@ -218,7 +218,7 @@ public class DMaaPResponseBuilder {
/**
* interface used to define write method for outputStream
*/
- public static abstract interface StreamWriter {
+ public abstract static interface StreamWriter {
/**
* abstract method used to write the response
*
@@ -252,27 +252,20 @@ public class DMaaPResponseBuilder {
boolean fResponseEntityAllowed = (!(ctx.getRequest().getMethod().equalsIgnoreCase("HEAD")));
-
- OutputStream os = null;
- try{
+
if (fResponseEntityAllowed) {
- os = ctx.getResponse().getOutputStream();
- return os;
+ try(OutputStream os = ctx.getResponse().getOutputStream()){
+ return os;
+ }catch (Exception e){
+ log.error("Exception in getStreamForBinaryResponse",e);
+ throw new IOException();
+ }
} else {
- os = new NullStream();
- return os;
- }
- }catch (Exception e){
- throw new IOException();
-
- }
- finally{
- if(null != os){
- try{
- os.close();
- }catch(Exception e) {
-
- }
+ try(OutputStream os = new NullStream()){
+ return os;
+ }catch (Exception e){
+ log.error("Exception in getStreamForBinaryResponse",e);
+ throw new IOException();
}
}
}
diff --git a/src/main/java/com/att/dmf/mr/utils/PropertyReader.java b/src/main/java/com/att/dmf/mr/utils/PropertyReader.java
index 58c9fc9..000869e 100644
--- a/src/main/java/com/att/dmf/mr/utils/PropertyReader.java
+++ b/src/main/java/com/att/dmf/mr/utils/PropertyReader.java
@@ -39,9 +39,9 @@ public class PropertyReader extends nvReadableStack {
* initializing logger
*
*/
- //private static final Logger LOGGER = Logger.getLogger(PropertyReader.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(PropertyReader.class);
-// private static final String MSGRTR_PROPERTIES_FILE = "msgRtrApi.properties";
+
/**
* constructor initialization
@@ -50,11 +50,11 @@ public class PropertyReader extends nvReadableStack {
*
*/
public PropertyReader() throws loadException {
- /* Map<String, String> argMap = new HashMap<String, String>();
- final String config = getSetting(argMap, CambriaConstants.kConfig, MSGRTR_PROPERTIES_FILE);
- final URL settingStream = findStream(config, ConfigurationReader.class);
- push(new nvPropertiesFile(settingStream));
- push(new nvReadableTable(argMap));*/
+
+
+
+
+
}
/**
@@ -83,43 +83,43 @@ public class PropertyReader extends nvReadableStack {
* @exception MalformedURLException
*
*/
- /*public static URL findStream(final String resourceName, Class<?> clazz) {
- try {
- File file = new File(resourceName);
+
+
+
- if (file.isAbsolute()) {
- return file.toURI().toURL();
- }
+
+
+
- String filesRoot = System.getProperty("RRWT_FILES", null);
+
- if (null != filesRoot) {
+
- String fullPath = filesRoot + "/" + resourceName;
+
- LOGGER.debug("Looking for [" + fullPath + "].");
+
- file = new File(fullPath);
- if (file.exists()) {
- return file.toURI().toURL();
- }
- }
+
+
+
+
+
- URL res = clazz.getClassLoader().getResource(resourceName);
+
- if (null != res) {
- return res;
- }
+
+
+
- res = ClassLoader.getSystemResource(resourceName);
+
+
+
+
+
+
+
+
+
+
- if (null != res) {
- return res;
- }
- } catch (MalformedURLException e) {
- LOGGER.error("Unexpected failure to convert a local filename into a URL: " + e.getMessage(), e);
- }
- return null;
- }
-*/
}
diff --git a/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java b/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java
index 08380fb..0e2804e 100644
--- a/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java
+++ b/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java
@@ -86,7 +86,7 @@ public class DMaaPMetricsSender implements Runnable {
String Setting_CambriaTopic=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_CambriaTopic);
if(Setting_CambriaTopic==null) Setting_CambriaTopic = "msgrtr.apinode.metrics.dmaap";
- // Setting_CambriaBaseUrl=Setting_CambriaBaseUrl==null?defaultTopic:Setting_CambriaBaseUrl;
+
String Setting_CambriaSendFreqSecs=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_CambriaSendFreqSecs);
@@ -179,7 +179,7 @@ public class DMaaPMetricsSender implements Runnable {
private final CambriaPublisher fCambria;
private final String fHostname;
- //private static final Logger log = LoggerFactory.getLogger(MetricsSender.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(MetricsSender.class);
/**
diff --git a/src/main/java/com/att/mr/filter/ContentLengthFilter.java b/src/main/java/com/att/mr/filter/ContentLengthFilter.java
index b99f9e6..26f58e0 100644
--- a/src/main/java/com/att/mr/filter/ContentLengthFilter.java
+++ b/src/main/java/com/att/mr/filter/ContentLengthFilter.java
@@ -52,7 +52,7 @@ public class ContentLengthFilter implements Filter {
private FilterConfig filterConfig = null;
DMaaPErrorMessages errorMessages = null;
- //private Logger log = Logger.getLogger(ContentLengthFilter.class.toString());
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(ContentLengthFilter.class);
/**
* Default constructor.
@@ -110,7 +110,7 @@ public class ContentLengthFilter implements Filter {
DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(), errorMessages.getMsgSizeExceeds()
+ jsonObj.toString());
log.info(errRes.toString());
- // throw new CambriaApiException(errRes);
+
}
}
diff --git a/src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java b/src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java
index 64e128c..a12e96c 100644
--- a/src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java
+++ b/src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java
@@ -24,23 +24,32 @@ import java.util.concurrent.TimeUnit;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.framework.api.GetACLBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetConfigBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.curator.framework.api.ReconfigBuilder;
+import org.apache.curator.framework.api.RemoveWatchesBuilder;
import org.apache.curator.framework.api.SetACLBuilder;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.curator.framework.api.SyncBuilder;
import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.schema.SchemaSet;
+import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
public class CuratorFrameworkImpl implements CuratorFramework {
@@ -200,4 +209,70 @@ public class CuratorFrameworkImpl implements CuratorFramework {
return null;
}
+ @Override
+ public ReconfigBuilder reconfig() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public GetConfigBuilder getConfig() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public CuratorMultiTransaction transaction() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public TransactionOp transactionOp() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void createContainers(String path) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public RemoveWatchesBuilder watches() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public QuorumVerifier getCurrentConfig() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public SchemaSet getSchemaSet() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isZk34CompatibilityMode() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}