aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java')
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java93
1 files changed, 10 insertions, 83 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
index 64037749..3f1f3610 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
@@ -46,23 +46,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
protected String partitionId;
/**
- * Am I running?
- * reflects invocation of start()/stop()
- * !locked & start() => alive
- * stop() => !alive
- */
- protected volatile boolean alive = false;
-
- /**
- * Am I locked?
- * reflects invocation of lock()/unlock() operations
- * locked => !alive (but not in the other direction necessarily)
- * locked => !offer, !run, !start, !stop (but this last one is obvious
- * since locked => !alive)
- */
- protected volatile boolean locked = false;
-
- /**
* message bus publisher
*/
protected BusPublisher publisher;
@@ -99,8 +82,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
* {@inheritDoc}
*/
@Override
- public boolean start() throws IllegalStateException {
-
+ public boolean start() throws IllegalStateException {
logger.info("{}: starting", this);
synchronized(this) {
@@ -144,64 +126,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
}
return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean lock() {
-
- logger.info("{}: locking", this);
-
- synchronized (this) {
- if (this.locked)
- return true;
-
- this.locked = true;
- }
-
- return this.stop();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean unlock() {
-
- logger.info("{}: unlocking", this);
-
- synchronized(this) {
- if (!this.locked)
- return true;
-
- this.locked = false;
- }
-
- try {
- return this.start();
- } catch (Exception e) {
- logger.warn("{}: cannot start after unlocking because of {}",
- this, e.getMessage(), e);
- return false;
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isLocked() {
- return this.locked;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isAlive() {
- return this.alive;
}
/**
@@ -226,7 +150,8 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(),
this.topic, System.lineSeparator(), message);
- publisher.send(this.partitionId, message);
+ publisher.send(this.partitionId, message);
+ broadcast(message);
} catch (Exception e) {
logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
return false;
@@ -260,10 +185,12 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
this.stop();
}
- /**
- * {@inheritDoc}
- */
- @Override
- public abstract CommInfrastructure getTopicCommInfrastructure();
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("InlineBusTopicSink [partitionId=").append(partitionId).append(", alive=").append(alive)
+ .append(", publisher=").append(publisher).append("]");
+ return builder.toString();
+ }
}