aboutsummaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java')
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java255
1 files changed, 155 insertions, 100 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
index b82f2e1d..6241a297 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
@@ -360,63 +360,31 @@ public class Bucket {
case OWNER_NULL:
// <OWNER_NULL> -- owner UUID should be set to 'null'
- if (bucket.getOwner() != null) {
- logger.info("Bucket {} owner: {}->null",
- index, bucket.getOwner());
- bucketChanges = true;
- synchronized (bucket) {
- bucket.setOwner(null);
- bucket.setState(null);
- }
- }
+ bucketChanges = nullifyOwner(index, bucket, bucketChanges);
break;
case PRIMARY_BACKUP_UPDATE:
// <PRIMARY_BACKUP_UPDATE> <primary-backup-uuid> --
// primary backup UUID specified
- Server newPrimaryBackup =
- Server.getServer(Util.readUuid(dis));
- if (bucket.primaryBackup != newPrimaryBackup) {
- logger.info("Bucket {} primary backup: {}->{}", index,
- bucket.primaryBackup, newPrimaryBackup);
- bucketChanges = true;
- bucket.primaryBackup = newPrimaryBackup;
- }
+ bucketChanges = updatePrimaryBackup(dis, index, bucket, bucketChanges);
break;
case PRIMARY_BACKUP_NULL:
// <PRIMARY_BACKUP_NULL> --
// primary backup should be set to 'null'
- if (bucket.primaryBackup != null) {
- logger.info("Bucket {} primary backup: {}->null",
- index, bucket.primaryBackup);
- bucketChanges = true;
- bucket.primaryBackup = null;
- }
+ bucketChanges = nullifyPrimaryBackup(index, bucket, bucketChanges);
break;
case SECONDARY_BACKUP_UPDATE:
// <SECONDARY_BACKUP_UPDATE> <secondary-backup-uuid> --
// secondary backup UUID specified
- Server newSecondaryBackup =
- Server.getServer(Util.readUuid(dis));
- if (bucket.secondaryBackup != newSecondaryBackup) {
- logger.info("Bucket {} secondary backup: {}->{}", index,
- bucket.secondaryBackup, newSecondaryBackup);
- bucketChanges = true;
- bucket.secondaryBackup = newSecondaryBackup;
- }
+ bucketChanges = updateSecondaryBackup(dis, index, bucket, bucketChanges);
break;
case SECONDARY_BACKUP_NULL:
// <SECONDARY_BACKUP_NULL> --
// secondary backup should be set to 'null'
- if (bucket.secondaryBackup != null) {
- logger.info("Bucket {} secondary backup: {}->null",
- index, bucket.secondaryBackup);
- bucketChanges = true;
- bucket.secondaryBackup = null;
- }
+ bucketChanges = nullifySecondaryBackup(index, bucket, bucketChanges);
break;
default:
@@ -433,6 +401,65 @@ public class Bucket {
return changes;
}
+ private static boolean nullifyOwner(int index, Bucket bucket, boolean bucketChanges) {
+ if (bucket.getOwner() != null) {
+ logger.info("Bucket {} owner: {}->null",
+ index, bucket.getOwner());
+ bucketChanges = true;
+ synchronized (bucket) {
+ bucket.setOwner(null);
+ bucket.setState(null);
+ }
+ }
+ return bucketChanges;
+ }
+
+ private static boolean updatePrimaryBackup(DataInputStream dis, int index, Bucket bucket, boolean bucketChanges)
+ throws IOException {
+ Server newPrimaryBackup =
+ Server.getServer(Util.readUuid(dis));
+ if (bucket.primaryBackup != newPrimaryBackup) {
+ logger.info("Bucket {} primary backup: {}->{}", index,
+ bucket.primaryBackup, newPrimaryBackup);
+ bucketChanges = true;
+ bucket.primaryBackup = newPrimaryBackup;
+ }
+ return bucketChanges;
+ }
+
+ private static boolean nullifyPrimaryBackup(int index, Bucket bucket, boolean bucketChanges) {
+ if (bucket.primaryBackup != null) {
+ logger.info("Bucket {} primary backup: {}->null",
+ index, bucket.primaryBackup);
+ bucketChanges = true;
+ bucket.primaryBackup = null;
+ }
+ return bucketChanges;
+ }
+
+ private static boolean updateSecondaryBackup(DataInputStream dis, int index, Bucket bucket, boolean bucketChanges)
+ throws IOException {
+ Server newSecondaryBackup =
+ Server.getServer(Util.readUuid(dis));
+ if (bucket.secondaryBackup != newSecondaryBackup) {
+ logger.info("Bucket {} secondary backup: {}->{}", index,
+ bucket.secondaryBackup, newSecondaryBackup);
+ bucketChanges = true;
+ bucket.secondaryBackup = newSecondaryBackup;
+ }
+ return bucketChanges;
+ }
+
+ private static boolean nullifySecondaryBackup(int index, Bucket bucket, boolean bucketChanges) {
+ if (bucket.secondaryBackup != null) {
+ logger.info("Bucket {} secondary backup: {}->null",
+ index, bucket.secondaryBackup);
+ bucketChanges = true;
+ bucket.secondaryBackup = null;
+ }
+ return bucketChanges;
+ }
+
/**
* Update bucket owner information.
*
@@ -768,19 +795,7 @@ public class Bucket {
out.println("Moving bucket " + bucketNumber + " from "
+ oldHost + " to " + newHost);
- /*
- * update the owner, and ensure that the primary and secondary backup
- * remain different from the owner.
- */
- bucket.setOwner(newHost);
- if (newHost == bucket.primaryBackup) {
- out.println("Moving primary back from " + newHost + " to " + oldHost);
- bucket.setPrimaryBackup(oldHost);
- } else if (newHost == bucket.secondaryBackup) {
- out.println("Moving secondary back from " + newHost
- + " to " + oldHost);
- bucket.setSecondaryBackup(oldHost);
- }
+ updateOwner(out, bucket, oldHost, newHost);
try {
/*
@@ -796,6 +811,22 @@ public class Bucket {
}
}
+ private static void updateOwner(PrintStream out, TestBucket bucket, TestServer oldHost, TestServer newHost) {
+ /*
+ * update the owner, and ensure that the primary and secondary backup
+ * remain different from the owner.
+ */
+ bucket.setOwner(newHost);
+ if (newHost == bucket.primaryBackup) {
+ out.println("Moving primary back from " + newHost + " to " + oldHost);
+ bucket.setPrimaryBackup(oldHost);
+ } else if (newHost == bucket.secondaryBackup) {
+ out.println("Moving secondary back from " + newHost
+ + " to " + oldHost);
+ bucket.setSecondaryBackup(oldHost);
+ }
+ }
+
/**
* This method is called when an incoming /bucket/sessionData message is
* received from the old owner of the bucket, which presumably means that
@@ -1485,6 +1516,10 @@ public class Bucket {
return;
}
+ makeTestBucket(bucketSnapshot);
+ }
+
+ private void makeTestBucket(final Bucket[] bucketSnapshot) {
/*
* Now, create a 'TestBucket' table that mirrors the 'Bucket' table.
* Unlike the standard 'Bucket' and 'Server' tables, the 'TestServer'
@@ -1796,28 +1831,13 @@ public class Bucket {
dos.writeShort(i);
// 'owner' field
- if (newOwner != null) {
- dos.writeByte(OWNER_UPDATE);
- Util.writeUuid(dos, newOwner);
- } else {
- dos.writeByte(OWNER_NULL);
- }
+ writeOwner(dos, newOwner);
// 'primaryBackup' field
- if (newPrimary != null) {
- dos.writeByte(PRIMARY_BACKUP_UPDATE);
- Util.writeUuid(dos, newPrimary);
- } else {
- dos.writeByte(PRIMARY_BACKUP_NULL);
- }
+ writePrimary(dos, newPrimary);
// 'secondaryBackup' field
- if (newSecondary != null) {
- dos.writeByte(SECONDARY_BACKUP_UPDATE);
- Util.writeUuid(dos, newSecondary);
- } else {
- dos.writeByte(SECONDARY_BACKUP_NULL);
- }
+ writeSecondary(dos, newSecondary);
dos.writeByte(END_OF_PARAMETERS_TAG);
}
@@ -1851,6 +1871,33 @@ public class Bucket {
MainLoop.queueWork(task);
}
+ private void writeOwner(DataOutputStream dos, UUID newOwner) throws IOException {
+ if (newOwner != null) {
+ dos.writeByte(OWNER_UPDATE);
+ Util.writeUuid(dos, newOwner);
+ } else {
+ dos.writeByte(OWNER_NULL);
+ }
+ }
+
+ private void writePrimary(DataOutputStream dos, UUID newPrimary) throws IOException {
+ if (newPrimary != null) {
+ dos.writeByte(PRIMARY_BACKUP_UPDATE);
+ Util.writeUuid(dos, newPrimary);
+ } else {
+ dos.writeByte(PRIMARY_BACKUP_NULL);
+ }
+ }
+
+ private void writeSecondary(DataOutputStream dos, UUID newSecondary) throws IOException {
+ if (newSecondary != null) {
+ dos.writeByte(SECONDARY_BACKUP_UPDATE);
+ Util.writeUuid(dos, newSecondary);
+ } else {
+ dos.writeByte(SECONDARY_BACKUP_NULL);
+ }
+ }
+
/**
* Supports the '/cmd/dumpBuckets' REST message -- this isn't part of
* a 'rebalance' operation, but it turned out to be a convenient way
@@ -2304,22 +2351,7 @@ public class Bucket {
message = messages.poll();
if (message == null) {
// no messages left
- if (state == this) {
- if (owner == Server.getThisServer()) {
- // we can now exit the state
- state = null;
- stateChanged();
- } else {
- /*
- * We need a grace period before we can
- * remove the 'state' value (this can happen
- * if we receive and process the bulk data
- * before receiving official confirmation
- * that we are owner of the bucket.
- */
- messages = null;
- }
- }
+ noMoreMessages();
break;
}
}
@@ -2330,25 +2362,48 @@ public class Bucket {
}
if (messages == null) {
// this indicates we need to enter a grace period before cleanup,
- try {
- logger.info("{}: entering grace period before terminating",
- this);
- Thread.sleep(unconfirmedGracePeriod);
- } catch (InterruptedException e) {
- // we are exiting in any case
- Thread.currentThread().interrupt();
- } finally {
- synchronized (Bucket.this) {
- // Do we need to confirm that we really are the owner?
- // What does it mean if we are not?
- if (state == this) {
- state = null;
- stateChanged();
- }
+ sleepBeforeCleanup();
+ }
+ logger.info("{}: exiting cleanup state", this);
+ }
+
+ private void noMoreMessages() {
+ if (state == this) {
+ if (owner == Server.getThisServer()) {
+ // we can now exit the state
+ state = null;
+ stateChanged();
+ } else {
+ /*
+ * We need a grace period before we can
+ * remove the 'state' value (this can happen
+ * if we receive and process the bulk data
+ * before receiving official confirmation
+ * that we are owner of the bucket.
+ */
+ messages = null;
+ }
+ }
+ }
+
+ private void sleepBeforeCleanup() {
+ try {
+ logger.info("{}: entering grace period before terminating",
+ this);
+ Thread.sleep(unconfirmedGracePeriod);
+ } catch (InterruptedException e) {
+ // we are exiting in any case
+ Thread.currentThread().interrupt();
+ } finally {
+ synchronized (Bucket.this) {
+ // Do we need to confirm that we really are the owner?
+ // What does it mean if we are not?
+ if (state == this) {
+ state = null;
+ stateChanged();
}
}
}
- logger.info("{}: exiting cleanup state", this);
}
/**