aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java142
1 files changed, 77 insertions, 65 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
index 57521960..9045165b 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/QueryState.java
@@ -26,26 +26,31 @@ import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Offline;
-
-// TODO add logging
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The Query state. In this state, the host waits for the other hosts to
- * identify themselves. Eventually, a leader should come forth. If not, it will
- * transition to the active or inactive state, depending on whether or not it
- * has an assignment in the current bucket assignments. The other possibility is
- * that it may <i>become</i> the leader, in which case it will also transition
- * to the active state.
+ * The Query state. In this state, the host waits for the other hosts to identify
+ * themselves. Eventually, a leader should come forth. If not, it will transition to the
+ * active or inactive state, depending on whether or not it has an assignment in the
+ * current bucket assignments. The other possibility is that it may <i>become</i> the
+ * leader, in which case it will also transition to the active state.
*/
public class QueryState extends ProcessingState {
+ private static final Logger logger = LoggerFactory.getLogger(QueryState.class);
+
/**
- * Hosts that have sent an "Identification" message. Always includes this
- * host.
+ * Hosts that have sent an "Identification" message. Always includes this host.
*/
private TreeSet<String> alive = new TreeSet<>();
/**
+ * {@code True} if we saw our own Identification method, {@code false} otherwise.
+ */
+ private boolean sawSelfIdent = false;
+
+ /**
*
* @param mgr
*/
@@ -71,44 +76,42 @@ public class QueryState extends ProcessingState {
private void awaitIdentification() {
/*
- * Once we've waited long enough for all Identification messages to
- * arrive, become the leader, assuming we should.
+ * Once we've waited long enough for all Identification messages to arrive, become
+ * the leader, assuming we should.
*/
- schedule(getProperties().getIdentificationMs(), xxx -> {
+ schedule(getProperties().getIdentificationMs(), () -> {
+
+ if (!sawSelfIdent) {
+ // didn't see our identification
+ logger.error("missed our own Ident message on topic {}", getTopic());
+ return internalTopicFailed();
- if (isLeader()) {
+ } else if (isLeader()) {
// "this" host is the new leader
+ logger.info("this host is the new leader for topic {}", getTopic());
return becomeLeader(alive);
} else if (hasAssignment()) {
/*
- * this host is not the new leader, but it does have an
- * assignment - return to the active state while we wait for the
- * leader
+ * this host is not the new leader, but it does have an assignment -
+ * return to the active state while we wait for the leader
*/
+ logger.info("no new leader on topic {}", getTopic());
return goActive();
} else {
// not the leader and no assignment yet
+ logger.info("no new leader on topic {}", getTopic());
return goInactive();
}
});
}
/**
- * Remains in this state.
- */
- @Override
- public State goQuery() {
- return null;
- }
-
- /**
* Determines if this host has an assignment in the CURRENT assignments.
*
- * @return {@code true} if this host has an assignment, {@code false}
- * otherwise
+ * @return {@code true} if this host has an assignment, {@code false} otherwise
*/
protected boolean hasAssignment() {
BucketAssignments asgn = getAssignments();
@@ -116,53 +119,73 @@ public class QueryState extends ProcessingState {
}
@Override
+ public State goQuery() {
+ return null;
+ }
+
+ @Override
public State process(Identification msg) {
- recordInfo(msg.getSource(), msg.getAssignments());
+ if (getHost().equals(msg.getSource())) {
+ logger.info("saw our own Ident message on topic {}", getTopic());
+ sawSelfIdent = true;
+
+ } else {
+ logger.info("received Ident message from {} on topic {}", msg.getSource(), getTopic());
+ recordInfo(msg.getSource(), msg.getAssignments());
+ }
return null;
}
/**
- * If the message leader is better than the leader we have, then go active
- * with it. Otherwise, simply treat it like an {@link Identification}
- * message.
+ * If the message leader is better than the leader we have, then go active with it.
+ * Otherwise, simply treat it like an {@link Identification} message.
*/
@Override
public State process(Leader msg) {
- BucketAssignments asgn = msg.getAssignments();
- if (asgn == null) {
+ if (!isValid(msg)) {
return null;
}
- // ignore Leader messages from ourself
String source = msg.getSource();
- if (source == null || source.equals(getHost())) {
- return null;
- }
-
- // the new leader must equal the source
- if (!source.equals(asgn.getLeader())) {
- return null;
- }
+ BucketAssignments asgn = msg.getAssignments();
- // go active, if this has a better leader than the one we have
- if (source.compareTo(getLeader()) < 0) {
- return super.process(msg);
+ // go active, if this has a leader that's the same or better than the one we have
+ if (source.compareTo(getLeader()) <= 0) {
+ logger.warn("leader with {} on topic {}", source, getTopic());
+ return goActive(asgn);
}
/*
- * The message does not have an acceptable leader, but we'll still
- * record its info.
+ * The message does not have an acceptable leader, but we'll still record its
+ * info.
*/
- recordInfo(msg.getSource(), msg.getAssignments());
+ logger.info("record leader info from {} on topic {}", source, getTopic());
+ recordInfo(source, asgn);
+
+ return null;
+ }
+
+ @Override
+ public State process(Offline msg) {
+ String host = msg.getSource();
+
+ if (host != null && !host.equals(getHost())) {
+ logger.warn("host {} offline on topic {}", host, getTopic());
+ alive.remove(host);
+ setLeader(alive.first());
+
+ } else {
+ logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
+ }
return null;
}
/**
- * Records info from a message, adding the source host name to
- * {@link #alive}, and updating the bucket assignments.
+ * Records info from a message, adding the source host name to {@link #alive}, and
+ * updating the bucket assignments.
*
* @param source the message's source host
* @param assignments assignments, or {@code null}
@@ -181,29 +204,18 @@ public class QueryState extends ProcessingState {
// record assignments, if we don't have any yet
BucketAssignments current = getAssignments();
if (current == null) {
+ logger.info("received initial assignments on topic {}", getTopic());
setAssignments(assignments);
return;
}
/*
- * Record assignments, if the new assignments have a better (i.e.,
- * lesser) leader.
+ * Record assignments, if the new assignments have a better (i.e., lesser) leader.
*/
String curldr = current.getLeader();
- if (curldr == null || curldr.compareTo(assignments.getLeader()) > 0) {
+ if (curldr == null || assignments.getLeader().compareTo(curldr) < 0) {
+ logger.info("use new assignments from {} on topic {}", source, getTopic());
setAssignments(assignments);
}
}
-
- @Override
- public State process(Offline msg) {
- String host = msg.getSource();
-
- if (host != null && !host.equals(getHost())) {
- alive.remove(host);
- setLeader(alive.first());
- }
-
- return null;
- }
}