summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java2
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java2
-rw-r--r--src/main/java/com/att/dmf/mr/constants/CambriaConstants.java2
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java2
-rw-r--r--src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java56
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java6
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java6
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java6
-rw-r--r--src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java2
-rw-r--r--src/main/java/com/att/dmf/mr/utils/Emailer.java2
10 files changed, 43 insertions, 43 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java b/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java
index 5d3bc47..cc3338b 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java
@@ -69,7 +69,7 @@ public class LockInstructionWatcher implements Watcher {
try {
log.info("node children changed at path: {}", event.getPath());
- //String grpName = new String(curatorFramework.getData().forPath(event.getPath()));
+
List<String> children = curatorFramework.getChildren().forPath(event.getPath());
log.info("found children nodes prodcessing now");
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java b/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java
index 8e41c9f..eb77dc2 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java
@@ -32,7 +32,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import com.att.dmf.mr.backends.Publisher;
-//import kafka.producer.KeyedMessage;
+
/**
* class used for logging perspective
diff --git a/src/main/java/com/att/dmf/mr/constants/CambriaConstants.java b/src/main/java/com/att/dmf/mr/constants/CambriaConstants.java
index b640688..cb6653c 100644
--- a/src/main/java/com/att/dmf/mr/constants/CambriaConstants.java
+++ b/src/main/java/com/att/dmf/mr/constants/CambriaConstants.java
@@ -75,7 +75,7 @@ String msgRtr_prop="MsgRtrApi.properties";
int kDefault_MaxThreads = -1;
-// String kSetting_TomcatProtocolClass = "tomcat.protocolClass";
+
//String kDefault_TomcatProtocolClass = Http11NioProtocol.class.getName ();
String kSetting_ZkConfigDbServers = "config.zk.servers";
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
index ebdf3ed..80904ea 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
@@ -59,7 +59,7 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr
fLog = EELFManager.getInstance().getLogger(this.getClass().getName());
- //( this.getClass().getName () );
+
}
@Override
diff --git a/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java
index cde7720..97d1fa5 100644
--- a/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java
+++ b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java
@@ -171,7 +171,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
}
// public Builder atOffset ( int pos )
- // {
+
// fOffset = pos;
// return this;
// }
@@ -197,7 +197,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
fConsumer = builder.fConsumer;
fLimit = builder.fLimit;
fTimeoutMs = builder.fTimeoutMs;
- // fSettings = builder.fSettings;
+
fSent = 0;
fPretty = builder.fPretty;
fWithMeta = builder.fWithMeta;
@@ -244,7 +244,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
* @param msg
* @throws IOException
*/
- // void onMessage(int count, Message msg) throws IOException;
+
void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException;
}
@@ -263,7 +263,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
* throws IOException
*/
public void write(final OutputStream os) throws IOException {
- // final boolean transactionEnabled = topic.isTransactionEnabled();
+
// final boolean transactionEnabled = isTransEnabled();
// final boolean transactionEnabled = istransEnable;
// synchronized(this){
@@ -282,7 +282,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
entry.put("message", msg);
os.write(entry.toString().getBytes());
} else {
- // os.write(message.getBytes());
+
String jsonString = JSONObject.valueToString(msg);
os.write(jsonString.getBytes());
}
@@ -299,7 +299,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
try {
if (istransEnable && istransType) {
// final String transactionId =
- // jsonMessage.getString("transactionId");
+
// responseTransactionId = transId;
StringBuilder consumerInfo = new StringBuilder();
if (null != dmaapContext && null != dmaapContext.getRequest()) {
@@ -338,7 +338,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
}
});
- // if (null != dmaapContext && isTransactionEnabled()) {
+
if (null != dmaapContext && istransEnable && istransType) {
dmaapContext.getResponse().setHeader("transactionId",
@@ -353,10 +353,10 @@ public class CambriaOutboundEventStream implements StreamWriter {
if (null != strclose_out_stream)
close_out_stream = Boolean.parseBoolean(strclose_out_stream);
- // if (fSettings.getBoolean("close.output.stream", true)) {
+
if (close_out_stream) {
os.close();
- // }
+
}
}
@@ -479,10 +479,10 @@ public class CambriaOutboundEventStream implements StreamWriter {
*
* Checks whether filter is initialized
*/
- /*private boolean isFilterInitialized() {
- return (fHpAlarmFilter != null && fHppe != null);
- }
-*/
+
+
+
+
/**
*
* @param msg
@@ -490,18 +490,18 @@ public class CambriaOutboundEventStream implements StreamWriter {
*/
private boolean filterMatches(String msg) {
boolean result = true;
- /*if (isFilterInitialized()) {
- try {
- final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg));
- result = fHpAlarmFilter.matches(fHppe, e);
- } catch (JSONException x) {
- // the msg may not be JSON
- result = false;
- log.error("Failed due to " + x.getMessage());
- } catch (Exception x) {
- log.error("Error using filter: " + x.getMessage(), x);
- }
- }*/
+
+
+
+
+
+
+
+
+
+
+
+
return result;
}
@@ -534,11 +534,11 @@ public class CambriaOutboundEventStream implements StreamWriter {
private final Consumer fConsumer;
private final int fLimit;
private final int fTimeoutMs;
- // private final rrNvReadable fSettings;
+
private final boolean fPretty;
private final boolean fWithMeta;
private int fSent;
-// private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter;
+
//private final HpProcessingEngine<HpJsonEvent> fHppe;
private DMaaPContext dmaapContext;
private String responseTransactionId;
@@ -548,7 +548,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
private ArrayList<Consumer> fKafkaConsumerList;
private boolean istransType = true;
// private static final Logger log =
- // Logger.getLogger(CambriaOutboundEventStream.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
index 22b60fe..88c5fd9 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
@@ -376,7 +376,7 @@ public class EventsServiceImpl implements EventsService {
String topicNameStd = null;
// topicNameStd=
- // ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF");
+
topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
"enforced.topic.name.AAF");
String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
@@ -404,7 +404,7 @@ public class EventsServiceImpl implements EventsService {
if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
&& !topic.equalsIgnoreCase(metricTopicname))) {
// the topic name will be sent by the client
- // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub";
+
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
String permission = aaf.aafPermissionString(topic, "pub");
if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
@@ -486,7 +486,7 @@ public class EventsServiceImpl implements EventsService {
if (null != batchlen)
maxEventBatch = Long.parseLong(batchlen);
// long maxEventBatch =
- // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
+
final LinkedList<Publisher.message> batch = new LinkedList<>();
// final ArrayList<KeyedMessage<String, String>> kms = new
// ArrayList<KeyedMessage<String, String>>();
diff --git a/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java
index da8cb16..e55f510 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java
@@ -114,7 +114,7 @@ public class MMServiceImpl implements MMService {
throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
CambriaApiException, IOException {
- // final long startTime = System.currentTimeMillis();
+
final HttpServletRequest req = ctx.getRequest();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -138,13 +138,13 @@ public class MMServiceImpl implements MMService {
limit = Integer.parseInt(req.getParameter("limit"));
}
limit = 1;
- // int timeoutMs = 60000;
+
int timeoutMs = CambriaConstants.kNoTimeout;
String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
if (strtimeoutMS != null)
timeoutMs = Integer.parseInt(strtimeoutMS);
// int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
- // CambriaConstants.kNoTimeout);
+
if (req.getParameter("timeout") != null) {
timeoutMs = Integer.parseInt(req.getParameter("timeout"));
}
diff --git a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
index cef646e..4c872ad 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
@@ -618,7 +618,7 @@ public class TopicServiceImpl implements TopicService {
// errorMessages.getNotPermitted1()+" <Grant consume permissions>
- // throw new DMaaPAccessDeniedException(errRes);
+
// }
// }
@@ -655,7 +655,7 @@ public class TopicServiceImpl implements TopicService {
//// String permission =
- // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+
// String permission = aaf.aafPermissionString(topicName, "manage");
// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
// {
@@ -666,7 +666,7 @@ public class TopicServiceImpl implements TopicService {
// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
// errorMessages.getNotPermitted1()+" <Grant consume permissions>
- // LOGGER.info(errRes);
+
// throw new DMaaPAccessDeniedException(errRes);
// }
//
diff --git a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
index 214aac8..72db9de 100644
--- a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
+++ b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
@@ -46,7 +46,7 @@ import com.att.eelf.configuration.EELFManager;
public class DMaaPResponseBuilder {
- //private static Logger log = Logger.getLogger(DMaaPResponseBuilder.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPResponseBuilder.class);
protected static final int kBufferLength = 4096;
diff --git a/src/main/java/com/att/dmf/mr/utils/Emailer.java b/src/main/java/com/att/dmf/mr/utils/Emailer.java
index bdff501..a940abf 100644
--- a/src/main/java/com/att/dmf/mr/utils/Emailer.java
+++ b/src/main/java/com/att/dmf/mr/utils/Emailer.java
@@ -108,7 +108,7 @@ public class Emailer
private String getSetting ( String settingKey, String defval )
{
- //return fSettings.getString ( settingKey, defval );
+
String strSet = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,settingKey);
if(strSet==null)strSet=defval;
return strSet;