summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java')
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java22
1 files changed, 11 insertions, 11 deletions
diff --git a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
index 4ca6446..22b60fe 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
@@ -94,7 +94,7 @@ import com.att.nsa.util.rrConvertor;
@Service
public class EventsServiceImpl implements EventsService {
// private static final Logger LOG =
- // Logger.getLogger(EventsServiceImpl.class);
+
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
private static final String BATCH_LENGTH = "event.batch.length";
@@ -103,10 +103,10 @@ public class EventsServiceImpl implements EventsService {
private DMaaPErrorMessages errorMessages;
//@Autowired
- //KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+
// @Value("${metrics.send.cambria.topic}")
- // private String metricsTopic;
+
public DMaaPErrorMessages getErrorMessages() {
return errorMessages;
@@ -133,7 +133,7 @@ public class EventsServiceImpl implements EventsService {
CambriaApiException, IOException, DMaaPAccessDeniedException {
final long startTime = System.currentTimeMillis();
final HttpServletRequest req = ctx.getRequest();
- //System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"+kafkaLiveLockAvoider);
+
boolean isAAFTopic = false;
// was this host blacklisted?
final String remoteAddr = Utils.getRemoteAddress(ctx);
@@ -158,7 +158,7 @@ public class EventsServiceImpl implements EventsService {
if (strtimeoutMS != null)
timeoutMs = Integer.parseInt(strtimeoutMS);
// int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
- // CambriaConstants.kNoTimeout);
+
if (req.getParameter("timeout") != null) {
timeoutMs = Integer.parseInt(req.getParameter("timeout"));
}
@@ -214,7 +214,7 @@ public class EventsServiceImpl implements EventsService {
// if headers are not provided then user will be null
if (user == null && null != ctx.getRequest().getHeader("Authorization")) {
// the topic name will be sent by the client
- // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub";
+
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
String permission = aaf.aafPermissionString(topic, "sub");
if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
@@ -234,7 +234,7 @@ public class EventsServiceImpl implements EventsService {
logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
+ " " + clientId);
Consumer c = null;
- // String localclientId = clientId;
+
String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"clusterhostid");
if (null == lhostId) {
@@ -481,16 +481,16 @@ public class EventsServiceImpl implements EventsService {
// start processing, building a batch to push to the backend
final long startMs = System.currentTimeMillis();
long count = 0;
- long maxEventBatch = 1024 * 16;
+ long maxEventBatch = 1024L* 16;
String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
if (null != batchlen)
maxEventBatch = Long.parseLong(batchlen);
// long maxEventBatch =
// ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
- final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
+ final LinkedList<Publisher.message> batch = new LinkedList<>();
// final ArrayList<KeyedMessage<String, String>> kms = new
// ArrayList<KeyedMessage<String, String>>();
- final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
+ final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
try {
// for each message...
Publisher.message m = null;
@@ -592,7 +592,7 @@ public class EventsServiceImpl implements EventsService {
// start processing, building a batch to push to the backend
final long startMs = System.currentTimeMillis();
long count = 0;
- long maxEventBatch = 1024 * 16;
+ long maxEventBatch = 1024L * 16;
String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
if (null != evenlen)
maxEventBatch = Long.parseLong(evenlen);