diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 98e30e27..0953465b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -21,6 +21,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.io.IOException; import java.net.MalformedURLException; import java.util.UUID; @@ -223,19 +224,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase public void run() { while (this.alive) { try { - for (String event : this.consumer.fetch()) { - synchronized (this) { - this.recentEvents.add(event); - } - - NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event); - - broadcast(event); - - if (!this.alive) { - break; - } - } + fetchAllMessages(); } catch (Exception e) { logger.error("{}: cannot fetch because of ", this, e.getMessage(), e); } @@ -244,6 +233,22 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase logger.info("{}: exiting thread", this); } + private void fetchAllMessages() throws InterruptedException, IOException { + for (String event : this.consumer.fetch()) { + synchronized (this) { + this.recentEvents.add(event); + } + + NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event); + + broadcast(event); + + if (!this.alive) { + return; + } + } + } + @Override public boolean offer(String event) { if (!this.alive) { |