diff options
10 files changed, 43 insertions, 43 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java b/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java index 5d3bc47..cc3338b 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java @@ -69,7 +69,7 @@ public class LockInstructionWatcher implements Watcher { try { log.info("node children changed at path: {}", event.getPath()); - //String grpName = new String(curatorFramework.getData().forPath(event.getPath())); + List<String> children = curatorFramework.getChildren().forPath(event.getPath()); log.info("found children nodes prodcessing now"); diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java b/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java index 8e41c9f..eb77dc2 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java @@ -32,7 +32,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import com.att.dmf.mr.backends.Publisher; -//import kafka.producer.KeyedMessage; + /** * class used for logging perspective diff --git a/src/main/java/com/att/dmf/mr/constants/CambriaConstants.java b/src/main/java/com/att/dmf/mr/constants/CambriaConstants.java index b640688..cb6653c 100644 --- a/src/main/java/com/att/dmf/mr/constants/CambriaConstants.java +++ b/src/main/java/com/att/dmf/mr/constants/CambriaConstants.java @@ -75,7 +75,7 @@ String msgRtr_prop="MsgRtrApi.properties"; int kDefault_MaxThreads = -1; -// String kSetting_TomcatProtocolClass = "tomcat.protocolClass"; + //String kDefault_TomcatProtocolClass = Http11NioProtocol.class.getName (); String kSetting_ZkConfigDbServers = "config.zk.servers"; diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java index ebdf3ed..80904ea 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java @@ -59,7 +59,7 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr fLog = EELFManager.getInstance().getLogger(this.getClass().getName()); - //( this.getClass().getName () ); + } @Override diff --git a/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java index cde7720..97d1fa5 100644 --- a/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java +++ b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java @@ -171,7 +171,7 @@ public class CambriaOutboundEventStream implements StreamWriter { } // public Builder atOffset ( int pos ) - // { + // fOffset = pos; // return this; // } @@ -197,7 +197,7 @@ public class CambriaOutboundEventStream implements StreamWriter { fConsumer = builder.fConsumer; fLimit = builder.fLimit; fTimeoutMs = builder.fTimeoutMs; - // fSettings = builder.fSettings; + fSent = 0; fPretty = builder.fPretty; fWithMeta = builder.fWithMeta; @@ -244,7 +244,7 @@ public class CambriaOutboundEventStream implements StreamWriter { * @param msg * @throws IOException */ - // void onMessage(int count, Message msg) throws IOException; + void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException; } @@ -263,7 +263,7 @@ public class CambriaOutboundEventStream implements StreamWriter { * throws IOException */ public void write(final OutputStream os) throws IOException { - // final boolean transactionEnabled = topic.isTransactionEnabled(); + // final boolean transactionEnabled = isTransEnabled(); // final boolean transactionEnabled = istransEnable; // synchronized(this){ @@ -282,7 +282,7 @@ public class CambriaOutboundEventStream implements StreamWriter { entry.put("message", msg); os.write(entry.toString().getBytes()); } else { - // os.write(message.getBytes()); + String jsonString = JSONObject.valueToString(msg); os.write(jsonString.getBytes()); } @@ -299,7 +299,7 @@ public class CambriaOutboundEventStream implements StreamWriter { try { if (istransEnable && istransType) { // final String transactionId = - // jsonMessage.getString("transactionId"); + // responseTransactionId = transId; StringBuilder consumerInfo = new StringBuilder(); if (null != dmaapContext && null != dmaapContext.getRequest()) { @@ -338,7 +338,7 @@ public class CambriaOutboundEventStream implements StreamWriter { } }); - // if (null != dmaapContext && isTransactionEnabled()) { + if (null != dmaapContext && istransEnable && istransType) { dmaapContext.getResponse().setHeader("transactionId", @@ -353,10 +353,10 @@ public class CambriaOutboundEventStream implements StreamWriter { if (null != strclose_out_stream) close_out_stream = Boolean.parseBoolean(strclose_out_stream); - // if (fSettings.getBoolean("close.output.stream", true)) { + if (close_out_stream) { os.close(); - // } + } } @@ -479,10 +479,10 @@ public class CambriaOutboundEventStream implements StreamWriter { * * Checks whether filter is initialized */ - /*private boolean isFilterInitialized() { - return (fHpAlarmFilter != null && fHppe != null); - } -*/ + + + + /** * * @param msg @@ -490,18 +490,18 @@ public class CambriaOutboundEventStream implements StreamWriter { */ private boolean filterMatches(String msg) { boolean result = true; - /*if (isFilterInitialized()) { - try { - final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg)); - result = fHpAlarmFilter.matches(fHppe, e); - } catch (JSONException x) { - // the msg may not be JSON - result = false; - log.error("Failed due to " + x.getMessage()); - } catch (Exception x) { - log.error("Error using filter: " + x.getMessage(), x); - } - }*/ + + + + + + + + + + + + return result; } @@ -534,11 +534,11 @@ public class CambriaOutboundEventStream implements StreamWriter { private final Consumer fConsumer; private final int fLimit; private final int fTimeoutMs; - // private final rrNvReadable fSettings; + private final boolean fPretty; private final boolean fWithMeta; private int fSent; -// private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter; + //private final HpProcessingEngine<HpJsonEvent> fHppe; private DMaaPContext dmaapContext; private String responseTransactionId; @@ -548,7 +548,7 @@ public class CambriaOutboundEventStream implements StreamWriter { private ArrayList<Consumer> fKafkaConsumerList; private boolean istransType = true; // private static final Logger log = - // Logger.getLogger(CambriaOutboundEventStream.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class); }
\ No newline at end of file diff --git a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java index 22b60fe..88c5fd9 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java @@ -376,7 +376,7 @@ public class EventsServiceImpl implements EventsService { String topicNameStd = null; // topicNameStd= - // ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF"); + topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"); String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, @@ -404,7 +404,7 @@ public class EventsServiceImpl implements EventsService { if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname))) { // the topic name will be sent by the client - // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); String permission = aaf.aafPermissionString(topic, "pub"); if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { @@ -486,7 +486,7 @@ public class EventsServiceImpl implements EventsService { if (null != batchlen) maxEventBatch = Long.parseLong(batchlen); // long maxEventBatch = - // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16); + final LinkedList<Publisher.message> batch = new LinkedList<>(); // final ArrayList<KeyedMessage<String, String>> kms = new // ArrayList<KeyedMessage<String, String>>(); diff --git a/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java index da8cb16..e55f510 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java @@ -114,7 +114,7 @@ public class MMServiceImpl implements MMService { throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException, CambriaApiException, IOException { - // final long startTime = System.currentTimeMillis(); + final HttpServletRequest req = ctx.getRequest(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -138,13 +138,13 @@ public class MMServiceImpl implements MMService { limit = Integer.parseInt(req.getParameter("limit")); } limit = 1; - // int timeoutMs = 60000; + int timeoutMs = CambriaConstants.kNoTimeout; String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout"); if (strtimeoutMS != null) timeoutMs = Integer.parseInt(strtimeoutMS); // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", - // CambriaConstants.kNoTimeout); + if (req.getParameter("timeout") != null) { timeoutMs = Integer.parseInt(req.getParameter("timeout")); } diff --git a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java index cef646e..4c872ad 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java @@ -618,7 +618,7 @@ public class TopicServiceImpl implements TopicService { // errorMessages.getNotPermitted1()+" <Grant consume permissions> - // throw new DMaaPAccessDeniedException(errRes); + // } // } @@ -655,7 +655,7 @@ public class TopicServiceImpl implements TopicService { //// String permission = - // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + // String permission = aaf.aafPermissionString(topicName, "manage"); // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) // { @@ -666,7 +666,7 @@ public class TopicServiceImpl implements TopicService { // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), // errorMessages.getNotPermitted1()+" <Grant consume permissions> - // LOGGER.info(errRes); + // throw new DMaaPAccessDeniedException(errRes); // } // diff --git a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java index 214aac8..72db9de 100644 --- a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java +++ b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java @@ -46,7 +46,7 @@ import com.att.eelf.configuration.EELFManager; public class DMaaPResponseBuilder { - //private static Logger log = Logger.getLogger(DMaaPResponseBuilder.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPResponseBuilder.class); protected static final int kBufferLength = 4096; diff --git a/src/main/java/com/att/dmf/mr/utils/Emailer.java b/src/main/java/com/att/dmf/mr/utils/Emailer.java index bdff501..a940abf 100644 --- a/src/main/java/com/att/dmf/mr/utils/Emailer.java +++ b/src/main/java/com/att/dmf/mr/utils/Emailer.java @@ -108,7 +108,7 @@ public class Emailer private String getSetting ( String settingKey, String defval ) { - //return fSettings.getString ( settingKey, defval ); + String strSet = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,settingKey); if(strSet==null)strSet=defval; return strSet; |