From 1d3498968a4b1492f0d31fe9a4304b2f3407718e Mon Sep 17 00:00:00 2001 From: Daniel Silverthorn Date: Mon, 28 May 2018 17:11:50 -0400 Subject: Add logging into consumer Change-Id: I0b5c045bac485523c4d1353d81a467c3c8ba918f Issue-ID: AAI-1182 Signed-off-by: Daniel Silverthorn --- .../java/org/onap/aai/event/EventBusConsumer.java | 27 ++++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java index e795e2c..da8ffb5 100644 --- a/src/main/java/org/onap/aai/event/EventBusConsumer.java +++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java @@ -31,6 +31,7 @@ import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -69,21 +70,27 @@ public class EventBusConsumer extends ScheduledPollConsumer { @Override protected int poll() throws Exception { - logger.debug("Checking for event on topic: " + endpoint.getEventTopic()); + String topic = endpoint.getEventTopic(); - int processCount = 0; + logger.debug("Checking for event on topic: " + topic); - //Iterable messages = consumer.fetch(); - Iterable messages = consumer.consumeAndCommit(); + int processCount = 0; - String topic = endpoint.getEventTopic(); + try { + Iterable messages = consumer.consumeAndCommit(); - for (String message : messages) { - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setBody(message); - getScheduledExecutorService().submit(new EventProcessor(exchange, topic)); - ++processCount; + for (String message : messages) { + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody(message); + getScheduledExecutorService().submit(new EventProcessor(exchange, topic)); + ++processCount; + } + } catch (Exception e) { + logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage()); } + + logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount)); + return processCount; } @Override -- cgit 1.2.3-korg