diff options
author | Daniel Silverthorn <daniel.silverthorn@amdocs.com> | 2018-05-28 17:11:50 -0400 |
---|---|---|
committer | Daniel Silverthorn <daniel.silverthorn@amdocs.com> | 2018-05-28 17:13:30 -0400 |
commit | 1d3498968a4b1492f0d31fe9a4304b2f3407718e (patch) | |
tree | e8464d95c998005e049b5504668de4c7b392f042 | |
parent | 6891c2008b93e09f5a67e45b859696cd13466f8a (diff) |
Add logging into consumer
Change-Id: I0b5c045bac485523c4d1353d81a467c3c8ba918f
Issue-ID: AAI-1182
Signed-off-by: Daniel Silverthorn <daniel.silverthorn@amdocs.com>
-rw-r--r-- | src/main/java/org/onap/aai/event/EventBusConsumer.java | 27 |
1 files changed, 17 insertions, 10 deletions
diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java index e795e2c..da8ffb5 100644 --- a/src/main/java/org/onap/aai/event/EventBusConsumer.java +++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java @@ -31,6 +31,7 @@ import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -69,21 +70,27 @@ public class EventBusConsumer extends ScheduledPollConsumer { @Override protected int poll() throws Exception { - logger.debug("Checking for event on topic: " + endpoint.getEventTopic()); + String topic = endpoint.getEventTopic(); - int processCount = 0; + logger.debug("Checking for event on topic: " + topic); - //Iterable<String> messages = consumer.fetch(); - Iterable<String> messages = consumer.consumeAndCommit(); + int processCount = 0; - String topic = endpoint.getEventTopic(); + try { + Iterable<String> messages = consumer.consumeAndCommit(); - for (String message : messages) { - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setBody(message); - getScheduledExecutorService().submit(new EventProcessor(exchange, topic)); - ++processCount; + for (String message : messages) { + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody(message); + getScheduledExecutorService().submit(new EventProcessor(exchange, topic)); + ++processCount; + } + } catch (Exception e) { + logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage()); } + + logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount)); + return processCount; } @Override |