aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java')
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java134
1 files changed, 6 insertions, 128 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 85da3f01..0fa90b50 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -79,15 +79,6 @@ public abstract class SingleThreadedBusTopicSource
protected volatile boolean alive = false;
/**
- * Am I locked?
- * reflects invocation of lock()/unlock() operations
- * locked => !alive (but not in the other direction necessarily)
- * locked => !offer, !run, !start, !stop (but this last one is obvious
- * since locked => !alive)
- */
- protected volatile boolean locked = false;
-
- /**
* Independent thread reading message over my topic
*/
protected Thread busPollerThread;
@@ -157,52 +148,28 @@ public abstract class SingleThreadedBusTopicSource
*/
public abstract void init() throws Exception;
- /**
- * {@inheritDoc}
- */
@Override
public void register(TopicListener topicListener)
throws IllegalArgumentException {
- logger.info("{}: registering {}", this, topicListener);
-
- synchronized(this) {
- if (topicListener == null)
- throw new IllegalArgumentException("TopicListener must be provided");
-
- /* check that this listener is not registered already */
- for (TopicListener listener: this.topicListeners) {
- if (listener == topicListener) {
- // already registered
- return;
- }
- }
-
- this.topicListeners.add(topicListener);
- }
+ super.register(topicListener);
try {
- this.start();
+ if (!alive && !locked)
+ this.start();
+ else
+ logger.info("{}: register: start not attempted", this);
} catch (Exception e) {
logger.warn("{}: cannot start after registration of because of: {}",
this, topicListener, e.getMessage(), e);
}
}
- /**
- * {@inheritDoc}
- */
@Override
public void unregister(TopicListener topicListener) {
-
- logger.info("{}: unregistering {}", this, topicListener);
-
boolean stop = false;
synchronized (this) {
- if (topicListener == null)
- throw new IllegalArgumentException("TopicListener must be provided");
-
- this.topicListeners.remove(topicListener);
+ super.unregister(topicListener);
stop = (this.topicListeners.isEmpty());
}
@@ -211,49 +178,6 @@ public abstract class SingleThreadedBusTopicSource
}
}
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean lock() {
-
- logger.info("{}: locking", this);
-
- synchronized (this) {
- if (this.locked)
- return true;
-
- this.locked = true;
- }
-
- return this.stop();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean unlock() {
- logger.info("{}: unlocking", this);
-
- synchronized(this) {
- if (!this.locked)
- return true;
-
- this.locked = false;
- }
-
- try {
- return this.start();
- } catch (Exception e) {
- logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
- return false;
- }
- }
-
- /**
- * {@inheritDoc}
- */
@Override
public boolean start() throws IllegalStateException {
logger.info("{}: starting", this);
@@ -286,9 +210,6 @@ public abstract class SingleThreadedBusTopicSource
return this.alive;
}
- /**
- * {@inheritDoc}
- */
@Override
public boolean stop() {
logger.info("{}: stopping", this);
@@ -312,49 +233,6 @@ public abstract class SingleThreadedBusTopicSource
return true;
}
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isLocked() {
- return this.locked;
- }
-
- /**
- * broadcast event to all listeners
- *
- * @param message the event
- * @return true if all notifications are performed with no error, false otherwise
- */
- protected boolean broadcast(String message) {
-
- /* take a snapshot of listeners */
- List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
-
- boolean success = true;
- for (TopicListener topicListener: snapshotListeners) {
- try {
- topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
- } catch (Exception e) {
- logger.warn("{}: notification error @ {} because of {}",
- this, topicListener, e.getMessage(), e);
- success = false;
- }
- }
- return success;
- }
-
- /**
- * take a snapshot of current topic listeners
- *
- * @return the topic listeners
- */
- protected synchronized List<TopicListener> snapshotTopicListeners() {
- @SuppressWarnings("unchecked")
- List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
- return listeners;
- }
/**
* Run thread method for the Bus Reader