diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java | 134 |
1 files changed, 6 insertions, 128 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 85da3f01..0fa90b50 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -79,15 +79,6 @@ public abstract class SingleThreadedBusTopicSource protected volatile boolean alive = false; /** - * Am I locked? - * reflects invocation of lock()/unlock() operations - * locked => !alive (but not in the other direction necessarily) - * locked => !offer, !run, !start, !stop (but this last one is obvious - * since locked => !alive) - */ - protected volatile boolean locked = false; - - /** * Independent thread reading message over my topic */ protected Thread busPollerThread; @@ -157,52 +148,28 @@ public abstract class SingleThreadedBusTopicSource */ public abstract void init() throws Exception; - /** - * {@inheritDoc} - */ @Override public void register(TopicListener topicListener) throws IllegalArgumentException { - logger.info("{}: registering {}", this, topicListener); - - synchronized(this) { - if (topicListener == null) - throw new IllegalArgumentException("TopicListener must be provided"); - - /* check that this listener is not registered already */ - for (TopicListener listener: this.topicListeners) { - if (listener == topicListener) { - // already registered - return; - } - } - - this.topicListeners.add(topicListener); - } + super.register(topicListener); try { - this.start(); + if (!alive && !locked) + this.start(); + else + logger.info("{}: register: start not attempted", this); } catch (Exception e) { logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(), e); } } - /** - * {@inheritDoc} - */ @Override public void unregister(TopicListener topicListener) { - - logger.info("{}: unregistering {}", this, topicListener); - boolean stop = false; synchronized (this) { - if (topicListener == null) - throw new IllegalArgumentException("TopicListener must be provided"); - - this.topicListeners.remove(topicListener); + super.unregister(topicListener); stop = (this.topicListeners.isEmpty()); } @@ -211,49 +178,6 @@ public abstract class SingleThreadedBusTopicSource } } - /** - * {@inheritDoc} - */ - @Override - public boolean lock() { - - logger.info("{}: locking", this); - - synchronized (this) { - if (this.locked) - return true; - - this.locked = true; - } - - return this.stop(); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean unlock() { - logger.info("{}: unlocking", this); - - synchronized(this) { - if (!this.locked) - return true; - - this.locked = false; - } - - try { - return this.start(); - } catch (Exception e) { - logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e); - return false; - } - } - - /** - * {@inheritDoc} - */ @Override public boolean start() throws IllegalStateException { logger.info("{}: starting", this); @@ -286,9 +210,6 @@ public abstract class SingleThreadedBusTopicSource return this.alive; } - /** - * {@inheritDoc} - */ @Override public boolean stop() { logger.info("{}: stopping", this); @@ -312,49 +233,6 @@ public abstract class SingleThreadedBusTopicSource return true; } - - /** - * {@inheritDoc} - */ - @Override - public boolean isLocked() { - return this.locked; - } - - /** - * broadcast event to all listeners - * - * @param message the event - * @return true if all notifications are performed with no error, false otherwise - */ - protected boolean broadcast(String message) { - - /* take a snapshot of listeners */ - List<TopicListener> snapshotListeners = this.snapshotTopicListeners(); - - boolean success = true; - for (TopicListener topicListener: snapshotListeners) { - try { - topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message); - } catch (Exception e) { - logger.warn("{}: notification error @ {} because of {}", - this, topicListener, e.getMessage(), e); - success = false; - } - } - return success; - } - - /** - * take a snapshot of current topic listeners - * - * @return the topic listeners - */ - protected synchronized List<TopicListener> snapshotTopicListeners() { - @SuppressWarnings("unchecked") - List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone(); - return listeners; - } /** * Run thread method for the Bus Reader |