diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java | 50 |
1 files changed, 10 insertions, 40 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 5e8cf489..768046d0 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -21,17 +21,15 @@ package org.onap.policy.drools.event.comm.bus.internal; import java.net.MalformedURLException; -import java.util.ArrayList; import java.util.List; import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.onap.policy.drools.event.comm.FilterableTopicSource; import org.onap.policy.drools.event.comm.TopicListener; import org.onap.policy.drools.event.comm.bus.BusTopicSource; import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; import org.onap.policy.drools.utils.NetworkUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This topic source implementation specializes in reading messages @@ -74,23 +72,10 @@ public abstract class SingleThreadedBusTopicSource protected BusConsumer consumer; /** - * Am I running? - * reflects invocation of start()/stop() - * !locked & start() => alive - * stop() => !alive - */ - protected volatile boolean alive = false; - - /** * Independent thread reading message over my topic */ protected Thread busPollerThread; - /** - * All my subscribers for new message notifications - */ - protected final ArrayList<TopicListener> topicListeners = new ArrayList<>(); - /** * @@ -115,8 +100,7 @@ public abstract class SingleThreadedBusTopicSource int fetchTimeout, int fetchLimit, boolean useHttps, - boolean allowSelfSignedCerts) - throws IllegalArgumentException { + boolean allowSelfSignedCerts) { super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); @@ -152,8 +136,7 @@ public abstract class SingleThreadedBusTopicSource public abstract void init() throws MalformedURLException; @Override - public void register(TopicListener topicListener) - throws IllegalArgumentException { + public void register(TopicListener topicListener) { super.register(topicListener); @@ -182,7 +165,7 @@ public abstract class SingleThreadedBusTopicSource } @Override - public boolean start() throws IllegalStateException { + public boolean start() { logger.info("{}: starting", this); synchronized(this) { @@ -299,23 +282,10 @@ public abstract class SingleThreadedBusTopicSource @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup) - .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout) - .append(", fetchLimit=").append(fetchLimit) - .append(", consumer=").append(this.consumer).append(", alive=") - .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread) - .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString()) - .append("]"); - return builder.toString(); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isAlive() { - return alive; + return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance + + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + + this.consumer + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + + ", topicListeners=" + topicListeners.size() + ", toString()=" + super.toString() + "]"; } /** @@ -338,7 +308,7 @@ public abstract class SingleThreadedBusTopicSource * {@inheritDoc} */ @Override - public void shutdown() throws IllegalStateException { + public void shutdown() { this.stop(); this.topicListeners.clear(); } |