diff options
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java')
-rw-r--r-- | feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java | 255 |
1 files changed, 155 insertions, 100 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java index b82f2e1d..6241a297 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java @@ -360,63 +360,31 @@ public class Bucket { case OWNER_NULL: // <OWNER_NULL> -- owner UUID should be set to 'null' - if (bucket.getOwner() != null) { - logger.info("Bucket {} owner: {}->null", - index, bucket.getOwner()); - bucketChanges = true; - synchronized (bucket) { - bucket.setOwner(null); - bucket.setState(null); - } - } + bucketChanges = nullifyOwner(index, bucket, bucketChanges); break; case PRIMARY_BACKUP_UPDATE: // <PRIMARY_BACKUP_UPDATE> <primary-backup-uuid> -- // primary backup UUID specified - Server newPrimaryBackup = - Server.getServer(Util.readUuid(dis)); - if (bucket.primaryBackup != newPrimaryBackup) { - logger.info("Bucket {} primary backup: {}->{}", index, - bucket.primaryBackup, newPrimaryBackup); - bucketChanges = true; - bucket.primaryBackup = newPrimaryBackup; - } + bucketChanges = updatePrimaryBackup(dis, index, bucket, bucketChanges); break; case PRIMARY_BACKUP_NULL: // <PRIMARY_BACKUP_NULL> -- // primary backup should be set to 'null' - if (bucket.primaryBackup != null) { - logger.info("Bucket {} primary backup: {}->null", - index, bucket.primaryBackup); - bucketChanges = true; - bucket.primaryBackup = null; - } + bucketChanges = nullifyPrimaryBackup(index, bucket, bucketChanges); break; case SECONDARY_BACKUP_UPDATE: // <SECONDARY_BACKUP_UPDATE> <secondary-backup-uuid> -- // secondary backup UUID specified - Server newSecondaryBackup = - Server.getServer(Util.readUuid(dis)); - if (bucket.secondaryBackup != newSecondaryBackup) { - logger.info("Bucket {} secondary backup: {}->{}", index, - bucket.secondaryBackup, newSecondaryBackup); - bucketChanges = true; - bucket.secondaryBackup = newSecondaryBackup; - } + bucketChanges = updateSecondaryBackup(dis, index, bucket, bucketChanges); break; case SECONDARY_BACKUP_NULL: // <SECONDARY_BACKUP_NULL> -- // secondary backup should be set to 'null' - if (bucket.secondaryBackup != null) { - logger.info("Bucket {} secondary backup: {}->null", - index, bucket.secondaryBackup); - bucketChanges = true; - bucket.secondaryBackup = null; - } + bucketChanges = nullifySecondaryBackup(index, bucket, bucketChanges); break; default: @@ -433,6 +401,65 @@ public class Bucket { return changes; } + private static boolean nullifyOwner(int index, Bucket bucket, boolean bucketChanges) { + if (bucket.getOwner() != null) { + logger.info("Bucket {} owner: {}->null", + index, bucket.getOwner()); + bucketChanges = true; + synchronized (bucket) { + bucket.setOwner(null); + bucket.setState(null); + } + } + return bucketChanges; + } + + private static boolean updatePrimaryBackup(DataInputStream dis, int index, Bucket bucket, boolean bucketChanges) + throws IOException { + Server newPrimaryBackup = + Server.getServer(Util.readUuid(dis)); + if (bucket.primaryBackup != newPrimaryBackup) { + logger.info("Bucket {} primary backup: {}->{}", index, + bucket.primaryBackup, newPrimaryBackup); + bucketChanges = true; + bucket.primaryBackup = newPrimaryBackup; + } + return bucketChanges; + } + + private static boolean nullifyPrimaryBackup(int index, Bucket bucket, boolean bucketChanges) { + if (bucket.primaryBackup != null) { + logger.info("Bucket {} primary backup: {}->null", + index, bucket.primaryBackup); + bucketChanges = true; + bucket.primaryBackup = null; + } + return bucketChanges; + } + + private static boolean updateSecondaryBackup(DataInputStream dis, int index, Bucket bucket, boolean bucketChanges) + throws IOException { + Server newSecondaryBackup = + Server.getServer(Util.readUuid(dis)); + if (bucket.secondaryBackup != newSecondaryBackup) { + logger.info("Bucket {} secondary backup: {}->{}", index, + bucket.secondaryBackup, newSecondaryBackup); + bucketChanges = true; + bucket.secondaryBackup = newSecondaryBackup; + } + return bucketChanges; + } + + private static boolean nullifySecondaryBackup(int index, Bucket bucket, boolean bucketChanges) { + if (bucket.secondaryBackup != null) { + logger.info("Bucket {} secondary backup: {}->null", + index, bucket.secondaryBackup); + bucketChanges = true; + bucket.secondaryBackup = null; + } + return bucketChanges; + } + /** * Update bucket owner information. * @@ -768,19 +795,7 @@ public class Bucket { out.println("Moving bucket " + bucketNumber + " from " + oldHost + " to " + newHost); - /* - * update the owner, and ensure that the primary and secondary backup - * remain different from the owner. - */ - bucket.setOwner(newHost); - if (newHost == bucket.primaryBackup) { - out.println("Moving primary back from " + newHost + " to " + oldHost); - bucket.setPrimaryBackup(oldHost); - } else if (newHost == bucket.secondaryBackup) { - out.println("Moving secondary back from " + newHost - + " to " + oldHost); - bucket.setSecondaryBackup(oldHost); - } + updateOwner(out, bucket, oldHost, newHost); try { /* @@ -796,6 +811,22 @@ public class Bucket { } } + private static void updateOwner(PrintStream out, TestBucket bucket, TestServer oldHost, TestServer newHost) { + /* + * update the owner, and ensure that the primary and secondary backup + * remain different from the owner. + */ + bucket.setOwner(newHost); + if (newHost == bucket.primaryBackup) { + out.println("Moving primary back from " + newHost + " to " + oldHost); + bucket.setPrimaryBackup(oldHost); + } else if (newHost == bucket.secondaryBackup) { + out.println("Moving secondary back from " + newHost + + " to " + oldHost); + bucket.setSecondaryBackup(oldHost); + } + } + /** * This method is called when an incoming /bucket/sessionData message is * received from the old owner of the bucket, which presumably means that @@ -1485,6 +1516,10 @@ public class Bucket { return; } + makeTestBucket(bucketSnapshot); + } + + private void makeTestBucket(final Bucket[] bucketSnapshot) { /* * Now, create a 'TestBucket' table that mirrors the 'Bucket' table. * Unlike the standard 'Bucket' and 'Server' tables, the 'TestServer' @@ -1796,28 +1831,13 @@ public class Bucket { dos.writeShort(i); // 'owner' field - if (newOwner != null) { - dos.writeByte(OWNER_UPDATE); - Util.writeUuid(dos, newOwner); - } else { - dos.writeByte(OWNER_NULL); - } + writeOwner(dos, newOwner); // 'primaryBackup' field - if (newPrimary != null) { - dos.writeByte(PRIMARY_BACKUP_UPDATE); - Util.writeUuid(dos, newPrimary); - } else { - dos.writeByte(PRIMARY_BACKUP_NULL); - } + writePrimary(dos, newPrimary); // 'secondaryBackup' field - if (newSecondary != null) { - dos.writeByte(SECONDARY_BACKUP_UPDATE); - Util.writeUuid(dos, newSecondary); - } else { - dos.writeByte(SECONDARY_BACKUP_NULL); - } + writeSecondary(dos, newSecondary); dos.writeByte(END_OF_PARAMETERS_TAG); } @@ -1851,6 +1871,33 @@ public class Bucket { MainLoop.queueWork(task); } + private void writeOwner(DataOutputStream dos, UUID newOwner) throws IOException { + if (newOwner != null) { + dos.writeByte(OWNER_UPDATE); + Util.writeUuid(dos, newOwner); + } else { + dos.writeByte(OWNER_NULL); + } + } + + private void writePrimary(DataOutputStream dos, UUID newPrimary) throws IOException { + if (newPrimary != null) { + dos.writeByte(PRIMARY_BACKUP_UPDATE); + Util.writeUuid(dos, newPrimary); + } else { + dos.writeByte(PRIMARY_BACKUP_NULL); + } + } + + private void writeSecondary(DataOutputStream dos, UUID newSecondary) throws IOException { + if (newSecondary != null) { + dos.writeByte(SECONDARY_BACKUP_UPDATE); + Util.writeUuid(dos, newSecondary); + } else { + dos.writeByte(SECONDARY_BACKUP_NULL); + } + } + /** * Supports the '/cmd/dumpBuckets' REST message -- this isn't part of * a 'rebalance' operation, but it turned out to be a convenient way @@ -2304,22 +2351,7 @@ public class Bucket { message = messages.poll(); if (message == null) { // no messages left - if (state == this) { - if (owner == Server.getThisServer()) { - // we can now exit the state - state = null; - stateChanged(); - } else { - /* - * We need a grace period before we can - * remove the 'state' value (this can happen - * if we receive and process the bulk data - * before receiving official confirmation - * that we are owner of the bucket. - */ - messages = null; - } - } + noMoreMessages(); break; } } @@ -2330,25 +2362,48 @@ public class Bucket { } if (messages == null) { // this indicates we need to enter a grace period before cleanup, - try { - logger.info("{}: entering grace period before terminating", - this); - Thread.sleep(unconfirmedGracePeriod); - } catch (InterruptedException e) { - // we are exiting in any case - Thread.currentThread().interrupt(); - } finally { - synchronized (Bucket.this) { - // Do we need to confirm that we really are the owner? - // What does it mean if we are not? - if (state == this) { - state = null; - stateChanged(); - } + sleepBeforeCleanup(); + } + logger.info("{}: exiting cleanup state", this); + } + + private void noMoreMessages() { + if (state == this) { + if (owner == Server.getThisServer()) { + // we can now exit the state + state = null; + stateChanged(); + } else { + /* + * We need a grace period before we can + * remove the 'state' value (this can happen + * if we receive and process the bulk data + * before receiving official confirmation + * that we are owner of the bucket. + */ + messages = null; + } + } + } + + private void sleepBeforeCleanup() { + try { + logger.info("{}: entering grace period before terminating", + this); + Thread.sleep(unconfirmedGracePeriod); + } catch (InterruptedException e) { + // we are exiting in any case + Thread.currentThread().interrupt(); + } finally { + synchronized (Bucket.this) { + // Do we need to confirm that we really are the owner? + // What does it mean if we are not? + if (state == this) { + state = null; + stateChanged(); } } } - logger.info("{}: exiting cleanup state", this); } /** |