summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java95
1 files changed, 60 insertions, 35 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
index 5f503a3b..b0a36cd9 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/ActiveState.java
@@ -24,16 +24,20 @@ import java.util.Arrays;
import java.util.TreeSet;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Offline;
-import org.onap.policy.drools.pooling.message.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The active state. In this state, this host has one more more bucket
- * assignments and processes any events associated with one of its buckets.
- * Other events are forwarded to appropriate target hosts.
+ * The active state. In this state, this host has one more more bucket assignments and
+ * processes any events associated with one of its buckets. Other events are forwarded to
+ * appropriate target hosts.
*/
public class ActiveState extends ProcessingState {
+ private static final Logger logger = LoggerFactory.getLogger(ActiveState.class);
+
/**
* Set of hosts that have been assigned a bucket.
*/
@@ -50,8 +54,8 @@ public class ActiveState extends ProcessingState {
private String predHost = "";
/**
- * {@code True} if we saw this host's heart beat since the last check,
- * {@code false} otherwise.
+ * {@code True} if we saw this host's heart beat since the last check, {@code false}
+ * otherwise.
*/
private boolean myHeartbeatSeen = false;
@@ -74,14 +78,14 @@ public class ActiveState extends ProcessingState {
}
/**
- * Determine this host's neighbors based on the order of the host UUIDs.
- * Updates {@link #succHost} and {@link #predHost}.
+ * Determine this host's neighbors based on the order of the host UUIDs. Updates
+ * {@link #succHost} and {@link #predHost}.
*/
private void detmNeighbors() {
if (assigned.size() < 2) {
+ logger.info("this host has no neighbors on topic {}", getTopic());
/*
- * this host is the only one with any assignments - it has no
- * neighbors
+ * this host is the only one with any assignments - it has no neighbors
*/
succHost = null;
predHost = "";
@@ -91,11 +95,13 @@ public class ActiveState extends ProcessingState {
if ((succHost = assigned.higher(getHost())) == null) {
// wrapped around - successor is the first host in the set
succHost = assigned.first();
+ logger.info("this host's successor is {} on topic {}", succHost, getTopic());
}
if ((predHost = assigned.lower(getHost())) == null) {
// wrapped around - predecessor is the last host in the set
predHost = assigned.last();
+ logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
}
}
@@ -109,13 +115,14 @@ public class ActiveState extends ProcessingState {
* Adds the timers.
*/
private void addTimers() {
+ logger.info("add timers");
/*
* heart beat generator
*/
long genMs = getProperties().getActiveHeartbeatMs();
- scheduleWithFixedDelay(genMs, genMs, xxx -> {
+ scheduleWithFixedDelay(genMs, genMs, () -> {
genHeartbeat();
return null;
});
@@ -125,13 +132,14 @@ public class ActiveState extends ProcessingState {
*/
long interMs = getProperties().getInterHeartbeatMs();
- scheduleWithFixedDelay(interMs, interMs, xxx -> {
+ scheduleWithFixedDelay(interMs, interMs, () -> {
if (myHeartbeatSeen) {
myHeartbeatSeen = false;
return null;
}
// missed my heart beat
+ logger.error("missed my heartbeat on topic {}", getTopic());
return internalTopicFailed();
});
@@ -141,13 +149,15 @@ public class ActiveState extends ProcessingState {
*/
if (!predHost.isEmpty()) {
- scheduleWithFixedDelay(interMs, interMs, xxx -> {
+ scheduleWithFixedDelay(interMs, interMs, () -> {
if (predHeartbeatSeen) {
predHeartbeatSeen = false;
return null;
}
// missed the predecessor's heart beat
+ logger.warn("missed predecessor's heartbeat on topic {}", getTopic());
+
publish(makeQuery());
return goQuery();
@@ -172,70 +182,85 @@ public class ActiveState extends ProcessingState {
String src = msg.getSource();
if (src == null) {
+ logger.warn("Heartbeat message has no source on topic {}", getTopic());
return null;
} else if (src.equals(getHost())) {
+ logger.info("saw my heartbeat on topic {}", getTopic());
myHeartbeatSeen = true;
} else if (src.equals(predHost)) {
+ logger.info("saw heartbeat from {} on topic {}", src, getTopic());
predHeartbeatSeen = true;
-
+
+ } else {
+ logger.info("ignored heartbeat message from {} on topic {}", src, getTopic());
}
return null;
}
@Override
+ public State process(Leader msg) {
+ if (!isValid(msg)) {
+ return null;
+ }
+
+ String src = msg.getSource();
+
+ if (getHost().compareTo(src) < 0) {
+ // our host would be a better leader - find out what's up
+ logger.warn("unexpected Leader message from {} on topic {}", src, getTopic());
+ return goQuery();
+ }
+
+ logger.info("have a new leader {} on topic {}", src, getTopic());
+
+ return goActive(msg.getAssignments());
+ }
+
+ @Override
public State process(Offline msg) {
String src = msg.getSource();
if (src == null) {
+ logger.warn("Offline message has no source on topic {}", getTopic());
return null;
} else if (!assigned.contains(src)) {
/*
- * the offline host wasn't assigned any buckets, so just ignore the
- * message
+ * the offline host wasn't assigned any buckets, so just ignore the message
*/
+ logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic());
return null;
} else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) {
/*
* Case 1: We are the leader.
*
- * Case 2: Our predecessor was the leader and it has gone offline -
- * we should become the leader.
+ * Case 2: Our predecessor was the leader and it has gone offline - we should
+ * become the leader.
*
- * In either case, we are now the leader and we must re-balance the
- * buckets since one of the hosts has gone offline.
+ * In either case, we are now the leader and we must re-balance the buckets
+ * since one of the hosts has gone offline.
*/
+ logger.info("Offline message from source {} on topic {}", src, getTopic());
+
assigned.remove(src);
return becomeLeader(assigned);
} else {
/*
- * Otherwise, we don't care right now - we'll wait for the leader to
- * tell us it's been removed.
+ * Otherwise, we don't care right now - we'll wait for the leader to tell us
+ * it's been removed.
*/
+ logger.info("ignore Offline message from source {} on topic {}", src, getTopic());
return null;
}
}
- /**
- * Transitions to the query state.
- */
- @Override
- public State process(Query msg) {
- State next = super.process(msg);
- if (next != null) {
- return next;
- }
-
- return goQuery();
- }
-
protected String getSuccHost() {
return succHost;
}