summaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java')
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java429
1 files changed, 216 insertions, 213 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
index 2236506e..b949134f 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
@@ -174,7 +174,13 @@ public class Bucket {
private volatile State state = null;
// storage for additional data
- private Map<Class<?>, Object> adjuncts = new HashMap<Class<?>, Object>();
+ private Map<Class<?>, Object> adjuncts = new HashMap<>();
+
+ // HTTP query parameters
+ private static final String QP_BUCKET = "bucket";
+ private static final String QP_KEYWORD = "keyword";
+ private static final String QP_DEST = "dest";
+ private static final String QP_TTL = "ttl";
// BACKUP data (only buckets for where we are the owner, or a backup)
@@ -287,92 +293,6 @@ public class Bucket {
}
/**
- * This method is called to start a 'rebalance' operation in a background
- * thread, but it only does this on the lead server. Being balanced means
- * the following:
- * 1) Each server owns approximately the same number of buckets
- * 2) If any server were to fail, and the designated primaries take over
- * for all of that server's buckets, all remaining servers would still
- * own approximately the same number of buckets.
- * 3) If any two servers were to fail, and the designated primaries were
- * to take over for the failed server's buckets (secondaries would take
- * for buckets where the owner and primary are OOS), all remaining
- * servers would still own approximately the same number of buckets.
- * 4) Each server should have approximately the same number of
- * (primary-backup + secondary-backup) buckets that it is responsible for.
- * 5) The primary backup for each bucket must be on the same site as the
- * owner, and the secondary backup must be on a different site.
- */
- private static void rebalance() {
- if (Leader.getLeader() == Server.getThisServer()) {
- Rebalance rb = new Rebalance();
- synchronized (rebalanceLock) {
- // the most recent 'Rebalance' instance is the only valid one
- rebalance = rb;
- }
-
- new Thread("BUCKET REBALANCER") {
- @Override
- public void run() {
- /*
- * copy bucket and host data,
- * generating a temporary internal table.
- */
- rb.copyData();
-
- /*
- * allocate owners for all buckets without an owner,
- * and rebalance bucket owners, if necessary --
- * this takes card of item #1, above.
- */
- rb.allocateBuckets();
-
- /*
- * make sure that primary backups always have the same site
- * as the owner, and secondary backups always have a different
- * site -- this takes care of #5, above.
- */
- rb.checkSiteValues();
-
- /*
- * adjust primary backup lists to take care of item #2, above
- * (taking #5 into account).
- */
- rb.rebalancePrimaryBackups();
-
- /*
- * allocate secondary backups, and take care of items
- * #3 and #4, above (taking #5 into account).
- */
- rb.rebalanceSecondaryBackups();
-
- try {
- synchronized (rebalanceLock) {
- /*
- * if another 'Rebalance' instance has started in the
- * mean time, don't do the update.
- */
- if (rebalance == rb) {
- /*
- * build a message containing all of the updated bucket
- * information, process it internally in this host
- * (lead server), and send it out to others in the
- * "notify list".
- */
- rb.generateBucketMessage();
- rebalance = null;
- }
- }
- } catch (IOException e) {
- logger.error("Exception in Rebalance.generateBucketMessage",
- e);
- }
- }
- }.start();
- }
- }
-
- /**
* Handle an incoming /bucket/update REST message.
*
* @param data base64-encoded data, containing all bucket updates
@@ -433,13 +353,12 @@ public class Bucket {
int tag;
while ((tag = dis.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
switch (tag) {
- case OWNER_UPDATE: {
+ case OWNER_UPDATE:
// <OWNER_UPDATE> <owner-uuid> -- owner UUID specified
bucketChanges = updateBucketInternalOwnerUpdate(bucket, dis, index);
break;
- }
- case OWNER_NULL: {
+ case OWNER_NULL:
// <OWNER_NULL> -- owner UUID should be set to 'null'
if (bucket.getOwner() != null) {
logger.info("Bucket {} owner: {}->null",
@@ -451,9 +370,8 @@ public class Bucket {
}
}
break;
- }
- case PRIMARY_BACKUP_UPDATE: {
+ case PRIMARY_BACKUP_UPDATE:
// <PRIMARY_BACKUP_UPDATE> <primary-backup-uuid> --
// primary backup UUID specified
Server newPrimaryBackup =
@@ -465,9 +383,8 @@ public class Bucket {
bucket.primaryBackup = newPrimaryBackup;
}
break;
- }
- case PRIMARY_BACKUP_NULL: {
+ case PRIMARY_BACKUP_NULL:
// <PRIMARY_BACKUP_NULL> --
// primary backup should be set to 'null'
if (bucket.primaryBackup != null) {
@@ -477,9 +394,8 @@ public class Bucket {
bucket.primaryBackup = null;
}
break;
- }
- case SECONDARY_BACKUP_UPDATE: {
+ case SECONDARY_BACKUP_UPDATE:
// <SECONDARY_BACKUP_UPDATE> <secondary-backup-uuid> --
// secondary backup UUID specified
Server newSecondaryBackup =
@@ -491,9 +407,8 @@ public class Bucket {
bucket.secondaryBackup = newSecondaryBackup;
}
break;
- }
- case SECONDARY_BACKUP_NULL: {
+ case SECONDARY_BACKUP_NULL:
// <SECONDARY_BACKUP_NULL> --
// secondary backup should be set to 'null'
if (bucket.secondaryBackup != null) {
@@ -503,7 +418,6 @@ public class Bucket {
bucket.secondaryBackup = null;
}
break;
- }
default:
logger.error("Illegal tag: {}", tag);
@@ -550,7 +464,6 @@ public class Bucket {
bucket.state = bucket.new NewOwner(true, oldOwner);
} else {
// new owner has been confirmed
- // orig bucket.state.newOwner();
bucket.state.newOwner();
}
}
@@ -685,22 +598,23 @@ public class Bucket {
* selected bucket has no server assigned -- this should only be a
* transient situation, until 'rebalance' is run.
*/
- out.println("Bucket is " + bucketNumber + ", which has no owner");
+ out.format("Bucket is %d, which has no owner\n", bucketNumber);
} else if (server == Server.getThisServer()) {
/*
* the selected bucket is associated with this particular server --
* no forwarding is needed.
*/
- out.println("Bucket is " + bucketNumber
- + ", which is owned by this server: " + server.getUuid());
+ out.format("Bucket is %d, which is owned by this server: %s\n",
+ bucketNumber, server.getUuid());
} else {
/*
* the selected bucket is assigned to a different server -- forward
* the message.
*/
- out.println("Bucket is " + bucketNumber + ": sending from\n"
- + " " + Server.getThisServer().getUuid() + " to \n"
- + " " + server.getUuid());
+ out.format("Bucket is %d: sending from\n"
+ + " %s to\n"
+ + " %s\n",
+ bucketNumber, Server.getThisServer().getUuid(), server.getUuid());
// do a POST call of /bucket/bucketResponse to the remoote server
Entity<String> entity =
@@ -723,8 +637,8 @@ public class Bucket {
// we need to include the 'bucket' and 'keyword' parameters
// in the POST that we are sending out
return webTarget
- .queryParam("bucket", bucketNumber)
- .queryParam("keyword", keyword);
+ .queryParam(QP_BUCKET, bucketNumber)
+ .queryParam(QP_KEYWORD, keyword);
}
/**
@@ -744,12 +658,12 @@ public class Bucket {
if (response == null) {
out.println("Timed out waiting for a response");
} else {
- out.println("Received response code " + response.getStatus());
- out.println("Entity = " + response.readEntity(String.class));
+ out.format("Received response code %s\nEntity = %s\n",
+ response.getStatus(), response.readEntity(String.class));
}
} catch (InterruptedException e) {
out.println(e);
- throw new IOException(e);
+ Thread.currentThread().interrupt();
}
}
}
@@ -905,7 +819,8 @@ public class Bucket {
Server server;
WebTarget webTarget;
- if ((ttl -= 1) > 0
+ ttl -= 1;
+ if (ttl > 0
&& (server = Server.getServer(dest)) != null
&& (webTarget = server.getWebTarget("bucket/sessionData")) != null) {
logger.info("Forwarding 'bucket/sessionData' to uuid {}",
@@ -915,9 +830,9 @@ public class Bucket {
MediaType.APPLICATION_OCTET_STREAM_TYPE);
Response response =
webTarget
- .queryParam("bucket", bucketNumber)
- .queryParam("dest", dest)
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_BUCKET, bucketNumber)
+ .queryParam(QP_DEST, dest)
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().post(entity);
logger.info("/bucket/sessionData response code = {}",
response.getStatus());
@@ -977,23 +892,16 @@ public class Bucket {
* the 'newInstance' method is unable to create the adjunct)
*/
public <T> T getAdjunct(Class<T> clazz) {
- synchronized (adjuncts) {
- // look up the adjunct in the table
- Object adj = adjuncts.get(clazz);
- if (adj == null) {
- // lookup failed -- create one
- try {
- // create the adjunct (may trigger an exception)
- adj = clazz.newInstance();
-
- // update the table
- adjuncts.put(clazz, adj);
- } catch (Exception e) {
- logger.error("Can't create adjunct of {}", clazz, e);
- }
+ Object adj = adjuncts.computeIfAbsent(clazz, key -> {
+ try {
+ // create the adjunct, if needed
+ return clazz.getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ logger.error("Can't create adjunct of {}", clazz, e);
+ return null;
}
- return clazz.cast(adj);
- }
+ });
+ return clazz.cast(adj);
}
/**
@@ -1019,7 +927,7 @@ public class Bucket {
*/
public Object putAdjunct(Object adj) {
synchronized (adjuncts) {
- Class clazz = adj.getClass();
+ Class<?> clazz = adj.getClass();
return adjuncts.put(clazz, adj);
}
}
@@ -1148,6 +1056,92 @@ public class Bucket {
// trigger a rebalance (only happens if we are the lead server)
rebalance();
}
+
+ /**
+ * This method is called to start a 'rebalance' operation in a background
+ * thread, but it only does this on the lead server. Being balanced means
+ * the following:
+ * 1) Each server owns approximately the same number of buckets
+ * 2) If any server were to fail, and the designated primaries take over
+ * for all of that server's buckets, all remaining servers would still
+ * own approximately the same number of buckets.
+ * 3) If any two servers were to fail, and the designated primaries were
+ * to take over for the failed server's buckets (secondaries would take
+ * for buckets where the owner and primary are OOS), all remaining
+ * servers would still own approximately the same number of buckets.
+ * 4) Each server should have approximately the same number of
+ * (primary-backup + secondary-backup) buckets that it is responsible for.
+ * 5) The primary backup for each bucket must be on the same site as the
+ * owner, and the secondary backup must be on a different site.
+ */
+ private void rebalance() {
+ if (Leader.getLeader() == Server.getThisServer()) {
+ Rebalance rb = new Rebalance();
+ synchronized (rebalanceLock) {
+ // the most recent 'Rebalance' instance is the only valid one
+ rebalance = rb;
+ }
+
+ new Thread("BUCKET REBALANCER") {
+ @Override
+ public void run() {
+ /*
+ * copy bucket and host data,
+ * generating a temporary internal table.
+ */
+ rb.copyData();
+
+ /*
+ * allocate owners for all buckets without an owner,
+ * and rebalance bucket owners, if necessary --
+ * this takes card of item #1, above.
+ */
+ rb.allocateBuckets();
+
+ /*
+ * make sure that primary backups always have the same site
+ * as the owner, and secondary backups always have a different
+ * site -- this takes care of #5, above.
+ */
+ rb.checkSiteValues();
+
+ /*
+ * adjust primary backup lists to take care of item #2, above
+ * (taking #5 into account).
+ */
+ rb.rebalancePrimaryBackups();
+
+ /*
+ * allocate secondary backups, and take care of items
+ * #3 and #4, above (taking #5 into account).
+ */
+ rb.rebalanceSecondaryBackups();
+
+ try {
+ synchronized (rebalanceLock) {
+ /*
+ * if another 'Rebalance' instance has started in the
+ * mean time, don't do the update.
+ */
+ if (rebalance == rb) {
+ /*
+ * build a message containing all of the updated bucket
+ * information, process it internally in this host
+ * (lead server), and send it out to others in the
+ * "notify list".
+ */
+ rb.generateBucketMessage();
+ rebalance = null;
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Exception in Rebalance.generateBucketMessage",
+ e);
+ }
+ }
+ }.start();
+ }
+ }
}
/* ============================================================ */
@@ -1477,11 +1471,15 @@ public class Bucket {
return 0;
};
- FutureTask<Integer> ft = new FutureTask(callable);
+ FutureTask<Integer> ft = new FutureTask<>(callable);
MainLoop.queueWork(ft);
try {
ft.get(60, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ } catch (InterruptedException e) {
+ logger.error("Interrupted", e);
+ Thread.currentThread().interrupt();
+ return;
+ } catch (ExecutionException | TimeoutException e) {
logger.error("Exception in Rebalance.copyData", e);
return;
}
@@ -1534,7 +1532,7 @@ public class Bucket {
* 'needBuckets' TreeSet: those with the fewest buckets allocated are
* at the head of the list.
*/
- Comparator<TestServer> bucketCount = new Comparator<TestServer>() {
+ Comparator<TestServer> bucketCount = new Comparator<>() {
@Override
public int compare(TestServer s1, TestServer s2) {
int rval = s1.buckets.size() - s2.buckets.size();
@@ -1662,8 +1660,7 @@ public class Bucket {
// populate a 'TreeSet' of 'AdjustedTestServer' instances based
// the failure of 'failedServer'
- TreeSet<AdjustedTestServer> adjustedTestServers =
- new TreeSet<AdjustedTestServer>();
+ TreeSet<AdjustedTestServer> adjustedTestServers = new TreeSet<>();
for (TestServer server : testServers.values()) {
if (server == failedServer
|| !Objects.equals(siteSocketAddress,
@@ -1943,7 +1940,7 @@ public class Bucket {
int size = buckets.size();
if (size != 0) {
// generate a linked list of the bucket data to display
- LinkedList<String> data = new LinkedList<String>();
+ LinkedList<String> data = new LinkedList<>();
StringBuilder sb = new StringBuilder();
int count = 8;
@@ -1956,7 +1953,8 @@ public class Bucket {
// add the bucket number
sb.append(String.format("%4s", bucket.index));
- if ((count -= 1) <= 0) {
+ count -= 1;
+ if (count <= 0) {
// filled up a row --
// add it to the list, and start a new line
data.add(sb.toString());
@@ -2109,7 +2107,7 @@ public class Bucket {
// when 'System.currentTimeMillis()' reaches this value, we time out
long endTime;
- // If not 'null', we are queueing messages for this bucket;
+ // If not 'null', we are queueing messages for this bucket
// otherwise, we are sending them through.
Queue<Message> messages = new ConcurrentLinkedQueue<>();
@@ -2287,66 +2285,70 @@ public class Bucket {
} catch (Exception e) {
logger.error("Exception in {}", this, e);
} finally {
- /*
- * cleanly leave state -- we want to make sure that messages
- * are processed in order, so the queue needs to remain until
- * it is empty
- */
- logger.info("{}: entering cleanup state", this);
- for ( ; ; ) {
- Message message = messages.poll();
- if (message == null) {
- // no messages left, but this could change
- synchronized (Bucket.this) {
- message = messages.poll();
- if (message == null) {
- // no messages left
- if (state == this) {
- if (owner == Server.getThisServer()) {
- // we can now exit the state
- state = null;
- stateChanged();
- } else {
- /*
- * We need a grace period before we can
- * remove the 'state' value (this can happen
- * if we receive and process the bulk data
- * before receiving official confirmation
- * that we are owner of the bucket.
- */
- messages = null;
- }
+ run_cleanup();
+ }
+ }
+
+ private void run_cleanup() {
+ /*
+ * cleanly leave state -- we want to make sure that messages
+ * are processed in order, so the queue needs to remain until
+ * it is empty
+ */
+ logger.info("{}: entering cleanup state", this);
+ for ( ; ; ) {
+ Message message = messages.poll();
+ if (message == null) {
+ // no messages left, but this could change
+ synchronized (Bucket.this) {
+ message = messages.poll();
+ if (message == null) {
+ // no messages left
+ if (state == this) {
+ if (owner == Server.getThisServer()) {
+ // we can now exit the state
+ state = null;
+ stateChanged();
+ } else {
+ /*
+ * We need a grace period before we can
+ * remove the 'state' value (this can happen
+ * if we receive and process the bulk data
+ * before receiving official confirmation
+ * that we are owner of the bucket.
+ */
+ messages = null;
}
- break;
}
+ break;
}
}
- // this doesn't work -- it ends up right back in the queue
- // if 'messages' is defined
- message.process();
}
- if (messages == null) {
- // this indicates we need to enter a grace period before cleanup,
- try {
- logger.info("{}: entering grace period before terminating",
- this);
- Thread.sleep(unconfirmedGracePeriod);
- } catch (InterruptedException e) {
- // we are exiting in any case
- Thread.currentThread().interrupt();
- } finally {
- synchronized (Bucket.this) {
- // Do we need to confirm that we really are the owner?
- // What does it mean if we are not?
- if (state == this) {
- state = null;
- stateChanged();
- }
+ // this doesn't work -- it ends up right back in the queue
+ // if 'messages' is defined
+ message.process();
+ }
+ if (messages == null) {
+ // this indicates we need to enter a grace period before cleanup,
+ try {
+ logger.info("{}: entering grace period before terminating",
+ this);
+ Thread.sleep(unconfirmedGracePeriod);
+ } catch (InterruptedException e) {
+ // we are exiting in any case
+ Thread.currentThread().interrupt();
+ } finally {
+ synchronized (Bucket.this) {
+ // Do we need to confirm that we really are the owner?
+ // What does it mean if we are not?
+ if (state == this) {
+ state = null;
+ stateChanged();
}
}
}
- logger.info("{}: exiting cleanup state", this);
}
+ logger.info("{}: exiting cleanup state", this);
}
/**
@@ -2357,31 +2359,32 @@ public class Bucket {
public String toString() {
return "Bucket.NewOwner(" + index + ")";
}
- }
- /**
- * Restore bucket data.
- *
- * @param obj deserialized bucket data
- */
- private void restoreBucketData(Object obj) {
- if (obj instanceof List) {
- for (Object entry : (List<?>)obj) {
- if (entry instanceof Restore) {
- // entry-specific 'restore' operation
- ((Restore)entry).restore(this.index);
- } else {
- logger.error("{}: Expected '{}' but got '{}'",
- this, Restore.class.getName(),
- entry.getClass().getName());
+ /**
+ * Restore bucket data.
+ *
+ * @param obj deserialized bucket data
+ */
+ private void restoreBucketData(Object obj) {
+ if (obj instanceof List) {
+ for (Object entry : (List<?>)obj) {
+ if (entry instanceof Restore) {
+ // entry-specific 'restore' operation
+ ((Restore)entry).restore(Bucket.this.index);
+ } else {
+ logger.error("{}: Expected '{}' but got '{}'",
+ this, Restore.class.getName(),
+ entry.getClass().getName());
+ }
}
+ } else {
+ logger.error("{}: expected 'List' but got '{}'",
+ this, obj.getClass().getName());
}
- } else {
- logger.error("{}: expected 'List' but got '{}'",
- this, obj.getClass().getName());
}
}
+
/* ============================================================ */
/**
@@ -2459,9 +2462,9 @@ public class Bucket {
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("bucket", index)
- .queryParam("dest", newOwner.getUuid())
- .queryParam("ttl", timeToLive);
+ .queryParam(QP_BUCKET, index)
+ .queryParam(QP_DEST, newOwner.getUuid())
+ .queryParam(QP_TTL, timeToLive);
}
@Override