summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java50
1 files changed, 10 insertions, 40 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 5e8cf489..768046d0 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -21,17 +21,15 @@
package org.onap.policy.drools.event.comm.bus.internal;
import java.net.MalformedURLException;
-import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.onap.policy.drools.event.comm.FilterableTopicSource;
import org.onap.policy.drools.event.comm.TopicListener;
import org.onap.policy.drools.event.comm.bus.BusTopicSource;
import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
import org.onap.policy.drools.utils.NetworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This topic source implementation specializes in reading messages
@@ -74,23 +72,10 @@ public abstract class SingleThreadedBusTopicSource
protected BusConsumer consumer;
/**
- * Am I running?
- * reflects invocation of start()/stop()
- * !locked & start() => alive
- * stop() => !alive
- */
- protected volatile boolean alive = false;
-
- /**
* Independent thread reading message over my topic
*/
protected Thread busPollerThread;
- /**
- * All my subscribers for new message notifications
- */
- protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
-
/**
*
@@ -115,8 +100,7 @@ public abstract class SingleThreadedBusTopicSource
int fetchTimeout,
int fetchLimit,
boolean useHttps,
- boolean allowSelfSignedCerts)
- throws IllegalArgumentException {
+ boolean allowSelfSignedCerts) {
super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
@@ -152,8 +136,7 @@ public abstract class SingleThreadedBusTopicSource
public abstract void init() throws MalformedURLException;
@Override
- public void register(TopicListener topicListener)
- throws IllegalArgumentException {
+ public void register(TopicListener topicListener) {
super.register(topicListener);
@@ -182,7 +165,7 @@ public abstract class SingleThreadedBusTopicSource
}
@Override
- public boolean start() throws IllegalStateException {
+ public boolean start() {
logger.info("{}: starting", this);
synchronized(this) {
@@ -299,23 +282,10 @@ public abstract class SingleThreadedBusTopicSource
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
- .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
- .append(", fetchLimit=").append(fetchLimit)
- .append(", consumer=").append(this.consumer).append(", alive=")
- .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
- .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
- .append("]");
- return builder.toString();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isAlive() {
- return alive;
+ return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
+ + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer="
+ + this.consumer + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread
+ + ", topicListeners=" + topicListeners.size() + ", toString()=" + super.toString() + "]";
}
/**
@@ -338,7 +308,7 @@ public abstract class SingleThreadedBusTopicSource
* {@inheritDoc}
*/
@Override
- public void shutdown() throws IllegalStateException {
+ public void shutdown() {
this.stop();
this.topicListeners.clear();
}