summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java4
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java24
2 files changed, 14 insertions, 14 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
index 93374fb..b66a251 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
@@ -77,7 +77,7 @@ public class Kafka011Consumer implements Consumer {
fId = id;
fCreateTimeMs = System.currentTimeMillis();
fLastTouch = fCreateTimeMs;
- fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
+ fPendingMsgs = new LinkedBlockingQueue<>();
fLogTag = fGroup + "(" + fId + ")/" + fTopic;
offset = 0;
state = Kafka011Consumer.State.OPENED;
@@ -113,7 +113,7 @@ public class Kafka011Consumer implements Consumer {
public synchronized Consumer.Message nextMessage() {
try {
- if (fPendingMsgs.size() > 0) {
+ if (fPendingMsgs.isEmpty()) {
return makeMessage(fPendingMsgs.take());
}
} catch (InterruptedException x) {
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
index 387b667..2f436ec 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
@@ -637,7 +637,7 @@ public class EventsServiceImpl implements EventsService {
// final KeyedMessage<String, String> data = new
// KeyedMessage<String, String>(topic, m.getKey(),
- // kms.add(data);
+
final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
m.getMessage());
@@ -650,7 +650,7 @@ public class EventsServiceImpl implements EventsService {
+ batchId + "]");
try {
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
- // kms);
+
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
// transactionLogs(batch);
for (message msg : batch) {
@@ -681,7 +681,7 @@ public class EventsServiceImpl implements EventsService {
metricsSet.publishTick(sizeNow);
publishBatchCount = sizeNow;
count += sizeNow;
- // batchId++;
+
String endTime = sdf.format(new Date());
LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
+ batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
@@ -698,9 +698,9 @@ public class EventsServiceImpl implements EventsService {
+ batchId + "]");
try {
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
- // kms);
+
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
- // transactionLogs(batch);
+
for (message msg : batch) {
LogDetails logDetails = msg.getLogDetails();
LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
@@ -727,7 +727,7 @@ public class EventsServiceImpl implements EventsService {
pms.clear();
metricsSet.publishTick(sizeNow);
count += sizeNow;
- // batchId++;
+
String endTime = sdf.format(new Date());
publishBatchCount = sizeNow;
LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
@@ -855,12 +855,12 @@ public class EventsServiceImpl implements EventsService {
return logDetails;
}
- /*
- * public String getMetricsTopic() { return metricsTopic; }
- *
- * public void setMetricsTopic(String metricsTopic) { this.metricsTopic =
- * metricsTopic; }
- */
+
+
+
+
+
+