diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main/java/org/onap/aai/event/EventBusConsumer.java | 27 |
1 files changed, 17 insertions, 10 deletions
diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java index e795e2c..da8ffb5 100644 --- a/src/main/java/org/onap/aai/event/EventBusConsumer.java +++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java @@ -31,6 +31,7 @@ import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -69,21 +70,27 @@ public class EventBusConsumer extends ScheduledPollConsumer { @Override protected int poll() throws Exception { - logger.debug("Checking for event on topic: " + endpoint.getEventTopic()); + String topic = endpoint.getEventTopic(); - int processCount = 0; + logger.debug("Checking for event on topic: " + topic); - //Iterable<String> messages = consumer.fetch(); - Iterable<String> messages = consumer.consumeAndCommit(); + int processCount = 0; - String topic = endpoint.getEventTopic(); + try { + Iterable<String> messages = consumer.consumeAndCommit(); - for (String message : messages) { - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setBody(message); - getScheduledExecutorService().submit(new EventProcessor(exchange, topic)); - ++processCount; + for (String message : messages) { + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody(message); + getScheduledExecutorService().submit(new EventProcessor(exchange, topic)); + ++processCount; + } + } catch (Exception e) { + logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage()); } + + logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount)); + return processCount; } @Override |