diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java')
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java | 127 |
1 files changed, 73 insertions, 54 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index 1e3a907e..421b9225 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -26,7 +26,7 @@ import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledFuture; +import org.onap.policy.drools.pooling.CancellableScheduledTask; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.PoolingProperties; import org.onap.policy.drools.pooling.message.BucketAssignments; @@ -37,16 +37,19 @@ import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A state in the finite state machine. * <p> - * A state may have several timers associated with it, which must be cancelled - * whenever the state is changed. Assumes that timers are not continuously added - * to the same state. + * A state may have several timers associated with it, which must be cancelled whenever + * the state is changed. Assumes that timers are not continuously added to the same state. */ public abstract class State { + private static final Logger logger = LoggerFactory.getLogger(State.class); + /** * Host pool manager. */ @@ -55,7 +58,7 @@ public abstract class State { /** * Timers added by this state. */ - private final List<ScheduledFuture<?>> timers = new LinkedList<>(); + private final List<CancellableScheduledTask> timers = new LinkedList<>(); /** * @@ -66,9 +69,9 @@ public abstract class State { } /** - * Gets the server-side filter to use when polling the DMaaP internal topic. - * The default method returns a filter that accepts messages on the admin - * channel and on the host's own channel. + * Gets the server-side filter to use when polling the DMaaP internal topic. The + * default method returns a filter that accepts messages on the admin channel and on + * the host's own channel. * * @return the server-side filter to use. */ @@ -80,25 +83,15 @@ public abstract class State { /** * Cancels the timers added by this state. */ - public void cancelTimers() { - for (ScheduledFuture<?> fut : timers) { - fut.cancel(false); - } + public final void cancelTimers() { + timers.forEach(timer -> timer.cancel()); } /** - * Starts the state. + * Starts the state. The default method simply logs a message and returns. */ public void start() { - - } - - /** - * Indicates that the finite state machine is stopping. Sends an "offline" - * message to the other hosts. - */ - public void stop() { - publish(makeOffline()); + logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic()); } /** @@ -106,7 +99,7 @@ public abstract class State { * * @return the new state */ - public State goStart() { + public final State goStart() { return mgr.goStart(); } @@ -124,7 +117,7 @@ public abstract class State { * * @return the new state */ - public State goActive() { + public final State goActive() { return mgr.goActive(); } @@ -138,13 +131,14 @@ public abstract class State { } /** - * Processes a message. The default method passes it to the manager to - * handle and returns {@code null}. + * Processes a message. The default method passes it to the manager to handle and + * returns {@code null}. * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ public State process(Forward msg) { + logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic()); mgr.handle(msg); return null; } @@ -156,6 +150,7 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Heartbeat msg) { + logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic()); return null; } @@ -166,41 +161,54 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Identification msg) { + logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic()); return null; } /** - * Processes a message. If this host has a new assignment, then it - * transitions to the active state. Otherwise, it transitions to the - * inactive state. + * Processes a message. The default method copies the assignments and then returns + * {@code null}. * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ public State process(Leader msg) { + if (isValid(msg)) { + logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic()); + startDistributing(msg.getAssignments()); + } + + return null; + } + + /** + * Determines if a message is valid and did not originate from this host. + * + * @param msg message to be validated + * @return {@code true} if the message is valid, {@code false} otherwise + */ + protected boolean isValid(Leader msg) { BucketAssignments asgn = msg.getAssignments(); if (asgn == null) { - return null; + logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic()); + return false; } + // ignore Leader messages from ourself String source = msg.getSource(); - if (source == null) { - return null; + if (source == null || source.equals(getHost())) { + logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic()); + return false; } // the new leader must equal the source - if (source.equals(asgn.getLeader())) { - startDistributing(asgn); - - if (asgn.hasAssignment(getHost())) { - return goActive(); + boolean result = source.equals(asgn.getLeader()); - } else { - return goInactive(); - } + if (!result) { + logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic()); } - return null; + return result; } /** @@ -210,6 +218,7 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Offline msg) { + logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); return null; } @@ -220,6 +229,7 @@ public abstract class State { * @return the new state, or {@code null} if the state is unchanged */ public State process(Query msg) { + logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic()); return null; } @@ -228,7 +238,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Identification msg) { + protected final void publish(Identification msg) { mgr.publishAdmin(msg); } @@ -237,7 +247,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Leader msg) { + protected final void publish(Leader msg) { mgr.publishAdmin(msg); } @@ -246,7 +256,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Offline msg) { + protected final void publish(Offline msg) { mgr.publishAdmin(msg); } @@ -255,7 +265,7 @@ public abstract class State { * * @param msg message to be published */ - protected void publish(Query msg) { + protected final void publish(Query msg) { mgr.publishAdmin(msg); } @@ -265,7 +275,7 @@ public abstract class State { * @param channel * @param msg message to be published */ - protected void publish(String channel, Forward msg) { + protected final void publish(String channel, Forward msg) { mgr.publish(channel, msg); } @@ -275,7 +285,7 @@ public abstract class State { * @param channel * @param msg message to be published */ - protected void publish(String channel, Heartbeat msg) { + protected final void publish(String channel, Heartbeat msg) { mgr.publish(channel, msg); } @@ -284,7 +294,7 @@ public abstract class State { * * @param assignments */ - protected void startDistributing(BucketAssignments assignments) { + protected final void startDistributing(BucketAssignments assignments) { if (assignments != null) { mgr.startDistributing(assignments); } @@ -296,7 +306,7 @@ public abstract class State { * @param delayMs * @param task */ - protected void schedule(long delayMs, StateTimerTask task) { + protected final void schedule(long delayMs, StateTimerTask task) { timers.add(mgr.schedule(delayMs, task)); } @@ -307,7 +317,7 @@ public abstract class State { * @param delayMs * @param task */ - protected void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { + protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) { timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task)); } @@ -316,7 +326,7 @@ public abstract class State { * * @return a new {@link InactiveState} */ - protected State internalTopicFailed() { + protected final State internalTopicFailed() { publish(makeOffline()); mgr.internalTopicFailed(); @@ -330,16 +340,25 @@ public abstract class State { * * @return a new message */ - protected Heartbeat makeHeartbeat(long timestampMs) { + protected final Heartbeat makeHeartbeat(long timestampMs) { return new Heartbeat(getHost(), timestampMs); } /** + * Makes an Identification message. + * + * @return a new message + */ + protected Identification makeIdentification() { + return new Identification(getHost(), getAssignments()); + } + + /** * Makes an "offline" message. * * @return a new message */ - protected Offline makeOffline() { + protected final Offline makeOffline() { return new Offline(getHost()); } @@ -348,7 +367,7 @@ public abstract class State { * * @return a new message */ - protected Query makeQuery() { + protected final Query makeQuery() { return new Query(getHost()); } |