diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java')
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java | 142 |
1 files changed, 77 insertions, 65 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java index 57521960..9045165b 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java @@ -26,26 +26,31 @@ import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Offline; - -// TODO add logging +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The Query state. In this state, the host waits for the other hosts to - * identify themselves. Eventually, a leader should come forth. If not, it will - * transition to the active or inactive state, depending on whether or not it - * has an assignment in the current bucket assignments. The other possibility is - * that it may <i>become</i> the leader, in which case it will also transition - * to the active state. + * The Query state. In this state, the host waits for the other hosts to identify + * themselves. Eventually, a leader should come forth. If not, it will transition to the + * active or inactive state, depending on whether or not it has an assignment in the + * current bucket assignments. The other possibility is that it may <i>become</i> the + * leader, in which case it will also transition to the active state. */ public class QueryState extends ProcessingState { + private static final Logger logger = LoggerFactory.getLogger(QueryState.class); + /** - * Hosts that have sent an "Identification" message. Always includes this - * host. + * Hosts that have sent an "Identification" message. Always includes this host. */ private TreeSet<String> alive = new TreeSet<>(); /** + * {@code True} if we saw our own Identification method, {@code false} otherwise. + */ + private boolean sawSelfIdent = false; + + /** * * @param mgr */ @@ -71,44 +76,42 @@ public class QueryState extends ProcessingState { private void awaitIdentification() { /* - * Once we've waited long enough for all Identification messages to - * arrive, become the leader, assuming we should. + * Once we've waited long enough for all Identification messages to arrive, become + * the leader, assuming we should. */ - schedule(getProperties().getIdentificationMs(), xxx -> { + schedule(getProperties().getIdentificationMs(), () -> { + + if (!sawSelfIdent) { + // didn't see our identification + logger.error("missed our own Ident message on topic {}", getTopic()); + return internalTopicFailed(); - if (isLeader()) { + } else if (isLeader()) { // "this" host is the new leader + logger.info("this host is the new leader for topic {}", getTopic()); return becomeLeader(alive); } else if (hasAssignment()) { /* - * this host is not the new leader, but it does have an - * assignment - return to the active state while we wait for the - * leader + * this host is not the new leader, but it does have an assignment - + * return to the active state while we wait for the leader */ + logger.info("no new leader on topic {}", getTopic()); return goActive(); } else { // not the leader and no assignment yet + logger.info("no new leader on topic {}", getTopic()); return goInactive(); } }); } /** - * Remains in this state. - */ - @Override - public State goQuery() { - return null; - } - - /** * Determines if this host has an assignment in the CURRENT assignments. * - * @return {@code true} if this host has an assignment, {@code false} - * otherwise + * @return {@code true} if this host has an assignment, {@code false} otherwise */ protected boolean hasAssignment() { BucketAssignments asgn = getAssignments(); @@ -116,53 +119,73 @@ public class QueryState extends ProcessingState { } @Override + public State goQuery() { + return null; + } + + @Override public State process(Identification msg) { - recordInfo(msg.getSource(), msg.getAssignments()); + if (getHost().equals(msg.getSource())) { + logger.info("saw our own Ident message on topic {}", getTopic()); + sawSelfIdent = true; + + } else { + logger.info("received Ident message from {} on topic {}", msg.getSource(), getTopic()); + recordInfo(msg.getSource(), msg.getAssignments()); + } return null; } /** - * If the message leader is better than the leader we have, then go active - * with it. Otherwise, simply treat it like an {@link Identification} - * message. + * If the message leader is better than the leader we have, then go active with it. + * Otherwise, simply treat it like an {@link Identification} message. */ @Override public State process(Leader msg) { - BucketAssignments asgn = msg.getAssignments(); - if (asgn == null) { + if (!isValid(msg)) { return null; } - // ignore Leader messages from ourself String source = msg.getSource(); - if (source == null || source.equals(getHost())) { - return null; - } - - // the new leader must equal the source - if (!source.equals(asgn.getLeader())) { - return null; - } + BucketAssignments asgn = msg.getAssignments(); - // go active, if this has a better leader than the one we have - if (source.compareTo(getLeader()) < 0) { - return super.process(msg); + // go active, if this has a leader that's the same or better than the one we have + if (source.compareTo(getLeader()) <= 0) { + logger.warn("leader with {} on topic {}", source, getTopic()); + return goActive(asgn); } /* - * The message does not have an acceptable leader, but we'll still - * record its info. + * The message does not have an acceptable leader, but we'll still record its + * info. */ - recordInfo(msg.getSource(), msg.getAssignments()); + logger.info("record leader info from {} on topic {}", source, getTopic()); + recordInfo(source, asgn); + + return null; + } + + @Override + public State process(Offline msg) { + String host = msg.getSource(); + + if (host != null && !host.equals(getHost())) { + logger.warn("host {} offline on topic {}", host, getTopic()); + alive.remove(host); + setLeader(alive.first()); + + } else { + logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic()); + } return null; } /** - * Records info from a message, adding the source host name to - * {@link #alive}, and updating the bucket assignments. + * Records info from a message, adding the source host name to {@link #alive}, and + * updating the bucket assignments. * * @param source the message's source host * @param assignments assignments, or {@code null} @@ -181,29 +204,18 @@ public class QueryState extends ProcessingState { // record assignments, if we don't have any yet BucketAssignments current = getAssignments(); if (current == null) { + logger.info("received initial assignments on topic {}", getTopic()); setAssignments(assignments); return; } /* - * Record assignments, if the new assignments have a better (i.e., - * lesser) leader. + * Record assignments, if the new assignments have a better (i.e., lesser) leader. */ String curldr = current.getLeader(); - if (curldr == null || curldr.compareTo(assignments.getLeader()) > 0) { + if (curldr == null || assignments.getLeader().compareTo(curldr) < 0) { + logger.info("use new assignments from {} on topic {}", source, getTopic()); setAssignments(assignments); } } - - @Override - public State process(Offline msg) { - String host = msg.getSource(); - - if (host != null && !host.equals(getHost())) { - alive.remove(host); - setLeader(alive.first()); - } - - return null; - } } |