summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/onap/aai/event/EventBusConsumer.java27
1 files changed, 17 insertions, 10 deletions
diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java
index e795e2c..da8ffb5 100644
--- a/src/main/java/org/onap/aai/event/EventBusConsumer.java
+++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java
@@ -31,6 +31,7 @@ import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
+import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -69,21 +70,27 @@ public class EventBusConsumer extends ScheduledPollConsumer {
@Override
protected int poll() throws Exception {
- logger.debug("Checking for event on topic: " + endpoint.getEventTopic());
+ String topic = endpoint.getEventTopic();
- int processCount = 0;
+ logger.debug("Checking for event on topic: " + topic);
- //Iterable<String> messages = consumer.fetch();
- Iterable<String> messages = consumer.consumeAndCommit();
+ int processCount = 0;
- String topic = endpoint.getEventTopic();
+ try {
+ Iterable<String> messages = consumer.consumeAndCommit();
- for (String message : messages) {
- Exchange exchange = endpoint.createExchange();
- exchange.getIn().setBody(message);
- getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
- ++processCount;
+ for (String message : messages) {
+ Exchange exchange = endpoint.createExchange();
+ exchange.getIn().setBody(message);
+ getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
+ ++processCount;
+ }
+ } catch (Exception e) {
+ logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage());
}
+
+ logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount));
+
return processCount;
}
@Override