summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/att/dmf/mr/CambriaApiException.java10
-rw-r--r--src/main/java/com/att/dmf/mr/backends/Consumer.java1
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java15
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java8
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java18
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java31
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java6
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java107
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java8
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java31
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java4
-rw-r--r--src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java5
-rw-r--r--src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java2
-rw-r--r--src/main/java/com/att/dmf/mr/metabroker/Topic.java6
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java14
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java10
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java23
-rw-r--r--src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java7
-rw-r--r--src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java2
-rw-r--r--src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java11
-rw-r--r--src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java4
-rw-r--r--src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java58
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java2
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java10
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java14
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java2
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java123
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java4
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java4
-rw-r--r--src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java16
-rw-r--r--src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java41
-rw-r--r--src/main/java/com/att/dmf/mr/utils/PropertyReader.java72
-rw-r--r--src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java4
-rw-r--r--src/main/java/com/att/mr/filter/ContentLengthFilter.java4
-rw-r--r--src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java75
35 files changed, 370 insertions, 382 deletions
diff --git a/src/main/java/com/att/dmf/mr/CambriaApiException.java b/src/main/java/com/att/dmf/mr/CambriaApiException.java
index 84dd32c..cdf95ab 100644
--- a/src/main/java/com/att/dmf/mr/CambriaApiException.java
+++ b/src/main/java/com/att/dmf/mr/CambriaApiException.java
@@ -28,8 +28,12 @@ import com.att.nsa.apiServer.NsaAppException;
public class CambriaApiException extends NsaAppException
{
+ /*
+ * defined long type constant serialVersionUID
+ */
+ private static final long serialVersionUID = 1L;
- private ErrorResponse errRes;
+ private transient ErrorResponse errRes;
/**
* Implements constructor CambriaApiException
* @param jsonObject
@@ -66,10 +70,6 @@ public class CambriaApiException extends NsaAppException
this.errRes = errRes;
}
- /*
- * defined long type constant serialVersionUID
- */
- private static final long serialVersionUID = 1L;
public ErrorResponse getErrRes() {
return errRes;
}
diff --git a/src/main/java/com/att/dmf/mr/backends/Consumer.java b/src/main/java/com/att/dmf/mr/backends/Consumer.java
index 2743fc3..f4a9a80 100644
--- a/src/main/java/com/att/dmf/mr/backends/Consumer.java
+++ b/src/main/java/com/att/dmf/mr/backends/Consumer.java
@@ -21,7 +21,6 @@
*******************************************************************************/
package com.att.dmf.mr.backends;
-import java.util.ArrayList;
/**
* A consumer interface. Consumers pull the next message from a given topic.
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
index 2bf2fb2..83c08ec 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -101,7 +101,7 @@ public class KafkaConsumerCache {
// the server at least every 30 seconds, timing out after 2 minutes should
// be okay.
// FIXME: consider allowing the client to specify its expected call rate?
- private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;
+ private static final long kDefault_MustTouchEveryMs = 1000L*60*2;
// check for expirations pretty regularly
private static final long kDefault_SweepEverySeconds = 15;
@@ -298,8 +298,8 @@ public class KafkaConsumerCache {
try {
curator.blockUntilConnected();
} catch (InterruptedException e) {
- // Ignore
- log.error("error while setting curator framework :" + e.getMessage());
+ log.error("error while setting curator framework :",e);
+ Thread.currentThread().interrupt();
}
}
@@ -511,7 +511,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in dropTimedOutConsumer",e);
+ Thread.currentThread().interrupt();
}
log.info("Dropped " + key + " consumer due to timeout");
}
@@ -638,9 +639,8 @@ public class KafkaConsumerCache {
throws KafkaConsumerCacheException {
// get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
- final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership");
- try {
+ try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
final String consumerPath = fBaseZkPath + "/" + consumerKey;
log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
final CuratorFramework curator = ConfigurationReader.getCurator();
@@ -667,7 +667,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in signalOwnership",e);
+ Thread.currentThread().interrupt();
}
}
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
index 805701a..f521b41 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
@@ -35,9 +35,7 @@ import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
//@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka")
@Component
@@ -57,7 +55,7 @@ public class KafkaLiveLockAvoider2 {
@PostConstruct
public void init() {
- System.out.println("Welcome......................................................................................");
+ log.info("Welcome......................................................................................");
try {
if (curatorFramework.checkExists().forPath(locksPath) == null) {
curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath);
@@ -67,7 +65,7 @@ public class KafkaLiveLockAvoider2 {
}
} catch (Exception e) {
- //e.printStackTrace();
+
log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e);
}
@@ -138,7 +136,7 @@ public class KafkaLiveLockAvoider2 {
protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) {
String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId;
- //Watcher processNodeWatcher = createWatcher();
+
try {
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
index 0c34bfd..237cac8 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
@@ -35,6 +35,9 @@ import com.att.dmf.mr.backends.ConsumerFactory;
*/
public class MemoryConsumerFactory implements ConsumerFactory
{
+
+ private final MemoryQueue fQueue;
+
/**
*
* Initializing constructor
@@ -59,8 +62,6 @@ public class MemoryConsumerFactory implements ConsumerFactory
return new MemoryConsumer ( topic, consumerGroupId );
}
- private final MemoryQueue fQueue;
-
/**
*
* Define nested inner class
@@ -68,6 +69,12 @@ public class MemoryConsumerFactory implements ConsumerFactory
*/
private class MemoryConsumer implements Consumer
{
+
+ private final String fTopic;
+ private final String fConsumer;
+ private final long fCreateMs;
+ private long fLastAccessMs;
+
/**
*
* Initializing MemoryConsumer constructor
@@ -93,11 +100,6 @@ public class MemoryConsumerFactory implements ConsumerFactory
return fQueue.get ( fTopic, fConsumer );
}
- private final String fTopic;
- private final String fConsumer;
- private final long fCreateMs;
- private long fLastAccessMs;
-
@Override
public boolean close() {
//Nothing to close/clean up.
@@ -168,7 +170,7 @@ public class MemoryConsumerFactory implements ConsumerFactory
*/
public Collection<? extends Consumer> getConsumers ()
{
- return new ArrayList<MemoryConsumer> ();
+ return new ArrayList<> ();
}
@Override
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
index 8c841d4..e0c80bd 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
@@ -39,6 +39,10 @@ import com.att.nsa.security.NsaApiKey;
*
*/
public class MemoryMetaBroker implements Broker {
+
+ private final MemoryQueue fQueue;
+ private final HashMap<String, MemTopic> fTopics;
+
/**
*
* @param mq
@@ -78,10 +82,16 @@ public class MemoryMetaBroker implements Broker {
fQueue.removeTopic(topic);
}
- private final MemoryQueue fQueue;
- private final HashMap<String, MemTopic> fTopics;
-
private static class MemTopic implements Topic {
+
+ private final String fName;
+ private final String fDesc;
+ private final String fOwner;
+ private NsaAcl fReaders;
+ private NsaAcl fWriters;
+ private boolean ftransactionEnabled;
+ private String accessDenied = "User does not own this topic ";
+
/**
* constructor initialization
*
@@ -141,7 +151,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
if (fWriters == null) {
fWriters = new NsaAcl();
@@ -152,7 +162,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
fWriters.remove(publisherId);
}
@@ -160,7 +170,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
if (fReaders == null) {
fReaders = new NsaAcl();
@@ -171,18 +181,11 @@ public class MemoryMetaBroker implements Broker {
@Override
public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
fReaders.remove(consumerId);
}
- private final String fName;
- private final String fDesc;
- private final String fOwner;
- private NsaAcl fReaders;
- private NsaAcl fWriters;
- private boolean ftransactionEnabled;
-
@Override
public boolean isTransactionEnabled() {
return ftransactionEnabled;
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
index 0629972..8ab4619 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
@@ -43,7 +43,7 @@ public class MemoryQueue {
* constructor storing hashMap objects in Queue and Offsets object
*/
public MemoryQueue() {
- fQueue = new HashMap<String, LogBuffer>();
+ fQueue = new HashMap<>();
fOffsets = new HashMap<String, HashMap<String, Integer>>();
}
@@ -102,7 +102,7 @@ public class MemoryQueue {
HashMap<String, Integer> offsetMap = fOffsets.get(consumerName);
if (offsetMap == null) {
- offsetMap = new HashMap<String, Integer>();
+ offsetMap = new HashMap<>();
fOffsets.put(consumerName, offsetMap);
}
Integer offset = offsetMap.get(topic);
@@ -169,7 +169,7 @@ public class MemoryQueue {
public LogBuffer(int maxSize) {
fBaseOffset = 0;
fMaxSize = maxSize;
- fList = new ArrayList<String>();
+ fList = new ArrayList<>();
}
/**
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
index f0bb982..8cbf64f 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
@@ -35,9 +35,6 @@ import com.att.dmf.mr.exception.DMaaPResponseCode;
import com.att.dmf.mr.exception.ErrorResponse;
import com.att.dmf.mr.utils.Utils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
@@ -54,6 +51,15 @@ import com.att.nsa.metrics.impl.CdmRateTicker;
*/
@Component
public class DMaaPCambriaLimiter {
+ private final HashMap<String, RateInfo> fRateInfo;
+ private final HashMap<String, RateInfoCheck> fRateInfoCheck;
+ private final double fMaxEmptyPollsPerMinute;
+ private final double fMaxPollsPerMinute;
+ private final int fWindowLengthMins;
+ private final long fSleepMs;
+ private final long fSleepMs1;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
/**
* constructor initializes
*
@@ -62,9 +68,8 @@ public class DMaaPCambriaLimiter {
* @throws invalidSettingValue
*/
@Autowired
- public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
- throws missingReqdSetting, invalidSettingValue {
- fRateInfo = new HashMap<String, RateInfo>();
+ public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
+ fRateInfo = new HashMap<>();
fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
@@ -78,19 +83,7 @@ public class DMaaPCambriaLimiter {
5000);
}
-
- /**
- * static method provide the sleep time
- *
- * @param ratePerMinute
- * @return
- */
- public static long getSleepMsForRate(double ratePerMinute) {
- if (ratePerMinute <= 0.0)
- return 0;
- return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
- }
-
+
/**
* Construct a rate limiter.
*
@@ -111,7 +104,7 @@ public class DMaaPCambriaLimiter {
* @param windowLengthMins
*/
public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
- fRateInfo = new HashMap<String, RateInfo>();
+ fRateInfo = new HashMap<>();
fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
@@ -121,6 +114,18 @@ public class DMaaPCambriaLimiter {
}
/**
+ * static method provide the sleep time
+ *
+ * @param ratePerMinute
+ * @return
+ */
+ public static long getSleepMsForRate(double ratePerMinute) {
+ if (ratePerMinute <= 0.0)
+ return 0;
+ return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
+ }
+
+ /**
* Tell the rate limiter about a call to a topic/group/id. If the rate is
* too high, this call delays its return and throws an exception.
*
@@ -151,6 +156,7 @@ public class DMaaPCambriaLimiter {
log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
}
} catch (InterruptedException e) {
+ log.error("Exception "+ e);
// ignore
}
@@ -163,37 +169,7 @@ public class DMaaPCambriaLimiter {
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
- /*if (fMaxPollsPerMinute <= 0) {
- return;
- }
- final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
- final double ratevalue = ric.onCall();
- if (ratevalue > fMaxPollsPerMinute) {
- try {
- log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
- + ".");
- if (fSleepMs1 > fMaxPollsPerMinute) {
- log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
- + " ms sleep, then responding in error.");
- Thread.sleep(fSleepMs1);
- ric.reset();
- } else {
- log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
- }
- } catch (InterruptedException e) {
- // ignore
- }
-
-
- ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
- DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
- "This client is making too many requests "
- + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
-
- log.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }*/
}
@@ -213,6 +189,8 @@ public class DMaaPCambriaLimiter {
}
private static class RateInfo {
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
/**
* constructor initialzes
*
@@ -244,14 +222,14 @@ public class DMaaPCambriaLimiter {
fCallRateSinceLastMsgSend.tick();
return fCallRateSinceLastMsgSend.getRate();
}
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
}
private static class RateInfoCheck {
+
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
/**
* constructor initialzes
*
@@ -283,21 +261,10 @@ public class DMaaPCambriaLimiter {
fCallRateSinceLastMsgSend.tick();
return fCallRateSinceLastMsgSend.getRate();
}
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
}
- private final HashMap<String, RateInfo> fRateInfo;
- private final HashMap<String, RateInfoCheck> fRateInfoCheck;
- private final double fMaxEmptyPollsPerMinute;
- private final double fMaxPollsPerMinute;
- private final int fWindowLengthMins;
- private final long fSleepMs;
- private final long fSleepMs1;
- //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
final String key = makeKey(topic, consumerGroup, clientId);
@@ -310,15 +277,7 @@ public class DMaaPCambriaLimiter {
}
- private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
- final String key = makeKey(topic, consumerGroup, clientId);
- RateInfoCheck ri = fRateInfoCheck.get(key);
- if (ri == null) {
- ri = new RateInfoCheck(key, 1);
- fRateInfoCheck.put(key, ri);
- }
- return ri;
- }
+
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index e4e09c8..f60fd53 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -185,9 +185,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
+ "], on topic [" + topic + "].");
-
- fCache.signalOwnership(topic, consumerGroupName, consumerId);
-
+
+ if (fCache != null) {
+ fCache.signalOwnership(topic, consumerGroupName, consumerId);
+ }
+
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 643eae9..4bef985 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader;
//import org.apache.log4-j.Logger;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil;
+
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
@@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
fKafkaAdminClient=AdminClient.create ( props );
- // fKafkaAdminClient = null;
+
}
//private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
@@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
fKafkaAdminClient=AdminClient.create ( props );
- // fKafkaAdminClient = null;
+
}
public DMaaPKafkaMetaBroker( rrNvReadable settings,
ZkClient zk, ConfigDb configDb,AdminClient client) {
- //fSettings = settings;
+
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
fKafkaAdminClient= client;
- // fKafkaAdminClient = null;
+
}
@@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
catch ( InterruptedException e )
{
- //timer.fail ( "Timeout" );
+
log.warn ( "Execution of describeTopics timed out." );
throw new ConfigDbException ( e );
}
catch ( ExecutionException e )
{
- //timer.fail ( "ExecutionError" );
+
log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
throw new ConfigDbException ( e.getCause () );
}
@@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
log.info("Loading zookeeper client for topic deletion.");
// topic creation. (Otherwise, the topic is only partially created
// in ZK.)
- /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
- ZKStringSerializer$.MODULE$);
- String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
- if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
- ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false);
- */
+
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
- //AdminUtils.deleteTopic(zkutils, topic);
+
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
index 9942837..4c9532b 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
@@ -84,9 +84,9 @@ public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSe
*
* @param cs
*/
- //public DMaaPMetricsSet() {
+
public DMaaPMetricsSet(rrNvReadable cs) {
- //fSettings = cs;
+
fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
super.putItem("version", fVersion);
diff --git a/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java b/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java
index a971c3f..db691bd 100644
--- a/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java
+++ b/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java
@@ -35,7 +35,7 @@ import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import org.apache.http.HttpStatus;
-//import org.apache.log-4j.Logger;
+
import org.springframework.beans.factory.annotation.Autowired;
import com.att.eelf.configuration.EELFLogger;
@@ -51,8 +51,7 @@ import com.att.eelf.configuration.EELFManager;
@Singleton
public class DMaaPWebExceptionMapper implements ExceptionMapper<WebApplicationException>{
- //private static final Logger LOGGER = Logger
- // .getLogger(DMaaPWebExceptionMapper.class);
+
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(DMaaPWebExceptionMapper.class);
private ErrorResponse errRes;
diff --git a/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java b/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java
index 6022b91..64b20e8 100644
--- a/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java
+++ b/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java
@@ -35,7 +35,7 @@ import com.att.eelf.configuration.EELFManager;
public class CambriaServletContextListener implements ServletContextListener {
DME2EndPointLoader loader = DME2EndPointLoader.getInstance();
-// private static Logger log = Logger.getLogger(CambriaServletContextListener.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaServletContextListener.class);
diff --git a/src/main/java/com/att/dmf/mr/metabroker/Topic.java b/src/main/java/com/att/dmf/mr/metabroker/Topic.java
index 422a2cc..d191070 100644
--- a/src/main/java/com/att/dmf/mr/metabroker/Topic.java
+++ b/src/main/java/com/att/dmf/mr/metabroker/Topic.java
@@ -39,16 +39,16 @@ public interface Topic extends ReadWriteSecuredResource
*
*//*
public class AccessDeniedException extends Exception
- {
+
*//**
* AccessDenied Description
*//*
- public AccessDeniedException () { super ( "Access denied." ); }
+
*//**
* AccessDenied Exception for the user while authenticating the user request
* @param user
*//*
- public AccessDeniedException ( String user ) { super ( "Access denied for " + user ); }
+
private static final long serialVersionUID = 1L;
}*/
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
index d02438f..9158c96 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
@@ -386,12 +386,7 @@ public class DMaaPCambriaClientFactory {
* Your API secret
* @return an identity manager
*/
- /*
- * public static CambriaIdentityManager createIdentityManager (
- * Collection<String> hostSet, String apiKey, String apiSecret ) { final
- * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet );
- * cim.setApiCredentials ( apiKey, apiSecret ); return cim; }
- */
+
/**
* Create a topic manager for working with topics.
@@ -405,12 +400,7 @@ public class DMaaPCambriaClientFactory {
* Your API secret
* @return a topic manager
*/
- /*
- * public static CambriaTopicManager createTopicManager ( Collection<String>
- * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi
- * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey,
- * apiSecret ); return tmi; }
- */
+
/**
* Inject a consumer. Used to support unit tests.
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
index 08b2fd1..ebdf3ed 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
@@ -31,7 +31,7 @@ import org.json.JSONArray;
import org.json.JSONException;
import com.att.dmf.mr.constants.CambriaConstants;
-//import org.slf4j.Logger;
+
//import org.slf4j.LoggerFactory;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
@@ -52,12 +52,12 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr
public CambriaBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException
{
- /*super ( hosts, CambriaConstants.kStdCambriaServicePort, clientSignature,
- CacheUse.NONE, 1, 1, TimeUnit.MILLISECONDS );*/
+
+
super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000);
- //fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+
fLog = EELFManager.getInstance().getLogger(this.getClass().getName());
//( this.getClass().getName () );
}
@@ -85,7 +85,7 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr
{
fLog = log;
- //replaceLogger ( log );
+
}
public EELFLogger getLog ()
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
index 5b937c2..dee9e57 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
@@ -305,8 +305,8 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
// code from REST Client Starts
- // final String serverCalculatedSignature = sha1HmacSigner.sign
- // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV");
+
+
Client client = ClientBuilder.newClient();
String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
@@ -323,32 +323,19 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
Response response = target.request().post(data);
- // header("X-CambriaAuth",
- // "2OH46YIWa329QpEF:"+serverCalculatedSignature).
- // header("X-CambriaDate", "2015-09-21T11:38:19-0700").
- // post(Entity.json(baseStream.toByteArray()));
-
+
getLog().info("Response received :: " + response.getStatus());
getLog().info("Response received :: " + response.toString());
// code from REST Client Ends
- /*
- * final JSONObject result = post ( url, contentType,
- * baseStream.toByteArray(), true ); final String logLine =
- * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" +
- * result.toString (); getLog().info ( logLine );
- */
+
fPending.clear();
return true;
} catch (IllegalArgumentException x) {
getLog().warn(x.getMessage(), x);
}
- /*
- * catch ( HttpObjectNotFoundException x ) { getLog().warn (
- * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn (
- * x.getMessage(), x ); }
- */
+
catch (IOException x) {
getLog().warn(x.getMessage(), x);
}
diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java
index 98ddb50..7a67c92 100644
--- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java
+++ b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java
@@ -81,10 +81,7 @@ public class CambriaJsonStreamReader implements reader {
final int c = fTokens.next();
- /*if (c ==','){
- fCloseCount++;
- System.out.println("fCloseCount=" + fCloseCount +" fCount "+fCount);
- }*/
+
if (fIsList) {
if (c == ']' || (fCount > 0 && c == 10))
return null;
@@ -125,7 +122,7 @@ public class CambriaJsonStreamReader implements reader {
*
* @param o
*/
- //public msg(JSONObject o){}
+
public msg(JSONObject o) {
diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java
index 376d140..f64c0de 100644
--- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java
+++ b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java
@@ -137,5 +137,5 @@ public class CambriaRawStreamReader implements reader
private final InputStream fStream;
private final String fDefPart;
private boolean fClosed;
- //private String transactionId;
+
}
diff --git a/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java b/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
index b550373..ed0893d 100644
--- a/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
+++ b/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
@@ -46,7 +46,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
auth = true;
}
- //System.out.println("role " +role +" user: "+ req.getRemoteUser() +" : auth="+auth);
+
return auth;
}
@@ -57,7 +57,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
String permission = "";
String nameSpace ="";
if(topicName.contains(".") && topicName.contains("com.att")) {
- //String topic = topicName.substring(topicName.lastIndexOf(".")+1);
+
nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
}
else {
@@ -67,12 +67,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
if(null==nameSpace)nameSpace="com.att.dmaap.mr.ueb";
- /*ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.TOPIC_NOT_IN_AAF.getResponseCode(), "Topic does not exist in AAF"
- , null, Utils.getFormattedDate(new Date()), topicName,
- null, null, null, null);
-
- throw new CambriaApiException(errRes);*/
+
}
permission = nameSpace+".mr.topic|:topic."+topicName+"|"+action;
diff --git a/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java b/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java
index e9f28ae..64dbc14 100644
--- a/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java
+++ b/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java
@@ -25,7 +25,7 @@ import javax.servlet.http.HttpServletRequest;
import com.att.dmf.mr.beans.DMaaPContext;
import com.att.dmf.mr.security.DMaaPAuthenticator;
-//import com.att.nsa.security.db.NsaApiDb;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.security.NsaApiKey;
@@ -65,7 +65,7 @@ public class DMaaPMechIdAuthenticator <K extends NsaApiKey> implements DMaaPAuth
log.info ( "AUTH-LOG(" + remoteAddr + "): " + msg );
}
-// private final NsaApiDb<K> fDb;
+
//private static final Logger log = Logger.getLogger( MechIdAuthenticator.class.toString());
private static final EELFLogger log = EELFManager.getInstance().getLogger(MechIdAuthenticator.class);
/**
diff --git a/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java b/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java
index a26c9e7..b1e28e7 100644
--- a/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java
+++ b/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java
@@ -54,9 +54,9 @@ public class DMaaPOriginalUebAuthenticator<K extends NsaApiKey> implements DMaaP
public DMaaPOriginalUebAuthenticator(NsaApiDb<K> db, long requestTimeWindowMs) {
fDb = db;
fRequestTimeWindowMs = requestTimeWindowMs;
- //fAuthenticators = new LinkedList<DMaaPAuthenticator<K>>();
+
- //fAuthenticators.add(new DMaaPOriginalUebAuthenticator<K>(db, requestTimeWindowMs));
+
}
@@ -243,51 +243,51 @@ public class DMaaPOriginalUebAuthenticator<K extends NsaApiKey> implements DMaaP
"EEEE, dd-MMM-yy HH:mm:ss zzz",
};
- /*private static final String kDateFormats[] = {
- // W3C date format (RFC 3339).
- "yyyy-MM-dd'T'HH:mm:ssz",
+
+
+
- // Preferred HTTP date format (RFC 1123).
- "EEE, dd MMM yyyy HH:mm:ss zzz",
+
+
- // simple unix command line 'date' format
- "EEE MMM dd HH:mm:ss z yyyy",
+
+
- // Common date format (RFC 822).
- "EEE, dd MMM yy HH:mm:ss z", "EEE, dd MMM yy HH:mm z", "dd MMM yy HH:mm:ss z", "dd MMM yy HH:mm z",
+
+
- // Obsoleted HTTP date format (ANSI C asctime() format).
- "EEE MMM dd HH:mm:ss yyyy",
+
+
- // Obsoleted HTTP date format (RFC 1036).
- "EEEE, dd-MMM-yy HH:mm:ss zzz", }; */
+
+
// logger declaration
- //private static final Logger log = Logger.getLogger(DMaaPOriginalUebAuthenticator.class.toString());
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPOriginalUebAuthenticator.class);
@Override
-// public K authenticate(DMaaPContext ctx) {
+
// TODO Auto-generated method stub
- //return null;
+
//}
public K authenticate(DMaaPContext ctx) {
- /*final HttpServletRequest req = ctx.getRequest();
- for (DMaaPAuthenticator<K> a : fAuthenticators) {
- if (a.qualify(req)) {
- final K k = a.isAuthentic(req);
- if (k != null)
- return k;
- }
- // else: this request doesn't look right to the authenticator
- }*/
+
+
+
+
+
+
+
+
+
return null;
}
public void addAuthenticator ( DMaaPAuthenticator<K> a )
{
- //this.fAuthenticators.add(a);
+
}
- //private final LinkedList<DMaaPAuthenticator<K>> fAuthenticators;
+
} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java
index 110970f..f7c48de 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java
@@ -42,7 +42,7 @@ import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.limits.Blacklist;
import com.att.nsa.security.NsaApiKey;
import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-//import com.att.sa.highlandPark.util.HpJsonUtil;
+
/**
* @author muzainulhaque.qazi
diff --git a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
index c818f88..b0e8a86 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
@@ -54,7 +54,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey;
@Service
public class ApiKeysServiceImpl implements ApiKeysService {
- //private Logger log = Logger.getLogger(ApiKeysServiceImpl.class.toString());
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysServiceImpl.class.toString());
/**
* This method will provide all the ApiKeys present in kafka server.
@@ -139,7 +139,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
String kSetting_AllowAnonymousKeys= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"apiKeys.allowAnonymous");
if(null==kSetting_AllowAnonymousKeys) kSetting_AllowAnonymousKeys ="false";
- // if ((contactEmail == null) || (contactEmail.length() == 0))
+
if ( kSetting_AllowAnonymousKeys.equalsIgnoreCase("true") && !emailProvided )
{
DMaaPResponseBuilder.respondWithErrorInJson(dmaapContext, 400, "You must provide an email address.");
@@ -165,7 +165,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
log.debug("=======ApiKeysServiceImpl: createApiKey : saving api key : "
+ key.toString() + "=====");
apiKeyDb.saveApiKey(key);
- // System.out.println("here4");
+
// email out the secret to validate the email address
if ( emailProvided )
{
@@ -196,9 +196,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
);
DMaaPResponseBuilder.respondOk(dmaapContext,
o);
- /*o.put("secret", "Emailed to " + contactEmail + ".");
- DMaaPResponseBuilder.respondOk(dmaapContext,
- o); */
+
return;
} else {
log.debug("=======ApiKeysServiceImpl: createApiKey : Error in creating API Key.=====");
diff --git a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
index e9671ce..22b60fe 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
@@ -94,7 +94,7 @@ import com.att.nsa.util.rrConvertor;
@Service
public class EventsServiceImpl implements EventsService {
// private static final Logger LOG =
- // Logger.getLogger(EventsServiceImpl.class);
+
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
private static final String BATCH_LENGTH = "event.batch.length";
@@ -103,10 +103,10 @@ public class EventsServiceImpl implements EventsService {
private DMaaPErrorMessages errorMessages;
//@Autowired
- //KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+
// @Value("${metrics.send.cambria.topic}")
- // private String metricsTopic;
+
public DMaaPErrorMessages getErrorMessages() {
return errorMessages;
@@ -133,7 +133,7 @@ public class EventsServiceImpl implements EventsService {
CambriaApiException, IOException, DMaaPAccessDeniedException {
final long startTime = System.currentTimeMillis();
final HttpServletRequest req = ctx.getRequest();
- //System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"+kafkaLiveLockAvoider);
+
boolean isAAFTopic = false;
// was this host blacklisted?
final String remoteAddr = Utils.getRemoteAddress(ctx);
@@ -158,7 +158,7 @@ public class EventsServiceImpl implements EventsService {
if (strtimeoutMS != null)
timeoutMs = Integer.parseInt(strtimeoutMS);
// int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
- // CambriaConstants.kNoTimeout);
+
if (req.getParameter("timeout") != null) {
timeoutMs = Integer.parseInt(req.getParameter("timeout"));
}
@@ -214,7 +214,7 @@ public class EventsServiceImpl implements EventsService {
// if headers are not provided then user will be null
if (user == null && null != ctx.getRequest().getHeader("Authorization")) {
// the topic name will be sent by the client
- // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub";
+
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
String permission = aaf.aafPermissionString(topic, "sub");
if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
@@ -234,7 +234,7 @@ public class EventsServiceImpl implements EventsService {
logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
+ " " + clientId);
Consumer c = null;
- // String localclientId = clientId;
+
String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"clusterhostid");
if (null == lhostId) {
diff --git a/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java
index 83b3770..d867ea8 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java
@@ -49,7 +49,7 @@ import com.att.nsa.metrics.CdmMeasuredItem;
@Component
public class MetricsServiceImpl implements MetricsService {
- //private static final Logger LOG = Logger.getLogger(MetricsService.class.toString());
+
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MetricsService.class);
/**
*
diff --git a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
index 9918024..7e9d783 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
@@ -45,7 +45,7 @@ import com.att.dmf.mr.exception.DMaaPResponseCode;
import com.att.dmf.mr.exception.ErrorResponse;
import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
import com.att.dmf.mr.metabroker.Broker1;
-//import com.att.dmf.mr.metabroker.Broker1;
+
import com.att.dmf.mr.metabroker.Topic;
import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
@@ -67,13 +67,13 @@ import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
public class TopicServiceImpl implements TopicService {
// private static final Logger LOGGER =
- // Logger.getLogger(TopicServiceImpl.class);
+
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
@Autowired
private DMaaPErrorMessages errorMessages;
// @Value("${msgRtr.topicfactory.aaf}")
- // private String mrFactory;
+
public DMaaPErrorMessages getErrorMessages() {
return errorMessages;
@@ -125,7 +125,7 @@ public class TopicServiceImpl implements TopicService {
for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
JSONObject obj = new JSONObject();
obj.put("topicName", topic.getName());
- // obj.put("description", topic.getDescription());
+
obj.put("owner", topic.getOwner());
obj.put("txenabled", topic.isTransactionEnabled());
topicsList.put(obj);
@@ -193,7 +193,7 @@ public class TopicServiceImpl implements TopicService {
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
String key = null;
- //String appName = dmaapContext.getRequest().getHeader("AppName");
+
String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
"enforced.topic.name.AAF");
@@ -209,55 +209,55 @@ public class TopicServiceImpl implements TopicService {
"Failed to create topic: Access Denied.User does not have permission to perform create topic");
LOGGER.info(errRes.toString());
- // throw new DMaaPAccessDeniedException(errRes);
+
}
}
- // else if (user==null &&
+
// (null==dmaapContext.getRequest().getHeader("Authorization") && null
- // == dmaapContext.getRequest().getHeader("cookie")) ) {
- /*else if (user == null && null == dmaapContext.getRequest().getHeader("Authorization")
- ) {
- LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
+
+
+
+
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Failed to create topic: Access Denied.User does not have permission to perform create topic");
+
+
+
- LOGGER.info(errRes.toString());
- // throw new DMaaPAccessDeniedException(errRes);
- }*/
+
+
+
if (user == null /*&& (null != dmaapContext.getRequest().getHeader("Authorization")
)*/) {
- // if (user == null &&
+
// (null!=dmaapContext.getRequest().getHeader("Authorization") ||
- // null != dmaapContext.getRequest().getHeader("cookie"))) {
+
// ACL authentication is not provided so we will use the aaf
// authentication
- /*LOGGER.info("Authorization the topic");
+
- String permission = "";
- String nameSpace = "";
- if (topicBean.getTopicName().indexOf(".") > 1)
- nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
+
+
+
+
- String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "msgRtr.topicfactory.aaf");
+
+
- // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
+
- permission = mrFactoryVal + nameSpace + "|create";
- DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();*/
+
+
- //if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
+
if (false) {
LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
"Failed to create topic: Access Denied.User does not have permission to create topic with perm "
- //+ permission);
+
+ "permission");
@@ -267,13 +267,13 @@ public class TopicServiceImpl implements TopicService {
} else {
// if user is null and aaf authentication is ok then key should
// be ""
- // key = "";
+
/**
* Added as part of AAF user it should return username
*/
- //key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
- //key="admin";
+
+
//LOGGER.info("key ==================== " + key);
}
@@ -283,7 +283,7 @@ public class TopicServiceImpl implements TopicService {
final String topicName = topicBean.getTopicName();
final String desc = topicBean.getTopicDescription();
int partition = topicBean.getPartitionCount();
- // int replica = topicBean.getReplicationCount();
+
if (partition == 0) {
partition = 8;
}
@@ -291,7 +291,7 @@ public class TopicServiceImpl implements TopicService {
int replica = topicBean.getReplicationCount();
if (replica == 0) {
- //replica = 3;
+
replica = 1;
}
final int replicas = replica;
@@ -503,25 +503,25 @@ public class TopicServiceImpl implements TopicService {
LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
- // if (user == null) {
+
//
// LOGGER.info("Authenticating the user, as ACL authentication is not
- // provided");
+
//// String permission =
- // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+
//
- // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- // String permission = aaf.aafPermissionString(topicName, "manage");
+
+
// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
// {
// LOGGER.error("Failed to permit write access to producer [" +
// producerId + "] for topic " + topicName
- // + ". Authentication failed.");
+
// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
// errorMessages.getNotPermitted1()+" <Grant publish permissions>
- // "+errorMessages.getNotPermitted2()+ topicName);
- // LOGGER.info(errRes);
+
+
// throw new DMaaPAccessDeniedException(errRes);
// }
// }
@@ -561,25 +561,25 @@ public class TopicServiceImpl implements TopicService {
LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
- // if (user == null) {
+
//
//// String permission =
- // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+
// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
// String permission = aaf.aafPermissionString(topicName, "manage");
// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
// {
// LOGGER.error("Failed to revoke write access to producer [" +
// producerId + "] for topic " + topicName
- // + ". Authentication failed.");
+
// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
// errorMessages.getNotPermitted1()+" <Revoke publish permissions>
- // "+errorMessages.getNotPermitted2()+ topicName);
- // LOGGER.info(errRes);
+
+
// throw new DMaaPAccessDeniedException(errRes);
//
- // }
+
// }
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
@@ -612,22 +612,22 @@ public class TopicServiceImpl implements TopicService {
LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
- // if (user == null) {
+
//
//// String permission =
- // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
- // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+
+
// String permission = aaf.aafPermissionString(topicName, "manage");
// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
// {
// LOGGER.error("Failed to permit read access to consumer [" +
// consumerId + "] for topic " + topicName
- // + ". Authentication failed.");
+
// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
// errorMessages.getNotPermitted1()+" <Grant consume permissions>
- // "+errorMessages.getNotPermitted2()+ topicName);
- // LOGGER.info(errRes);
+
+
// throw new DMaaPAccessDeniedException(errRes);
// }
// }
@@ -662,27 +662,26 @@ public class TopicServiceImpl implements TopicService {
LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
- // if (user == null) {
+
//// String permission =
- // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+
// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
// String permission = aaf.aafPermissionString(topicName, "manage");
// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
// {
// LOGGER.error("Failed to revoke read access to consumer [" +
// consumerId + "] for topic " + topicName
- // + ". Authentication failed.");
+
// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
// errorMessages.getNotPermitted1()+" <Grant consume permissions>
- // "+errorMessages.getNotPermitted2()+ topicName);
+
// LOGGER.info(errRes);
// throw new DMaaPAccessDeniedException(errRes);
// }
//
//
- // }
-
+
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
if (null == topic) {
diff --git a/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java
index ae2d863..3065928 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java
@@ -52,7 +52,7 @@ public class TransactionServiceImpl implements TransactionService {
throws ConfigDbException, IOException {
/*
- * ConfigurationReader configReader = dmaapContext.getConfigReader();
+
*
* LOG.info("configReader : "+configReader.toString());
*
@@ -77,7 +77,7 @@ public class TransactionServiceImpl implements TransactionService {
IOException {
/*
- * if (null != transactionId) {
+
*
* ConfigurationReader configReader = dmaapContext.getConfigReader();
*
diff --git a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
index 33bc2f4..c8bb073 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
@@ -27,7 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-//import kafka.common.TopicExistsException;
+
import org.apache.kafka.common.errors.TopicExistsException;
import org.json.JSONArray;
import org.json.JSONObject;
@@ -50,7 +50,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey;
@Service
public class UIServiceImpl implements UIService {
- //private static final Logger LOGGER = Logger.getLogger(UIServiceImpl.class);
+
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(UIServiceImpl.class);
/**
* Returning template of hello page
diff --git a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
index dd1e4eb..fdf2d28 100644
--- a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
+++ b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
@@ -55,7 +55,7 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import com.att.nsa.limits.Blacklist;
import com.att.nsa.security.NsaAuthenticatorService;
-//import com.att.nsa.security.authenticators.OriginalUebAuthenticator;
+
import com.att.nsa.security.db.BaseNsaApiDbImpl;
import com.att.nsa.security.db.NsaApiDb;
import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
@@ -70,7 +70,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
@Component
public class ConfigurationReader {
-// private rrNvReadable settings;
+
private Broker1 fMetaBroker;
private ConsumerFactory fConsumerFactory;
private Publisher fPublisher;
@@ -78,7 +78,7 @@ public class ConfigurationReader {
@Autowired
private DMaaPCambriaLimiter fRateLimiter;
private NsaApiDb<NsaSimpleApiKey> fApiKeyDb;
- /* private DMaaPTransactionObjDB<DMaaPTransactionObj> fTranDb; */
+
private DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager;
private NsaAuthenticatorService<NsaSimpleApiKey> nsaSecurityManager;
private static CuratorFramework curator;
@@ -90,7 +90,7 @@ public class ConfigurationReader {
private Emailer fEmailer;
private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
- //private static final Logger log = Logger.getLogger(ConfigurationReader.class.toString());
+
/**
* constructor to initialize all the values
@@ -129,7 +129,7 @@ public class ConfigurationReader {
@Qualifier("dMaaPAuthenticatorImpl") DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager
)
throws missingReqdSetting, invalidSettingValue, ServletException, KafkaConsumerCacheException, ConfigDbException {
- //this.settings = settings;
+
this.fMetrics = fMetrics;
this.zk = zk;
this.fConfigDb = fConfigDb;
@@ -137,18 +137,18 @@ public class ConfigurationReader {
ConfigurationReader.curator = curator;
this.fConsumerFactory = fConsumerFactory;
this.fMetaBroker = fMetaBroker;
- //System.out.println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSs " + fMetaBroker);
+
this.q = q;
this.mmb = mmb;
this.fApiKeyDb = fApiKeyDb;
- /* this.fTranDb = fTranDb; */
+
this.fSecurityManager = fSecurityManager;
long allowedtimeSkewMs=600000L;
String strallowedTimeSkewM= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"authentication.allowedTimeSkewMs");
if(null!=strallowedTimeSkewM)allowedtimeSkewMs= Long.parseLong(strallowedTimeSkewM);
- // boolean requireSecureChannel = true;
+
//String strrequireSecureChannel= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"aauthentication.requireSecureChannel");
//if(strrequireSecureChannel!=null)requireSecureChannel=Boolean.parseBoolean(strrequireSecureChannel);
//this.nsaSecurityManager = new NsaAuthenticatorService<NsaSimpleApiKey>(this.fApiKeyDb, settings.getLong("authentication.allowedTimeSkewMs", 600000L), settings.getBoolean("authentication.requireSecureChannel", true));
diff --git a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
index 4c38d57..214aac8 100644
--- a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
+++ b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
@@ -130,10 +130,10 @@ public class DMaaPResponseBuilder {
*/
public static void respondOkWithStream(DMaaPContext ctx, String mediaType, StreamWriter writer) throws IOException {
ctx.getResponse().setStatus(200);
- OutputStream os = getStreamForBinaryResponse(ctx, mediaType);
- writer.write(os);
- os.close();
-
+ try(OutputStream os = getStreamForBinaryResponse(ctx, mediaType)) {
+ writer.write(os);
+ }
+
}
@@ -218,7 +218,7 @@ public class DMaaPResponseBuilder {
/**
* interface used to define write method for outputStream
*/
- public static abstract interface StreamWriter {
+ public abstract static interface StreamWriter {
/**
* abstract method used to write the response
*
@@ -252,27 +252,20 @@ public class DMaaPResponseBuilder {
boolean fResponseEntityAllowed = (!(ctx.getRequest().getMethod().equalsIgnoreCase("HEAD")));
-
- OutputStream os = null;
- try{
+
if (fResponseEntityAllowed) {
- os = ctx.getResponse().getOutputStream();
- return os;
+ try(OutputStream os = ctx.getResponse().getOutputStream()){
+ return os;
+ }catch (Exception e){
+ log.error("Exception in getStreamForBinaryResponse",e);
+ throw new IOException();
+ }
} else {
- os = new NullStream();
- return os;
- }
- }catch (Exception e){
- throw new IOException();
-
- }
- finally{
- if(null != os){
- try{
- os.close();
- }catch(Exception e) {
-
- }
+ try(OutputStream os = new NullStream()){
+ return os;
+ }catch (Exception e){
+ log.error("Exception in getStreamForBinaryResponse",e);
+ throw new IOException();
}
}
}
diff --git a/src/main/java/com/att/dmf/mr/utils/PropertyReader.java b/src/main/java/com/att/dmf/mr/utils/PropertyReader.java
index 58c9fc9..000869e 100644
--- a/src/main/java/com/att/dmf/mr/utils/PropertyReader.java
+++ b/src/main/java/com/att/dmf/mr/utils/PropertyReader.java
@@ -39,9 +39,9 @@ public class PropertyReader extends nvReadableStack {
* initializing logger
*
*/
- //private static final Logger LOGGER = Logger.getLogger(PropertyReader.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(PropertyReader.class);
-// private static final String MSGRTR_PROPERTIES_FILE = "msgRtrApi.properties";
+
/**
* constructor initialization
@@ -50,11 +50,11 @@ public class PropertyReader extends nvReadableStack {
*
*/
public PropertyReader() throws loadException {
- /* Map<String, String> argMap = new HashMap<String, String>();
- final String config = getSetting(argMap, CambriaConstants.kConfig, MSGRTR_PROPERTIES_FILE);
- final URL settingStream = findStream(config, ConfigurationReader.class);
- push(new nvPropertiesFile(settingStream));
- push(new nvReadableTable(argMap));*/
+
+
+
+
+
}
/**
@@ -83,43 +83,43 @@ public class PropertyReader extends nvReadableStack {
* @exception MalformedURLException
*
*/
- /*public static URL findStream(final String resourceName, Class<?> clazz) {
- try {
- File file = new File(resourceName);
+
+
+
- if (file.isAbsolute()) {
- return file.toURI().toURL();
- }
+
+
+
- String filesRoot = System.getProperty("RRWT_FILES", null);
+
- if (null != filesRoot) {
+
- String fullPath = filesRoot + "/" + resourceName;
+
- LOGGER.debug("Looking for [" + fullPath + "].");
+
- file = new File(fullPath);
- if (file.exists()) {
- return file.toURI().toURL();
- }
- }
+
+
+
+
+
- URL res = clazz.getClassLoader().getResource(resourceName);
+
- if (null != res) {
- return res;
- }
+
+
+
- res = ClassLoader.getSystemResource(resourceName);
+
+
+
+
+
+
+
+
+
+
- if (null != res) {
- return res;
- }
- } catch (MalformedURLException e) {
- LOGGER.error("Unexpected failure to convert a local filename into a URL: " + e.getMessage(), e);
- }
- return null;
- }
-*/
}
diff --git a/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java b/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java
index 08380fb..0e2804e 100644
--- a/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java
+++ b/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java
@@ -86,7 +86,7 @@ public class DMaaPMetricsSender implements Runnable {
String Setting_CambriaTopic=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_CambriaTopic);
if(Setting_CambriaTopic==null) Setting_CambriaTopic = "msgrtr.apinode.metrics.dmaap";
- // Setting_CambriaBaseUrl=Setting_CambriaBaseUrl==null?defaultTopic:Setting_CambriaBaseUrl;
+
String Setting_CambriaSendFreqSecs=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_CambriaSendFreqSecs);
@@ -179,7 +179,7 @@ public class DMaaPMetricsSender implements Runnable {
private final CambriaPublisher fCambria;
private final String fHostname;
- //private static final Logger log = LoggerFactory.getLogger(MetricsSender.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(MetricsSender.class);
/**
diff --git a/src/main/java/com/att/mr/filter/ContentLengthFilter.java b/src/main/java/com/att/mr/filter/ContentLengthFilter.java
index b99f9e6..26f58e0 100644
--- a/src/main/java/com/att/mr/filter/ContentLengthFilter.java
+++ b/src/main/java/com/att/mr/filter/ContentLengthFilter.java
@@ -52,7 +52,7 @@ public class ContentLengthFilter implements Filter {
private FilterConfig filterConfig = null;
DMaaPErrorMessages errorMessages = null;
- //private Logger log = Logger.getLogger(ContentLengthFilter.class.toString());
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(ContentLengthFilter.class);
/**
* Default constructor.
@@ -110,7 +110,7 @@ public class ContentLengthFilter implements Filter {
DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(), errorMessages.getMsgSizeExceeds()
+ jsonObj.toString());
log.info(errRes.toString());
- // throw new CambriaApiException(errRes);
+
}
}
diff --git a/src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java b/src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java
index 64e128c..a12e96c 100644
--- a/src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java
+++ b/src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java
@@ -24,23 +24,32 @@ import java.util.concurrent.TimeUnit;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.framework.api.GetACLBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetConfigBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.curator.framework.api.ReconfigBuilder;
+import org.apache.curator.framework.api.RemoveWatchesBuilder;
import org.apache.curator.framework.api.SetACLBuilder;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.curator.framework.api.SyncBuilder;
import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.schema.SchemaSet;
+import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
public class CuratorFrameworkImpl implements CuratorFramework {
@@ -200,4 +209,70 @@ public class CuratorFrameworkImpl implements CuratorFramework {
return null;
}
+ @Override
+ public ReconfigBuilder reconfig() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public GetConfigBuilder getConfig() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public CuratorMultiTransaction transaction() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public TransactionOp transactionOp() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void createContainers(String path) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public RemoveWatchesBuilder watches() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public QuorumVerifier getCurrentConfig() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public SchemaSet getSchemaSet() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isZk34CompatibilityMode() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}