diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java')
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java | 95 |
1 files changed, 60 insertions, 35 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java index 5f503a3b..b0a36cd9 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java @@ -24,16 +24,20 @@ import java.util.Arrays; import java.util.TreeSet; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Offline; -import org.onap.policy.drools.pooling.message.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The active state. In this state, this host has one more more bucket - * assignments and processes any events associated with one of its buckets. - * Other events are forwarded to appropriate target hosts. + * The active state. In this state, this host has one more more bucket assignments and + * processes any events associated with one of its buckets. Other events are forwarded to + * appropriate target hosts. */ public class ActiveState extends ProcessingState { + private static final Logger logger = LoggerFactory.getLogger(ActiveState.class); + /** * Set of hosts that have been assigned a bucket. */ @@ -50,8 +54,8 @@ public class ActiveState extends ProcessingState { private String predHost = ""; /** - * {@code True} if we saw this host's heart beat since the last check, - * {@code false} otherwise. + * {@code True} if we saw this host's heart beat since the last check, {@code false} + * otherwise. */ private boolean myHeartbeatSeen = false; @@ -74,14 +78,14 @@ public class ActiveState extends ProcessingState { } /** - * Determine this host's neighbors based on the order of the host UUIDs. - * Updates {@link #succHost} and {@link #predHost}. + * Determine this host's neighbors based on the order of the host UUIDs. Updates + * {@link #succHost} and {@link #predHost}. */ private void detmNeighbors() { if (assigned.size() < 2) { + logger.info("this host has no neighbors on topic {}", getTopic()); /* - * this host is the only one with any assignments - it has no - * neighbors + * this host is the only one with any assignments - it has no neighbors */ succHost = null; predHost = ""; @@ -91,11 +95,13 @@ public class ActiveState extends ProcessingState { if ((succHost = assigned.higher(getHost())) == null) { // wrapped around - successor is the first host in the set succHost = assigned.first(); + logger.info("this host's successor is {} on topic {}", succHost, getTopic()); } if ((predHost = assigned.lower(getHost())) == null) { // wrapped around - predecessor is the last host in the set predHost = assigned.last(); + logger.info("this host's predecessor is {} on topic {}", predHost, getTopic()); } } @@ -109,13 +115,14 @@ public class ActiveState extends ProcessingState { * Adds the timers. */ private void addTimers() { + logger.info("add timers"); /* * heart beat generator */ long genMs = getProperties().getActiveHeartbeatMs(); - scheduleWithFixedDelay(genMs, genMs, xxx -> { + scheduleWithFixedDelay(genMs, genMs, () -> { genHeartbeat(); return null; }); @@ -125,13 +132,14 @@ public class ActiveState extends ProcessingState { */ long interMs = getProperties().getInterHeartbeatMs(); - scheduleWithFixedDelay(interMs, interMs, xxx -> { + scheduleWithFixedDelay(interMs, interMs, () -> { if (myHeartbeatSeen) { myHeartbeatSeen = false; return null; } // missed my heart beat + logger.error("missed my heartbeat on topic {}", getTopic()); return internalTopicFailed(); }); @@ -141,13 +149,15 @@ public class ActiveState extends ProcessingState { */ if (!predHost.isEmpty()) { - scheduleWithFixedDelay(interMs, interMs, xxx -> { + scheduleWithFixedDelay(interMs, interMs, () -> { if (predHeartbeatSeen) { predHeartbeatSeen = false; return null; } // missed the predecessor's heart beat + logger.warn("missed predecessor's heartbeat on topic {}", getTopic()); + publish(makeQuery()); return goQuery(); @@ -172,70 +182,85 @@ public class ActiveState extends ProcessingState { String src = msg.getSource(); if (src == null) { + logger.warn("Heartbeat message has no source on topic {}", getTopic()); return null; } else if (src.equals(getHost())) { + logger.info("saw my heartbeat on topic {}", getTopic()); myHeartbeatSeen = true; } else if (src.equals(predHost)) { + logger.info("saw heartbeat from {} on topic {}", src, getTopic()); predHeartbeatSeen = true; - + + } else { + logger.info("ignored heartbeat message from {} on topic {}", src, getTopic()); } return null; } @Override + public State process(Leader msg) { + if (!isValid(msg)) { + return null; + } + + String src = msg.getSource(); + + if (getHost().compareTo(src) < 0) { + // our host would be a better leader - find out what's up + logger.warn("unexpected Leader message from {} on topic {}", src, getTopic()); + return goQuery(); + } + + logger.info("have a new leader {} on topic {}", src, getTopic()); + + return goActive(msg.getAssignments()); + } + + @Override public State process(Offline msg) { String src = msg.getSource(); if (src == null) { + logger.warn("Offline message has no source on topic {}", getTopic()); return null; } else if (!assigned.contains(src)) { /* - * the offline host wasn't assigned any buckets, so just ignore the - * message + * the offline host wasn't assigned any buckets, so just ignore the message */ + logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic()); return null; } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) { /* * Case 1: We are the leader. * - * Case 2: Our predecessor was the leader and it has gone offline - - * we should become the leader. + * Case 2: Our predecessor was the leader and it has gone offline - we should + * become the leader. * - * In either case, we are now the leader and we must re-balance the - * buckets since one of the hosts has gone offline. + * In either case, we are now the leader and we must re-balance the buckets + * since one of the hosts has gone offline. */ + logger.info("Offline message from source {} on topic {}", src, getTopic()); + assigned.remove(src); return becomeLeader(assigned); } else { /* - * Otherwise, we don't care right now - we'll wait for the leader to - * tell us it's been removed. + * Otherwise, we don't care right now - we'll wait for the leader to tell us + * it's been removed. */ + logger.info("ignore Offline message from source {} on topic {}", src, getTopic()); return null; } } - /** - * Transitions to the query state. - */ - @Override - public State process(Query msg) { - State next = super.process(msg); - if (next != null) { - return next; - } - - return goQuery(); - } - protected String getSuccHost() { return succHost; } |