aboutsummaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java')
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java211
1 files changed, 113 insertions, 98 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
index 9d864bd7..06b02527 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
@@ -83,6 +83,13 @@ class Leader {
private static int stableVotingCycles;
/**
+ * Hide implicit public constructor.
+ */
+ private Leader() {
+ // everything here is static -- no instances of this class are created
+ }
+
+ /**
* Invoked at startup, or after some events -- immediately start a new vote.
*/
static void startup() {
@@ -125,23 +132,18 @@ class Leader {
// decode base64 data
final byte[] packet = Base64.getDecoder().decode(data);
- MainLoop.queueWork(new Runnable() {
- /**
- * This method is running within the 'MainLoop' thread.
- */
- @Override
- public void run() {
- // create the 'VoteCycle' state machine, if needed
- if (voteCycle == null) {
- voteCycle = new VoteCycle();
- MainLoop.addBackgroundWork(voteCycle);
- }
- try {
- // pass data to 'VoteCycle' state machine
- voteCycle.packetReceived(packet);
- } catch (IOException e) {
- logger.error("Exception in 'Leader.voteData", e);
- }
+ MainLoop.queueWork(() -> {
+ // This runs within the 'MainLoop' thread --
+ // create the 'VoteCycle' state machine, if needed
+ if (voteCycle == null) {
+ voteCycle = new VoteCycle();
+ MainLoop.addBackgroundWork(voteCycle);
+ }
+ try {
+ // pass data to 'VoteCycle' state machine
+ voteCycle.packetReceived(packet);
+ } catch (IOException e) {
+ logger.error("Exception in 'Leader.voteData", e);
}
});
}
@@ -250,94 +252,107 @@ class Leader {
@Override
public void run() {
switch (state) {
- case STARTUP: {
- // 5-second grace period -- wait for things to stablize before
- // starting the vote
- if ((cycleCount -= 1) <= 0) {
- logger.info("VoteCycle: {} seconds have passed",
- stableIdleCycles);
- //MainLoop.removeBackgroundWork(this);
- updateMyVote();
- sendOutUpdates();
- state = State.VOTING;
- cycleCount = stableVotingCycles;
- }
+ case STARTUP:
+ startupState();
break;
- }
- case VOTING: {
- // need to be in the VOTING state without any vote changes
- // for 5 seconds -- once this happens, the leader is chosen
- if (sendOutUpdates()) {
- // changes have occurred -- set the grace period to 5 seconds
- cycleCount = stableVotingCycles;
- } else if ((cycleCount -= 1) <= 0) {
- // 5 second grace period has passed -- the leader is one with
- // the most votes, which is the first entry in 'voteData'
- Server oldLeader = leader;
- leader = Server.getServer(voteData.first().uuid);
- if (leader != oldLeader) {
- // the leader has changed -- send out notifications
- for (Events listener : Events.getListeners()) {
- listener.newLeader(leader);
- }
- } else {
- // the election is over, and the leader has been confirmed
- for (Events listener : Events.getListeners()) {
- listener.leaderConfirmed(leader);
- }
- }
- if (leader == Server.getThisServer()) {
- // this is the lead server --
- // make sure the 'Discovery' threads are running
- Discovery.startDiscovery();
- } else {
- // this is not the lead server -- stop 'Discovery' threads
- Discovery.stopDiscovery();
- }
-
- // we are done with voting -- clean up, and report results
- MainLoop.removeBackgroundWork(this);
- voteCycle = null;
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- PrintStream out = new PrintStream(bos);
-
- out.println("Voting results:");
-
- // x(36) xxxxx x(36)
- // UUID Votes Voter
- String format = "%-36s %5s %-36s\n";
- out.format(format, "UUID", "Votes", "Voter(s)");
- out.format(format, "----", "-----", "--------");
-
- for (VoteData vote : voteData) {
- if (vote.voters.isEmpty()) {
- out.format(format, vote.uuid, 0, "");
- } else {
- boolean headerNeeded = true;
- for (VoterData voter : vote.voters) {
- if (headerNeeded) {
- out.format(format, vote.uuid,
- vote.voters.size(), voter.uuid);
- headerNeeded = false;
- } else {
- out.format(format, "", "", voter.uuid);
- }
- }
- }
- }
-
- logger.info(bos.toString());
- }
+ case VOTING:
+ votingState();
break;
- }
+
default:
logger.error("Unknown state: {}", state);
break;
}
}
+ private void startupState() {
+ // 5-second grace period -- wait for things to stablize before
+ // starting the vote
+ cycleCount -= 1;
+ if (cycleCount <= 0) {
+ logger.info("VoteCycle: {} seconds have passed",
+ stableIdleCycles);
+ updateMyVote();
+ sendOutUpdates();
+ state = State.VOTING;
+ cycleCount = stableVotingCycles;
+ }
+ }
+
+ private void votingState() {
+ // need to be in the VOTING state without any vote changes
+ // for 5 seconds -- once this happens, the leader is chosen
+ if (sendOutUpdates()) {
+ // changes have occurred -- set the grace period to 5 seconds
+ cycleCount = stableVotingCycles;
+ return;
+ }
+
+ cycleCount -= 1;
+ if (cycleCount > 0) {
+ return;
+ }
+
+ // 5 second grace period has passed -- the leader is one with
+ // the most votes, which is the first entry in 'voteData'
+ Server oldLeader = leader;
+ leader = Server.getServer(voteData.first().uuid);
+ if (leader != oldLeader) {
+ // the leader has changed -- send out notifications
+ for (Events listener : Events.getListeners()) {
+ listener.newLeader(leader);
+ }
+ } else {
+ // the election is over, and the leader has been confirmed
+ for (Events listener : Events.getListeners()) {
+ listener.leaderConfirmed(leader);
+ }
+ }
+ if (leader == Server.getThisServer()) {
+ // this is the lead server --
+ // make sure the 'Discovery' threads are running
+ Discovery.startDiscovery();
+ } else {
+ // this is not the lead server -- stop 'Discovery' threads
+ Discovery.stopDiscovery();
+ }
+
+ // we are done with voting -- clean up, and report results
+ MainLoop.removeBackgroundWork(this);
+ voteCycle = null;
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ PrintStream out = new PrintStream(bos);
+
+ out.println("Voting results:");
+
+ // x(36) xxxxx x(36)
+ // UUID Votes Voter
+ String format = "%-36s %5s %-36s\n";
+ out.format(format, "UUID", "Votes", "Voter(s)");
+ out.format(format, "----", "-----", "--------");
+
+ for (VoteData vote : voteData) {
+ if (vote.voters.isEmpty()) {
+ out.format(format, vote.uuid, 0, "");
+ continue;
+ }
+ boolean headerNeeded = true;
+ for (VoterData voter : vote.voters) {
+ if (headerNeeded) {
+ out.format(format, vote.uuid,
+ vote.voters.size(), voter.uuid);
+ headerNeeded = false;
+ } else {
+ out.format(format, "", "", voter.uuid);
+ }
+ }
+ }
+
+ logger.info(bos.toString());
+ }
+
/**
* Process an incoming /vote REST message.
*
@@ -375,7 +390,7 @@ class Leader {
private void processVote(UUID voter, UUID vote, long timestamp) {
// fetch old data for this voter
VoterData voterData = uuidToVoterData.computeIfAbsent(voter,
- (key) -> new VoterData(voter, timestamp));
+ key -> new VoterData(voter, timestamp));
if (timestamp >= voterData.timestamp) {
// this is a new vote for this voter -- update the timestamp
voterData.timestamp = timestamp;
@@ -389,7 +404,7 @@ class Leader {
VoteData newVoteData = null;
if (vote != null) {
- newVoteData = uuidToVoteData.computeIfAbsent(vote, (key) -> new VoteData(vote));
+ newVoteData = uuidToVoteData.computeIfAbsent(vote, key -> new VoteData(vote));
}
if (oldVoteData != newVoteData) {