aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java127
1 files changed, 73 insertions, 54 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index 1e3a907e..421b9225 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -26,7 +26,7 @@ import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ScheduledFuture;
+import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
@@ -37,16 +37,19 @@ import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A state in the finite state machine.
* <p>
- * A state may have several timers associated with it, which must be cancelled
- * whenever the state is changed. Assumes that timers are not continuously added
- * to the same state.
+ * A state may have several timers associated with it, which must be cancelled whenever
+ * the state is changed. Assumes that timers are not continuously added to the same state.
*/
public abstract class State {
+ private static final Logger logger = LoggerFactory.getLogger(State.class);
+
/**
* Host pool manager.
*/
@@ -55,7 +58,7 @@ public abstract class State {
/**
* Timers added by this state.
*/
- private final List<ScheduledFuture<?>> timers = new LinkedList<>();
+ private final List<CancellableScheduledTask> timers = new LinkedList<>();
/**
*
@@ -66,9 +69,9 @@ public abstract class State {
}
/**
- * Gets the server-side filter to use when polling the DMaaP internal topic.
- * The default method returns a filter that accepts messages on the admin
- * channel and on the host's own channel.
+ * Gets the server-side filter to use when polling the DMaaP internal topic. The
+ * default method returns a filter that accepts messages on the admin channel and on
+ * the host's own channel.
*
* @return the server-side filter to use.
*/
@@ -80,25 +83,15 @@ public abstract class State {
/**
* Cancels the timers added by this state.
*/
- public void cancelTimers() {
- for (ScheduledFuture<?> fut : timers) {
- fut.cancel(false);
- }
+ public final void cancelTimers() {
+ timers.forEach(timer -> timer.cancel());
}
/**
- * Starts the state.
+ * Starts the state. The default method simply logs a message and returns.
*/
public void start() {
-
- }
-
- /**
- * Indicates that the finite state machine is stopping. Sends an "offline"
- * message to the other hosts.
- */
- public void stop() {
- publish(makeOffline());
+ logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic());
}
/**
@@ -106,7 +99,7 @@ public abstract class State {
*
* @return the new state
*/
- public State goStart() {
+ public final State goStart() {
return mgr.goStart();
}
@@ -124,7 +117,7 @@ public abstract class State {
*
* @return the new state
*/
- public State goActive() {
+ public final State goActive() {
return mgr.goActive();
}
@@ -138,13 +131,14 @@ public abstract class State {
}
/**
- * Processes a message. The default method passes it to the manager to
- * handle and returns {@code null}.
+ * Processes a message. The default method passes it to the manager to handle and
+ * returns {@code null}.
*
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Forward msg) {
+ logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
mgr.handle(msg);
return null;
}
@@ -156,6 +150,7 @@ public abstract class State {
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Heartbeat msg) {
+ logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic());
return null;
}
@@ -166,41 +161,54 @@ public abstract class State {
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Identification msg) {
+ logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic());
return null;
}
/**
- * Processes a message. If this host has a new assignment, then it
- * transitions to the active state. Otherwise, it transitions to the
- * inactive state.
+ * Processes a message. The default method copies the assignments and then returns
+ * {@code null}.
*
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Leader msg) {
+ if (isValid(msg)) {
+ logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic());
+ startDistributing(msg.getAssignments());
+ }
+
+ return null;
+ }
+
+ /**
+ * Determines if a message is valid and did not originate from this host.
+ *
+ * @param msg message to be validated
+ * @return {@code true} if the message is valid, {@code false} otherwise
+ */
+ protected boolean isValid(Leader msg) {
BucketAssignments asgn = msg.getAssignments();
if (asgn == null) {
- return null;
+ logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic());
+ return false;
}
+ // ignore Leader messages from ourself
String source = msg.getSource();
- if (source == null) {
- return null;
+ if (source == null || source.equals(getHost())) {
+ logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic());
+ return false;
}
// the new leader must equal the source
- if (source.equals(asgn.getLeader())) {
- startDistributing(asgn);
-
- if (asgn.hasAssignment(getHost())) {
- return goActive();
+ boolean result = source.equals(asgn.getLeader());
- } else {
- return goInactive();
- }
+ if (!result) {
+ logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic());
}
- return null;
+ return result;
}
/**
@@ -210,6 +218,7 @@ public abstract class State {
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Offline msg) {
+ logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
return null;
}
@@ -220,6 +229,7 @@ public abstract class State {
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Query msg) {
+ logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
return null;
}
@@ -228,7 +238,7 @@ public abstract class State {
*
* @param msg message to be published
*/
- protected void publish(Identification msg) {
+ protected final void publish(Identification msg) {
mgr.publishAdmin(msg);
}
@@ -237,7 +247,7 @@ public abstract class State {
*
* @param msg message to be published
*/
- protected void publish(Leader msg) {
+ protected final void publish(Leader msg) {
mgr.publishAdmin(msg);
}
@@ -246,7 +256,7 @@ public abstract class State {
*
* @param msg message to be published
*/
- protected void publish(Offline msg) {
+ protected final void publish(Offline msg) {
mgr.publishAdmin(msg);
}
@@ -255,7 +265,7 @@ public abstract class State {
*
* @param msg message to be published
*/
- protected void publish(Query msg) {
+ protected final void publish(Query msg) {
mgr.publishAdmin(msg);
}
@@ -265,7 +275,7 @@ public abstract class State {
* @param channel
* @param msg message to be published
*/
- protected void publish(String channel, Forward msg) {
+ protected final void publish(String channel, Forward msg) {
mgr.publish(channel, msg);
}
@@ -275,7 +285,7 @@ public abstract class State {
* @param channel
* @param msg message to be published
*/
- protected void publish(String channel, Heartbeat msg) {
+ protected final void publish(String channel, Heartbeat msg) {
mgr.publish(channel, msg);
}
@@ -284,7 +294,7 @@ public abstract class State {
*
* @param assignments
*/
- protected void startDistributing(BucketAssignments assignments) {
+ protected final void startDistributing(BucketAssignments assignments) {
if (assignments != null) {
mgr.startDistributing(assignments);
}
@@ -296,7 +306,7 @@ public abstract class State {
* @param delayMs
* @param task
*/
- protected void schedule(long delayMs, StateTimerTask task) {
+ protected final void schedule(long delayMs, StateTimerTask task) {
timers.add(mgr.schedule(delayMs, task));
}
@@ -307,7 +317,7 @@ public abstract class State {
* @param delayMs
* @param task
*/
- protected void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
}
@@ -316,7 +326,7 @@ public abstract class State {
*
* @return a new {@link InactiveState}
*/
- protected State internalTopicFailed() {
+ protected final State internalTopicFailed() {
publish(makeOffline());
mgr.internalTopicFailed();
@@ -330,16 +340,25 @@ public abstract class State {
*
* @return a new message
*/
- protected Heartbeat makeHeartbeat(long timestampMs) {
+ protected final Heartbeat makeHeartbeat(long timestampMs) {
return new Heartbeat(getHost(), timestampMs);
}
/**
+ * Makes an Identification message.
+ *
+ * @return a new message
+ */
+ protected Identification makeIdentification() {
+ return new Identification(getHost(), getAssignments());
+ }
+
+ /**
* Makes an "offline" message.
*
* @return a new message
*/
- protected Offline makeOffline() {
+ protected final Offline makeOffline() {
return new Offline(getHost());
}
@@ -348,7 +367,7 @@ public abstract class State {
*
* @return a new message
*/
- protected Query makeQuery() {
+ protected final Query makeQuery() {
return new Query(getHost());
}