aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniel Silverthorn <daniel.silverthorn@amdocs.com>2018-05-28 17:11:50 -0400
committerDaniel Silverthorn <daniel.silverthorn@amdocs.com>2018-05-28 17:13:30 -0400
commit1d3498968a4b1492f0d31fe9a4304b2f3407718e (patch)
treee8464d95c998005e049b5504668de4c7b392f042
parent6891c2008b93e09f5a67e45b859696cd13466f8a (diff)
Add logging into consumer
Change-Id: I0b5c045bac485523c4d1353d81a467c3c8ba918f Issue-ID: AAI-1182 Signed-off-by: Daniel Silverthorn <daniel.silverthorn@amdocs.com>
-rw-r--r--src/main/java/org/onap/aai/event/EventBusConsumer.java27
1 files changed, 17 insertions, 10 deletions
diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java
index e795e2c..da8ffb5 100644
--- a/src/main/java/org/onap/aai/event/EventBusConsumer.java
+++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java
@@ -31,6 +31,7 @@ import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
+import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -69,21 +70,27 @@ public class EventBusConsumer extends ScheduledPollConsumer {
@Override
protected int poll() throws Exception {
- logger.debug("Checking for event on topic: " + endpoint.getEventTopic());
+ String topic = endpoint.getEventTopic();
- int processCount = 0;
+ logger.debug("Checking for event on topic: " + topic);
- //Iterable<String> messages = consumer.fetch();
- Iterable<String> messages = consumer.consumeAndCommit();
+ int processCount = 0;
- String topic = endpoint.getEventTopic();
+ try {
+ Iterable<String> messages = consumer.consumeAndCommit();
- for (String message : messages) {
- Exchange exchange = endpoint.createExchange();
- exchange.getIn().setBody(message);
- getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
- ++processCount;
+ for (String message : messages) {
+ Exchange exchange = endpoint.createExchange();
+ exchange.getIn().setBody(message);
+ getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
+ ++processCount;
+ }
+ } catch (Exception e) {
+ logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage());
}
+
+ logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount));
+
return processCount;
}
@Override