Diffstat (limited to 'src/main/java')
-rw-r--r--  src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java      22
-rw-r--r--  src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java    6
-rw-r--r--  src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java     11
-rw-r--r--  src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java        24
4 files changed, 32 insertions, 31 deletions
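Most of this change replaces string-concatenated exception logging (message + e.getMessage()) with the two-argument logger form, which passes the Throwable itself so the stack trace is preserved, and restores the thread's interrupt flag where an InterruptedException was previously swallowed. A minimal sketch of both patterns, assuming an SLF4J-style logger (the class and field names here are illustrative, not part of this patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingPatternSketch {
        private static final Logger log = LoggerFactory.getLogger(LoggingPatternSketch.class);

        void await() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException x) {
                // Two-argument form: the logger records the full stack trace
                // instead of just the flattened x.getMessage() text.
                log.warn("wait interrupted", x);
                // Re-assert the interrupt flag so callers can still observe it.
                Thread.currentThread().interrupt();
            }
        }
    }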
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
index 93374fb..b3f3c8f 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
@@ -77,7 +77,7 @@ public class Kafka011Consumer implements Consumer {
fId = id;
fCreateTimeMs = System.currentTimeMillis();
fLastTouch = fCreateTimeMs;
- fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
+ fPendingMsgs = new LinkedBlockingQueue<>();
fLogTag = fGroup + "(" + fId + ")/" + fTopic;
offset = 0;
state = Kafka011Consumer.State.OPENED;
@@ -113,12 +113,13 @@ public class Kafka011Consumer implements Consumer {
public synchronized Consumer.Message nextMessage() {
try {
- if (fPendingMsgs.size() > 0) {
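+ // Serve any record already buffered from a previous poll before polling again.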
+ if (!fPendingMsgs.isEmpty()) {
return makeMessage(fPendingMsgs.take());
}
} catch (InterruptedException x) {
log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
x);
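+ // Restore the thread's interrupt status so callers can still detect the interruption.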
+ Thread.currentThread().interrupt();
}
Callable<Boolean> run = new Callable<Boolean>() {
@@ -135,11 +136,10 @@ public class Kafka011Consumer implements Consumer {
}
} catch (KafkaException x) {
- log.debug(fLogTag + ": KafkaException " + x.getMessage());
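+ // Log the exception object itself so the stack trace is captured.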
+ log.debug(fLogTag + ": KafkaException ", x);
} catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
- log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
- + x.getMessage());
+ log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
}
@@ -156,25 +156,26 @@ public class Kafka011Consumer implements Consumer {
future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait up to consumerPollTimeOut seconds
} catch (TimeoutException ex) {
+ log.error("TimeoutException in in Kafka consumer ", ex);
// timed out. Try to stop the code if possible.
String apiNodeId = null;
try {
apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
} catch (UnknownHostException e1) {
- // TODO Auto-generated catch block
- log.error("unable to get the localhost address");
+ log.error("unable to get the localhost address ", e1);
}
try {
if (fKafkaLiveLockAvoider != null)
fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
} catch (Exception e) {
- log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
+ log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
}
forcePollOnConsumer();
future.cancel(true);
} catch (Exception ex) {
+ log.error("Exception in in Kafka consumer ", ex);
// Unexpected failure. Try to stop the task if possible.
future.cancel(true);
}
@@ -307,13 +308,12 @@ public class Kafka011Consumer implements Consumer {
// second
} catch (TimeoutException ex) {
// timed out. Try to stop the code if possible.
- log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
+ log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
future.cancel(true);
setState(Kafka011Consumer.State.OPENED);
} catch (Exception ex) {
// Unexpected failure. Try to stop the task if possible.
- log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
- + ex);
+ log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
future.cancel(true);
setState(Kafka011Consumer.State.OPENED);
return false;
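For context, Kafka011Consumer wraps its Kafka calls in a Callable submitted to an executor and bounds them with future.get(timeout), cancelling the task on timeout. A self-contained sketch of that bounded-call pattern (names and the timeout handling are illustrative assumptions, not taken from this file):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedCallSketch {
        private static final ExecutorService pool = Executors.newSingleThreadExecutor();

        static boolean runWithTimeout(Callable<Boolean> task, long timeoutSeconds) {
            Future<Boolean> future = pool.submit(task);
            try {
                return future.get(timeoutSeconds, TimeUnit.SECONDS);
            } catch (TimeoutException ex) {
                // The task overran its budget: interrupt it and report failure.
                future.cancel(true);
                return false;
            } catch (InterruptedException ex) {
                // Restore the interrupt flag and stop the task.
                Thread.currentThread().interrupt();
                future.cancel(true);
                return false;
            } catch (ExecutionException ex) {
                // The task itself threw; its cause is in ex.getCause().
                return false;
            }
        }
    }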
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
index a93ac33..9f8f26d 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
@@ -72,7 +72,7 @@ public class Kafka011ConsumerUtil {
}
} catch (Exception e) {
- log.error("Failed and go to Exception block for " + fGroup + " " + e.getMessage());
+ log.error("Failed and go to Exception block for " + fGroup +" ", e);
}
}
});
@@ -108,9 +108,9 @@ public class Kafka011ConsumerUtil {
}
} catch (java.util.ConcurrentModificationException e) {
- log.error("Error occurs for " + e);
+ log.error("Error occurs for ", e);
} catch (Exception e) {
- log.error("Failed and go to Exception block for " + group + " " + e.getMessage());
+ log.error("Failed and go to Exception block for " + group + " ", e);
}
}
});
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
index a38d77b..04d1d9e 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -320,9 +320,9 @@ public class KafkaConsumerCache {
curatorConsumerCache.close();
log.info("Curator client closed");
} catch (ZkInterruptedException e) {
- log.warn("Curator client close interrupted: " + e.getMessage());
+ log.warn("Curator client close interrupted: ", e);
} catch (IOException e) {
- log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e.getMessage());
+ log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e);
}
curatorConsumerCache = null;
@@ -497,10 +497,10 @@ public class KafkaConsumerCache {
log.info(" ^ deleted " + fBaseZkPath + "/" + key);
} catch (NoNodeException e) {
log.warn("A consumer was deleted from " + fApiId
- + "'s cache, but no Cambria API node had ownership of it in ZooKeeper");
+ + "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
} catch (Exception e) {
- log.debug("Unexpected exception while deleting consumer: " + e.getMessage());
- log.info(" %%%%%%@# Unexpected exception while deleting consumer: " + e.getMessage());
+ log.debug("Unexpected exception while deleting consumer: ", e);
+ log.info(" %%%%%%@# Unexpected exception while deleting consumer: ", e);
}
try {
@@ -648,6 +648,7 @@ public class KafkaConsumerCache {
try {
curator.setData().forPath(consumerPath, fApiId.getBytes());
} catch (KeeperException.NoNodeException e) {
+ log.info("KeeperException.NoNodeException occured", e);
curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
}
log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
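The ownership-claim code above follows a set-then-create-on-NoNode pattern with Curator. A hedged sketch of the same idea, assuming a connected CuratorFramework client (the class and method names here are illustrative):

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.zookeeper.KeeperException;

    public class OwnershipClaimSketch {
        static void claim(CuratorFramework curator, String path, byte[] ownerId) throws Exception {
            try {
                // Fast path: the node already exists, so just overwrite its data.
                curator.setData().forPath(path, ownerId);
            } catch (KeeperException.NoNodeException e) {
                // First claim for this path: create the node and any missing parents.
                curator.create().creatingParentsIfNeeded().forPath(path, ownerId);
            }
        }
    }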
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
index 387b667..2f436ec 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
@@ -637,7 +637,7 @@ public class EventsServiceImpl implements EventsService {
// final KeyedMessage<String, String> data = new
// KeyedMessage<String, String>(topic, m.getKey(),
- // kms.add(data);
+
final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
m.getMessage());
@@ -650,7 +650,7 @@ public class EventsServiceImpl implements EventsService {
+ batchId + "]");
try {
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
- // kms);
+
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
// transactionLogs(batch);
for (message msg : batch) {
@@ -681,7 +681,7 @@ public class EventsServiceImpl implements EventsService {
metricsSet.publishTick(sizeNow);
publishBatchCount = sizeNow;
count += sizeNow;
- // batchId++;
+
String endTime = sdf.format(new Date());
LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
+ batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
@@ -698,9 +698,9 @@ public class EventsServiceImpl implements EventsService {
+ batchId + "]");
try {
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
- // kms);
+
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
- // transactionLogs(batch);
+
for (message msg : batch) {
LogDetails logDetails = msg.getLogDetails();
LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
@@ -727,7 +727,7 @@ public class EventsServiceImpl implements EventsService {
pms.clear();
metricsSet.publishTick(sizeNow);
count += sizeNow;
- // batchId++;
+
String endTime = sdf.format(new Date());
publishBatchCount = sizeNow;
LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
@@ -855,12 +855,12 @@ public class EventsServiceImpl implements EventsService {
return logDetails;
}
- /*
- * public String getMetricsTopic() { return metricsTopic; }
- *
- * public void setMetricsTopic(String metricsTopic) { this.metricsTopic =
- * metricsTopic; }
- */