diff options
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java')
-rw-r--r-- | feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java | 429 |
1 files changed, 216 insertions, 213 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java index 2236506e..b949134f 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java @@ -174,7 +174,13 @@ public class Bucket { private volatile State state = null; // storage for additional data - private Map<Class<?>, Object> adjuncts = new HashMap<Class<?>, Object>(); + private Map<Class<?>, Object> adjuncts = new HashMap<>(); + + // HTTP query parameters + private static final String QP_BUCKET = "bucket"; + private static final String QP_KEYWORD = "keyword"; + private static final String QP_DEST = "dest"; + private static final String QP_TTL = "ttl"; // BACKUP data (only buckets for where we are the owner, or a backup) @@ -287,92 +293,6 @@ public class Bucket { } /** - * This method is called to start a 'rebalance' operation in a background - * thread, but it only does this on the lead server. Being balanced means - * the following: - * 1) Each server owns approximately the same number of buckets - * 2) If any server were to fail, and the designated primaries take over - * for all of that server's buckets, all remaining servers would still - * own approximately the same number of buckets. - * 3) If any two servers were to fail, and the designated primaries were - * to take over for the failed server's buckets (secondaries would take - * for buckets where the owner and primary are OOS), all remaining - * servers would still own approximately the same number of buckets. - * 4) Each server should have approximately the same number of - * (primary-backup + secondary-backup) buckets that it is responsible for. - * 5) The primary backup for each bucket must be on the same site as the - * owner, and the secondary backup must be on a different site. - */ - private static void rebalance() { - if (Leader.getLeader() == Server.getThisServer()) { - Rebalance rb = new Rebalance(); - synchronized (rebalanceLock) { - // the most recent 'Rebalance' instance is the only valid one - rebalance = rb; - } - - new Thread("BUCKET REBALANCER") { - @Override - public void run() { - /* - * copy bucket and host data, - * generating a temporary internal table. - */ - rb.copyData(); - - /* - * allocate owners for all buckets without an owner, - * and rebalance bucket owners, if necessary -- - * this takes card of item #1, above. - */ - rb.allocateBuckets(); - - /* - * make sure that primary backups always have the same site - * as the owner, and secondary backups always have a different - * site -- this takes care of #5, above. - */ - rb.checkSiteValues(); - - /* - * adjust primary backup lists to take care of item #2, above - * (taking #5 into account). - */ - rb.rebalancePrimaryBackups(); - - /* - * allocate secondary backups, and take care of items - * #3 and #4, above (taking #5 into account). - */ - rb.rebalanceSecondaryBackups(); - - try { - synchronized (rebalanceLock) { - /* - * if another 'Rebalance' instance has started in the - * mean time, don't do the update. - */ - if (rebalance == rb) { - /* - * build a message containing all of the updated bucket - * information, process it internally in this host - * (lead server), and send it out to others in the - * "notify list". - */ - rb.generateBucketMessage(); - rebalance = null; - } - } - } catch (IOException e) { - logger.error("Exception in Rebalance.generateBucketMessage", - e); - } - } - }.start(); - } - } - - /** * Handle an incoming /bucket/update REST message. * * @param data base64-encoded data, containing all bucket updates @@ -433,13 +353,12 @@ public class Bucket { int tag; while ((tag = dis.readUnsignedByte()) != END_OF_PARAMETERS_TAG) { switch (tag) { - case OWNER_UPDATE: { + case OWNER_UPDATE: // <OWNER_UPDATE> <owner-uuid> -- owner UUID specified bucketChanges = updateBucketInternalOwnerUpdate(bucket, dis, index); break; - } - case OWNER_NULL: { + case OWNER_NULL: // <OWNER_NULL> -- owner UUID should be set to 'null' if (bucket.getOwner() != null) { logger.info("Bucket {} owner: {}->null", @@ -451,9 +370,8 @@ public class Bucket { } } break; - } - case PRIMARY_BACKUP_UPDATE: { + case PRIMARY_BACKUP_UPDATE: // <PRIMARY_BACKUP_UPDATE> <primary-backup-uuid> -- // primary backup UUID specified Server newPrimaryBackup = @@ -465,9 +383,8 @@ public class Bucket { bucket.primaryBackup = newPrimaryBackup; } break; - } - case PRIMARY_BACKUP_NULL: { + case PRIMARY_BACKUP_NULL: // <PRIMARY_BACKUP_NULL> -- // primary backup should be set to 'null' if (bucket.primaryBackup != null) { @@ -477,9 +394,8 @@ public class Bucket { bucket.primaryBackup = null; } break; - } - case SECONDARY_BACKUP_UPDATE: { + case SECONDARY_BACKUP_UPDATE: // <SECONDARY_BACKUP_UPDATE> <secondary-backup-uuid> -- // secondary backup UUID specified Server newSecondaryBackup = @@ -491,9 +407,8 @@ public class Bucket { bucket.secondaryBackup = newSecondaryBackup; } break; - } - case SECONDARY_BACKUP_NULL: { + case SECONDARY_BACKUP_NULL: // <SECONDARY_BACKUP_NULL> -- // secondary backup should be set to 'null' if (bucket.secondaryBackup != null) { @@ -503,7 +418,6 @@ public class Bucket { bucket.secondaryBackup = null; } break; - } default: logger.error("Illegal tag: {}", tag); @@ -550,7 +464,6 @@ public class Bucket { bucket.state = bucket.new NewOwner(true, oldOwner); } else { // new owner has been confirmed - // orig bucket.state.newOwner(); bucket.state.newOwner(); } } @@ -685,22 +598,23 @@ public class Bucket { * selected bucket has no server assigned -- this should only be a * transient situation, until 'rebalance' is run. */ - out.println("Bucket is " + bucketNumber + ", which has no owner"); + out.format("Bucket is %d, which has no owner\n", bucketNumber); } else if (server == Server.getThisServer()) { /* * the selected bucket is associated with this particular server -- * no forwarding is needed. */ - out.println("Bucket is " + bucketNumber - + ", which is owned by this server: " + server.getUuid()); + out.format("Bucket is %d, which is owned by this server: %s\n", + bucketNumber, server.getUuid()); } else { /* * the selected bucket is assigned to a different server -- forward * the message. */ - out.println("Bucket is " + bucketNumber + ": sending from\n" - + " " + Server.getThisServer().getUuid() + " to \n" - + " " + server.getUuid()); + out.format("Bucket is %d: sending from\n" + + " %s to\n" + + " %s\n", + bucketNumber, Server.getThisServer().getUuid(), server.getUuid()); // do a POST call of /bucket/bucketResponse to the remoote server Entity<String> entity = @@ -723,8 +637,8 @@ public class Bucket { // we need to include the 'bucket' and 'keyword' parameters // in the POST that we are sending out return webTarget - .queryParam("bucket", bucketNumber) - .queryParam("keyword", keyword); + .queryParam(QP_BUCKET, bucketNumber) + .queryParam(QP_KEYWORD, keyword); } /** @@ -744,12 +658,12 @@ public class Bucket { if (response == null) { out.println("Timed out waiting for a response"); } else { - out.println("Received response code " + response.getStatus()); - out.println("Entity = " + response.readEntity(String.class)); + out.format("Received response code %s\nEntity = %s\n", + response.getStatus(), response.readEntity(String.class)); } } catch (InterruptedException e) { out.println(e); - throw new IOException(e); + Thread.currentThread().interrupt(); } } } @@ -905,7 +819,8 @@ public class Bucket { Server server; WebTarget webTarget; - if ((ttl -= 1) > 0 + ttl -= 1; + if (ttl > 0 && (server = Server.getServer(dest)) != null && (webTarget = server.getWebTarget("bucket/sessionData")) != null) { logger.info("Forwarding 'bucket/sessionData' to uuid {}", @@ -915,9 +830,9 @@ public class Bucket { MediaType.APPLICATION_OCTET_STREAM_TYPE); Response response = webTarget - .queryParam("bucket", bucketNumber) - .queryParam("dest", dest) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_BUCKET, bucketNumber) + .queryParam(QP_DEST, dest) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().post(entity); logger.info("/bucket/sessionData response code = {}", response.getStatus()); @@ -977,23 +892,16 @@ public class Bucket { * the 'newInstance' method is unable to create the adjunct) */ public <T> T getAdjunct(Class<T> clazz) { - synchronized (adjuncts) { - // look up the adjunct in the table - Object adj = adjuncts.get(clazz); - if (adj == null) { - // lookup failed -- create one - try { - // create the adjunct (may trigger an exception) - adj = clazz.newInstance(); - - // update the table - adjuncts.put(clazz, adj); - } catch (Exception e) { - logger.error("Can't create adjunct of {}", clazz, e); - } + Object adj = adjuncts.computeIfAbsent(clazz, key -> { + try { + // create the adjunct, if needed + return clazz.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + logger.error("Can't create adjunct of {}", clazz, e); + return null; } - return clazz.cast(adj); - } + }); + return clazz.cast(adj); } /** @@ -1019,7 +927,7 @@ public class Bucket { */ public Object putAdjunct(Object adj) { synchronized (adjuncts) { - Class clazz = adj.getClass(); + Class<?> clazz = adj.getClass(); return adjuncts.put(clazz, adj); } } @@ -1148,6 +1056,92 @@ public class Bucket { // trigger a rebalance (only happens if we are the lead server) rebalance(); } + + /** + * This method is called to start a 'rebalance' operation in a background + * thread, but it only does this on the lead server. Being balanced means + * the following: + * 1) Each server owns approximately the same number of buckets + * 2) If any server were to fail, and the designated primaries take over + * for all of that server's buckets, all remaining servers would still + * own approximately the same number of buckets. + * 3) If any two servers were to fail, and the designated primaries were + * to take over for the failed server's buckets (secondaries would take + * for buckets where the owner and primary are OOS), all remaining + * servers would still own approximately the same number of buckets. + * 4) Each server should have approximately the same number of + * (primary-backup + secondary-backup) buckets that it is responsible for. + * 5) The primary backup for each bucket must be on the same site as the + * owner, and the secondary backup must be on a different site. + */ + private void rebalance() { + if (Leader.getLeader() == Server.getThisServer()) { + Rebalance rb = new Rebalance(); + synchronized (rebalanceLock) { + // the most recent 'Rebalance' instance is the only valid one + rebalance = rb; + } + + new Thread("BUCKET REBALANCER") { + @Override + public void run() { + /* + * copy bucket and host data, + * generating a temporary internal table. + */ + rb.copyData(); + + /* + * allocate owners for all buckets without an owner, + * and rebalance bucket owners, if necessary -- + * this takes card of item #1, above. + */ + rb.allocateBuckets(); + + /* + * make sure that primary backups always have the same site + * as the owner, and secondary backups always have a different + * site -- this takes care of #5, above. + */ + rb.checkSiteValues(); + + /* + * adjust primary backup lists to take care of item #2, above + * (taking #5 into account). + */ + rb.rebalancePrimaryBackups(); + + /* + * allocate secondary backups, and take care of items + * #3 and #4, above (taking #5 into account). + */ + rb.rebalanceSecondaryBackups(); + + try { + synchronized (rebalanceLock) { + /* + * if another 'Rebalance' instance has started in the + * mean time, don't do the update. + */ + if (rebalance == rb) { + /* + * build a message containing all of the updated bucket + * information, process it internally in this host + * (lead server), and send it out to others in the + * "notify list". + */ + rb.generateBucketMessage(); + rebalance = null; + } + } + } catch (IOException e) { + logger.error("Exception in Rebalance.generateBucketMessage", + e); + } + } + }.start(); + } + } } /* ============================================================ */ @@ -1477,11 +1471,15 @@ public class Bucket { return 0; }; - FutureTask<Integer> ft = new FutureTask(callable); + FutureTask<Integer> ft = new FutureTask<>(callable); MainLoop.queueWork(ft); try { ft.get(60, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + } catch (InterruptedException e) { + logger.error("Interrupted", e); + Thread.currentThread().interrupt(); + return; + } catch (ExecutionException | TimeoutException e) { logger.error("Exception in Rebalance.copyData", e); return; } @@ -1534,7 +1532,7 @@ public class Bucket { * 'needBuckets' TreeSet: those with the fewest buckets allocated are * at the head of the list. */ - Comparator<TestServer> bucketCount = new Comparator<TestServer>() { + Comparator<TestServer> bucketCount = new Comparator<>() { @Override public int compare(TestServer s1, TestServer s2) { int rval = s1.buckets.size() - s2.buckets.size(); @@ -1662,8 +1660,7 @@ public class Bucket { // populate a 'TreeSet' of 'AdjustedTestServer' instances based // the failure of 'failedServer' - TreeSet<AdjustedTestServer> adjustedTestServers = - new TreeSet<AdjustedTestServer>(); + TreeSet<AdjustedTestServer> adjustedTestServers = new TreeSet<>(); for (TestServer server : testServers.values()) { if (server == failedServer || !Objects.equals(siteSocketAddress, @@ -1943,7 +1940,7 @@ public class Bucket { int size = buckets.size(); if (size != 0) { // generate a linked list of the bucket data to display - LinkedList<String> data = new LinkedList<String>(); + LinkedList<String> data = new LinkedList<>(); StringBuilder sb = new StringBuilder(); int count = 8; @@ -1956,7 +1953,8 @@ public class Bucket { // add the bucket number sb.append(String.format("%4s", bucket.index)); - if ((count -= 1) <= 0) { + count -= 1; + if (count <= 0) { // filled up a row -- // add it to the list, and start a new line data.add(sb.toString()); @@ -2109,7 +2107,7 @@ public class Bucket { // when 'System.currentTimeMillis()' reaches this value, we time out long endTime; - // If not 'null', we are queueing messages for this bucket; + // If not 'null', we are queueing messages for this bucket // otherwise, we are sending them through. Queue<Message> messages = new ConcurrentLinkedQueue<>(); @@ -2287,66 +2285,70 @@ public class Bucket { } catch (Exception e) { logger.error("Exception in {}", this, e); } finally { - /* - * cleanly leave state -- we want to make sure that messages - * are processed in order, so the queue needs to remain until - * it is empty - */ - logger.info("{}: entering cleanup state", this); - for ( ; ; ) { - Message message = messages.poll(); - if (message == null) { - // no messages left, but this could change - synchronized (Bucket.this) { - message = messages.poll(); - if (message == null) { - // no messages left - if (state == this) { - if (owner == Server.getThisServer()) { - // we can now exit the state - state = null; - stateChanged(); - } else { - /* - * We need a grace period before we can - * remove the 'state' value (this can happen - * if we receive and process the bulk data - * before receiving official confirmation - * that we are owner of the bucket. - */ - messages = null; - } + run_cleanup(); + } + } + + private void run_cleanup() { + /* + * cleanly leave state -- we want to make sure that messages + * are processed in order, so the queue needs to remain until + * it is empty + */ + logger.info("{}: entering cleanup state", this); + for ( ; ; ) { + Message message = messages.poll(); + if (message == null) { + // no messages left, but this could change + synchronized (Bucket.this) { + message = messages.poll(); + if (message == null) { + // no messages left + if (state == this) { + if (owner == Server.getThisServer()) { + // we can now exit the state + state = null; + stateChanged(); + } else { + /* + * We need a grace period before we can + * remove the 'state' value (this can happen + * if we receive and process the bulk data + * before receiving official confirmation + * that we are owner of the bucket. + */ + messages = null; } - break; } + break; } } - // this doesn't work -- it ends up right back in the queue - // if 'messages' is defined - message.process(); } - if (messages == null) { - // this indicates we need to enter a grace period before cleanup, - try { - logger.info("{}: entering grace period before terminating", - this); - Thread.sleep(unconfirmedGracePeriod); - } catch (InterruptedException e) { - // we are exiting in any case - Thread.currentThread().interrupt(); - } finally { - synchronized (Bucket.this) { - // Do we need to confirm that we really are the owner? - // What does it mean if we are not? - if (state == this) { - state = null; - stateChanged(); - } + // this doesn't work -- it ends up right back in the queue + // if 'messages' is defined + message.process(); + } + if (messages == null) { + // this indicates we need to enter a grace period before cleanup, + try { + logger.info("{}: entering grace period before terminating", + this); + Thread.sleep(unconfirmedGracePeriod); + } catch (InterruptedException e) { + // we are exiting in any case + Thread.currentThread().interrupt(); + } finally { + synchronized (Bucket.this) { + // Do we need to confirm that we really are the owner? + // What does it mean if we are not? + if (state == this) { + state = null; + stateChanged(); } } } - logger.info("{}: exiting cleanup state", this); } + logger.info("{}: exiting cleanup state", this); } /** @@ -2357,31 +2359,32 @@ public class Bucket { public String toString() { return "Bucket.NewOwner(" + index + ")"; } - } - /** - * Restore bucket data. - * - * @param obj deserialized bucket data - */ - private void restoreBucketData(Object obj) { - if (obj instanceof List) { - for (Object entry : (List<?>)obj) { - if (entry instanceof Restore) { - // entry-specific 'restore' operation - ((Restore)entry).restore(this.index); - } else { - logger.error("{}: Expected '{}' but got '{}'", - this, Restore.class.getName(), - entry.getClass().getName()); + /** + * Restore bucket data. + * + * @param obj deserialized bucket data + */ + private void restoreBucketData(Object obj) { + if (obj instanceof List) { + for (Object entry : (List<?>)obj) { + if (entry instanceof Restore) { + // entry-specific 'restore' operation + ((Restore)entry).restore(Bucket.this.index); + } else { + logger.error("{}: Expected '{}' but got '{}'", + this, Restore.class.getName(), + entry.getClass().getName()); + } } + } else { + logger.error("{}: expected 'List' but got '{}'", + this, obj.getClass().getName()); } - } else { - logger.error("{}: expected 'List' but got '{}'", - this, obj.getClass().getName()); } } + /* ============================================================ */ /** @@ -2459,9 +2462,9 @@ public class Bucket { @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("bucket", index) - .queryParam("dest", newOwner.getUuid()) - .queryParam("ttl", timeToLive); + .queryParam(QP_BUCKET, index) + .queryParam(QP_DEST, newOwner.getUuid()) + .queryParam(QP_TTL, timeToLive); } @Override |