aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java31
1 files changed, 18 insertions, 13 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 98e30e27..0953465b 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -21,6 +21,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import java.io.IOException;
import java.net.MalformedURLException;
import java.util.UUID;
@@ -223,19 +224,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
public void run() {
while (this.alive) {
try {
- for (String event : this.consumer.fetch()) {
- synchronized (this) {
- this.recentEvents.add(event);
- }
-
- NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
-
- broadcast(event);
-
- if (!this.alive) {
- break;
- }
- }
+ fetchAllMessages();
} catch (Exception e) {
logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
}
@@ -244,6 +233,22 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
logger.info("{}: exiting thread", this);
}
+ private void fetchAllMessages() throws InterruptedException, IOException {
+ for (String event : this.consumer.fetch()) {
+ synchronized (this) {
+ this.recentEvents.add(event);
+ }
+
+ NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
+
+ broadcast(event);
+
+ if (!this.alive) {
+ return;
+ }
+ }
+ }
+
@Override
public boolean offer(String event) {
if (!this.alive) {