summaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'feature-server-pool/src/main/java/org')
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java429
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java1
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java159
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java41
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java211
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java7
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java6
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java222
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java7
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java958
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java35
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java171
12 files changed, 1149 insertions, 1098 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
index 2236506e..b949134f 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
@@ -174,7 +174,13 @@ public class Bucket {
private volatile State state = null;
// storage for additional data
- private Map<Class<?>, Object> adjuncts = new HashMap<Class<?>, Object>();
+ private Map<Class<?>, Object> adjuncts = new HashMap<>();
+
+ // HTTP query parameters
+ private static final String QP_BUCKET = "bucket";
+ private static final String QP_KEYWORD = "keyword";
+ private static final String QP_DEST = "dest";
+ private static final String QP_TTL = "ttl";
// BACKUP data (only buckets for where we are the owner, or a backup)
@@ -287,92 +293,6 @@ public class Bucket {
}
/**
- * This method is called to start a 'rebalance' operation in a background
- * thread, but it only does this on the lead server. Being balanced means
- * the following:
- * 1) Each server owns approximately the same number of buckets
- * 2) If any server were to fail, and the designated primaries take over
- * for all of that server's buckets, all remaining servers would still
- * own approximately the same number of buckets.
- * 3) If any two servers were to fail, and the designated primaries were
- * to take over for the failed server's buckets (secondaries would take
- * for buckets where the owner and primary are OOS), all remaining
- * servers would still own approximately the same number of buckets.
- * 4) Each server should have approximately the same number of
- * (primary-backup + secondary-backup) buckets that it is responsible for.
- * 5) The primary backup for each bucket must be on the same site as the
- * owner, and the secondary backup must be on a different site.
- */
- private static void rebalance() {
- if (Leader.getLeader() == Server.getThisServer()) {
- Rebalance rb = new Rebalance();
- synchronized (rebalanceLock) {
- // the most recent 'Rebalance' instance is the only valid one
- rebalance = rb;
- }
-
- new Thread("BUCKET REBALANCER") {
- @Override
- public void run() {
- /*
- * copy bucket and host data,
- * generating a temporary internal table.
- */
- rb.copyData();
-
- /*
- * allocate owners for all buckets without an owner,
- * and rebalance bucket owners, if necessary --
- * this takes card of item #1, above.
- */
- rb.allocateBuckets();
-
- /*
- * make sure that primary backups always have the same site
- * as the owner, and secondary backups always have a different
- * site -- this takes care of #5, above.
- */
- rb.checkSiteValues();
-
- /*
- * adjust primary backup lists to take care of item #2, above
- * (taking #5 into account).
- */
- rb.rebalancePrimaryBackups();
-
- /*
- * allocate secondary backups, and take care of items
- * #3 and #4, above (taking #5 into account).
- */
- rb.rebalanceSecondaryBackups();
-
- try {
- synchronized (rebalanceLock) {
- /*
- * if another 'Rebalance' instance has started in the
- * mean time, don't do the update.
- */
- if (rebalance == rb) {
- /*
- * build a message containing all of the updated bucket
- * information, process it internally in this host
- * (lead server), and send it out to others in the
- * "notify list".
- */
- rb.generateBucketMessage();
- rebalance = null;
- }
- }
- } catch (IOException e) {
- logger.error("Exception in Rebalance.generateBucketMessage",
- e);
- }
- }
- }.start();
- }
- }
-
- /**
* Handle an incoming /bucket/update REST message.
*
* @param data base64-encoded data, containing all bucket updates
@@ -433,13 +353,12 @@ public class Bucket {
int tag;
while ((tag = dis.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
switch (tag) {
- case OWNER_UPDATE: {
+ case OWNER_UPDATE:
// <OWNER_UPDATE> <owner-uuid> -- owner UUID specified
bucketChanges = updateBucketInternalOwnerUpdate(bucket, dis, index);
break;
- }
- case OWNER_NULL: {
+ case OWNER_NULL:
// <OWNER_NULL> -- owner UUID should be set to 'null'
if (bucket.getOwner() != null) {
logger.info("Bucket {} owner: {}->null",
@@ -451,9 +370,8 @@ public class Bucket {
}
}
break;
- }
- case PRIMARY_BACKUP_UPDATE: {
+ case PRIMARY_BACKUP_UPDATE:
// <PRIMARY_BACKUP_UPDATE> <primary-backup-uuid> --
// primary backup UUID specified
Server newPrimaryBackup =
@@ -465,9 +383,8 @@ public class Bucket {
bucket.primaryBackup = newPrimaryBackup;
}
break;
- }
- case PRIMARY_BACKUP_NULL: {
+ case PRIMARY_BACKUP_NULL:
// <PRIMARY_BACKUP_NULL> --
// primary backup should be set to 'null'
if (bucket.primaryBackup != null) {
@@ -477,9 +394,8 @@ public class Bucket {
bucket.primaryBackup = null;
}
break;
- }
- case SECONDARY_BACKUP_UPDATE: {
+ case SECONDARY_BACKUP_UPDATE:
// <SECONDARY_BACKUP_UPDATE> <secondary-backup-uuid> --
// secondary backup UUID specified
Server newSecondaryBackup =
@@ -491,9 +407,8 @@ public class Bucket {
bucket.secondaryBackup = newSecondaryBackup;
}
break;
- }
- case SECONDARY_BACKUP_NULL: {
+ case SECONDARY_BACKUP_NULL:
// <SECONDARY_BACKUP_NULL> --
// secondary backup should be set to 'null'
if (bucket.secondaryBackup != null) {
@@ -503,7 +418,6 @@ public class Bucket {
bucket.secondaryBackup = null;
}
break;
- }
default:
logger.error("Illegal tag: {}", tag);
@@ -550,7 +464,6 @@ public class Bucket {
bucket.state = bucket.new NewOwner(true, oldOwner);
} else {
// new owner has been confirmed
- // orig bucket.state.newOwner();
bucket.state.newOwner();
}
}
@@ -685,22 +598,23 @@ public class Bucket {
* selected bucket has no server assigned -- this should only be a
* transient situation, until 'rebalance' is run.
*/
- out.println("Bucket is " + bucketNumber + ", which has no owner");
+ out.format("Bucket is %d, which has no owner\n", bucketNumber);
} else if (server == Server.getThisServer()) {
/*
* the selected bucket is associated with this particular server --
* no forwarding is needed.
*/
- out.println("Bucket is " + bucketNumber
- + ", which is owned by this server: " + server.getUuid());
+ out.format("Bucket is %d, which is owned by this server: %s\n",
+ bucketNumber, server.getUuid());
} else {
/*
* the selected bucket is assigned to a different server -- forward
* the message.
*/
- out.println("Bucket is " + bucketNumber + ": sending from\n"
- + " " + Server.getThisServer().getUuid() + " to \n"
- + " " + server.getUuid());
+ out.format("Bucket is %d: sending from\n"
+ + " %s to\n"
+ + " %s\n",
+ bucketNumber, Server.getThisServer().getUuid(), server.getUuid());
// do a POST call of /bucket/bucketResponse to the remoote server
Entity<String> entity =
@@ -723,8 +637,8 @@ public class Bucket {
// we need to include the 'bucket' and 'keyword' parameters
// in the POST that we are sending out
return webTarget
- .queryParam("bucket", bucketNumber)
- .queryParam("keyword", keyword);
+ .queryParam(QP_BUCKET, bucketNumber)
+ .queryParam(QP_KEYWORD, keyword);
}
/**
@@ -744,12 +658,12 @@ public class Bucket {
if (response == null) {
out.println("Timed out waiting for a response");
} else {
- out.println("Received response code " + response.getStatus());
- out.println("Entity = " + response.readEntity(String.class));
+ out.format("Received response code %s\nEntity = %s\n",
+ response.getStatus(), response.readEntity(String.class));
}
} catch (InterruptedException e) {
out.println(e);
- throw new IOException(e);
+ Thread.currentThread().interrupt();
}
}
}
@@ -905,7 +819,8 @@ public class Bucket {
Server server;
WebTarget webTarget;
- if ((ttl -= 1) > 0
+ ttl -= 1;
+ if (ttl > 0
&& (server = Server.getServer(dest)) != null
&& (webTarget = server.getWebTarget("bucket/sessionData")) != null) {
logger.info("Forwarding 'bucket/sessionData' to uuid {}",
@@ -915,9 +830,9 @@ public class Bucket {
MediaType.APPLICATION_OCTET_STREAM_TYPE);
Response response =
webTarget
- .queryParam("bucket", bucketNumber)
- .queryParam("dest", dest)
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_BUCKET, bucketNumber)
+ .queryParam(QP_DEST, dest)
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().post(entity);
logger.info("/bucket/sessionData response code = {}",
response.getStatus());
@@ -977,23 +892,16 @@ public class Bucket {
* the 'newInstance' method is unable to create the adjunct)
*/
public <T> T getAdjunct(Class<T> clazz) {
- synchronized (adjuncts) {
- // look up the adjunct in the table
- Object adj = adjuncts.get(clazz);
- if (adj == null) {
- // lookup failed -- create one
- try {
- // create the adjunct (may trigger an exception)
- adj = clazz.newInstance();
-
- // update the table
- adjuncts.put(clazz, adj);
- } catch (Exception e) {
- logger.error("Can't create adjunct of {}", clazz, e);
- }
+ Object adj = adjuncts.computeIfAbsent(clazz, key -> {
+ try {
+ // create the adjunct, if needed
+ return clazz.getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ logger.error("Can't create adjunct of {}", clazz, e);
+ return null;
}
- return clazz.cast(adj);
- }
+ });
+ return clazz.cast(adj);
}
/**
@@ -1019,7 +927,7 @@ public class Bucket {
*/
public Object putAdjunct(Object adj) {
synchronized (adjuncts) {
- Class clazz = adj.getClass();
+ Class<?> clazz = adj.getClass();
return adjuncts.put(clazz, adj);
}
}
@@ -1148,6 +1056,92 @@ public class Bucket {
// trigger a rebalance (only happens if we are the lead server)
rebalance();
}
+
+ /**
+ * This method is called to start a 'rebalance' operation in a background
+ * thread, but it only does this on the lead server. Being balanced means
+ * the following:
+ * 1) Each server owns approximately the same number of buckets
+ * 2) If any server were to fail, and the designated primaries take over
+ * for all of that server's buckets, all remaining servers would still
+ * own approximately the same number of buckets.
+ * 3) If any two servers were to fail, and the designated primaries were
+ * to take over for the failed server's buckets (secondaries would take
+ * for buckets where the owner and primary are OOS), all remaining
+ * servers would still own approximately the same number of buckets.
+ * 4) Each server should have approximately the same number of
+ * (primary-backup + secondary-backup) buckets that it is responsible for.
+ * 5) The primary backup for each bucket must be on the same site as the
+ * owner, and the secondary backup must be on a different site.
+ */
+ private void rebalance() {
+ if (Leader.getLeader() == Server.getThisServer()) {
+ Rebalance rb = new Rebalance();
+ synchronized (rebalanceLock) {
+ // the most recent 'Rebalance' instance is the only valid one
+ rebalance = rb;
+ }
+
+ new Thread("BUCKET REBALANCER") {
+ @Override
+ public void run() {
+ /*
+ * copy bucket and host data,
+ * generating a temporary internal table.
+ */
+ rb.copyData();
+
+ /*
+ * allocate owners for all buckets without an owner,
+ * and rebalance bucket owners, if necessary --
+ * this takes card of item #1, above.
+ */
+ rb.allocateBuckets();
+
+ /*
+ * make sure that primary backups always have the same site
+ * as the owner, and secondary backups always have a different
+ * site -- this takes care of #5, above.
+ */
+ rb.checkSiteValues();
+
+ /*
+ * adjust primary backup lists to take care of item #2, above
+ * (taking #5 into account).
+ */
+ rb.rebalancePrimaryBackups();
+
+ /*
+ * allocate secondary backups, and take care of items
+ * #3 and #4, above (taking #5 into account).
+ */
+ rb.rebalanceSecondaryBackups();
+
+ try {
+ synchronized (rebalanceLock) {
+ /*
+ * if another 'Rebalance' instance has started in the
+ * mean time, don't do the update.
+ */
+ if (rebalance == rb) {
+ /*
+ * build a message containing all of the updated bucket
+ * information, process it internally in this host
+ * (lead server), and send it out to others in the
+ * "notify list".
+ */
+ rb.generateBucketMessage();
+ rebalance = null;
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Exception in Rebalance.generateBucketMessage",
+ e);
+ }
+ }
+ }.start();
+ }
+ }
}
/* ============================================================ */
@@ -1477,11 +1471,15 @@ public class Bucket {
return 0;
};
- FutureTask<Integer> ft = new FutureTask(callable);
+ FutureTask<Integer> ft = new FutureTask<>(callable);
MainLoop.queueWork(ft);
try {
ft.get(60, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ } catch (InterruptedException e) {
+ logger.error("Interrupted", e);
+ Thread.currentThread().interrupt();
+ return;
+ } catch (ExecutionException | TimeoutException e) {
logger.error("Exception in Rebalance.copyData", e);
return;
}
@@ -1534,7 +1532,7 @@ public class Bucket {
* 'needBuckets' TreeSet: those with the fewest buckets allocated are
* at the head of the list.
*/
- Comparator<TestServer> bucketCount = new Comparator<TestServer>() {
+ Comparator<TestServer> bucketCount = new Comparator<>() {
@Override
public int compare(TestServer s1, TestServer s2) {
int rval = s1.buckets.size() - s2.buckets.size();
@@ -1662,8 +1660,7 @@ public class Bucket {
// populate a 'TreeSet' of 'AdjustedTestServer' instances based
// the failure of 'failedServer'
- TreeSet<AdjustedTestServer> adjustedTestServers =
- new TreeSet<AdjustedTestServer>();
+ TreeSet<AdjustedTestServer> adjustedTestServers = new TreeSet<>();
for (TestServer server : testServers.values()) {
if (server == failedServer
|| !Objects.equals(siteSocketAddress,
@@ -1943,7 +1940,7 @@ public class Bucket {
int size = buckets.size();
if (size != 0) {
// generate a linked list of the bucket data to display
- LinkedList<String> data = new LinkedList<String>();
+ LinkedList<String> data = new LinkedList<>();
StringBuilder sb = new StringBuilder();
int count = 8;
@@ -1956,7 +1953,8 @@ public class Bucket {
// add the bucket number
sb.append(String.format("%4s", bucket.index));
- if ((count -= 1) <= 0) {
+ count -= 1;
+ if (count <= 0) {
// filled up a row --
// add it to the list, and start a new line
data.add(sb.toString());
@@ -2109,7 +2107,7 @@ public class Bucket {
// when 'System.currentTimeMillis()' reaches this value, we time out
long endTime;
- // If not 'null', we are queueing messages for this bucket;
+ // If not 'null', we are queueing messages for this bucket
// otherwise, we are sending them through.
Queue<Message> messages = new ConcurrentLinkedQueue<>();
@@ -2287,66 +2285,70 @@ public class Bucket {
} catch (Exception e) {
logger.error("Exception in {}", this, e);
} finally {
- /*
- * cleanly leave state -- we want to make sure that messages
- * are processed in order, so the queue needs to remain until
- * it is empty
- */
- logger.info("{}: entering cleanup state", this);
- for ( ; ; ) {
- Message message = messages.poll();
- if (message == null) {
- // no messages left, but this could change
- synchronized (Bucket.this) {
- message = messages.poll();
- if (message == null) {
- // no messages left
- if (state == this) {
- if (owner == Server.getThisServer()) {
- // we can now exit the state
- state = null;
- stateChanged();
- } else {
- /*
- * We need a grace period before we can
- * remove the 'state' value (this can happen
- * if we receive and process the bulk data
- * before receiving official confirmation
- * that we are owner of the bucket.
- */
- messages = null;
- }
+ run_cleanup();
+ }
+ }
+
+ private void run_cleanup() {
+ /*
+ * cleanly leave state -- we want to make sure that messages
+ * are processed in order, so the queue needs to remain until
+ * it is empty
+ */
+ logger.info("{}: entering cleanup state", this);
+ for ( ; ; ) {
+ Message message = messages.poll();
+ if (message == null) {
+ // no messages left, but this could change
+ synchronized (Bucket.this) {
+ message = messages.poll();
+ if (message == null) {
+ // no messages left
+ if (state == this) {
+ if (owner == Server.getThisServer()) {
+ // we can now exit the state
+ state = null;
+ stateChanged();
+ } else {
+ /*
+ * We need a grace period before we can
+ * remove the 'state' value (this can happen
+ * if we receive and process the bulk data
+ * before receiving official confirmation
+ * that we are owner of the bucket.
+ */
+ messages = null;
}
- break;
}
+ break;
}
}
- // this doesn't work -- it ends up right back in the queue
- // if 'messages' is defined
- message.process();
}
- if (messages == null) {
- // this indicates we need to enter a grace period before cleanup,
- try {
- logger.info("{}: entering grace period before terminating",
- this);
- Thread.sleep(unconfirmedGracePeriod);
- } catch (InterruptedException e) {
- // we are exiting in any case
- Thread.currentThread().interrupt();
- } finally {
- synchronized (Bucket.this) {
- // Do we need to confirm that we really are the owner?
- // What does it mean if we are not?
- if (state == this) {
- state = null;
- stateChanged();
- }
+ // this doesn't work -- it ends up right back in the queue
+ // if 'messages' is defined
+ message.process();
+ }
+ if (messages == null) {
+ // this indicates we need to enter a grace period before cleanup,
+ try {
+ logger.info("{}: entering grace period before terminating",
+ this);
+ Thread.sleep(unconfirmedGracePeriod);
+ } catch (InterruptedException e) {
+ // we are exiting in any case
+ Thread.currentThread().interrupt();
+ } finally {
+ synchronized (Bucket.this) {
+ // Do we need to confirm that we really are the owner?
+ // What does it mean if we are not?
+ if (state == this) {
+ state = null;
+ stateChanged();
}
}
}
- logger.info("{}: exiting cleanup state", this);
}
+ logger.info("{}: exiting cleanup state", this);
}
/**
@@ -2357,31 +2359,32 @@ public class Bucket {
public String toString() {
return "Bucket.NewOwner(" + index + ")";
}
- }
- /**
- * Restore bucket data.
- *
- * @param obj deserialized bucket data
- */
- private void restoreBucketData(Object obj) {
- if (obj instanceof List) {
- for (Object entry : (List<?>)obj) {
- if (entry instanceof Restore) {
- // entry-specific 'restore' operation
- ((Restore)entry).restore(this.index);
- } else {
- logger.error("{}: Expected '{}' but got '{}'",
- this, Restore.class.getName(),
- entry.getClass().getName());
+ /**
+ * Restore bucket data.
+ *
+ * @param obj deserialized bucket data
+ */
+ private void restoreBucketData(Object obj) {
+ if (obj instanceof List) {
+ for (Object entry : (List<?>)obj) {
+ if (entry instanceof Restore) {
+ // entry-specific 'restore' operation
+ ((Restore)entry).restore(Bucket.this.index);
+ } else {
+ logger.error("{}: Expected '{}' but got '{}'",
+ this, Restore.class.getName(),
+ entry.getClass().getName());
+ }
}
+ } else {
+ logger.error("{}: expected 'List' but got '{}'",
+ this, obj.getClass().getName());
}
- } else {
- logger.error("{}: expected 'List' but got '{}'",
- this, obj.getClass().getName());
}
}
+
/* ============================================================ */
/**
@@ -2459,9 +2462,9 @@ public class Bucket {
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("bucket", index)
- .queryParam("dest", newOwner.getUuid())
- .queryParam("ttl", timeToLive);
+ .queryParam(QP_BUCKET, index)
+ .queryParam(QP_DEST, newOwner.getUuid())
+ .queryParam(QP_TTL, timeToLive);
}
@Override
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
index c507e97d..1d695a01 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
@@ -37,7 +37,6 @@ import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVER_PU
import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
import com.google.gson.Gson;
-import com.google.gson.JsonObject;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
index 748a38f3..dfe211ce 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
@@ -26,9 +26,6 @@ import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUC
import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE;
import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Serializable;
@@ -36,7 +33,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
-import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -53,8 +49,6 @@ import javax.ws.rs.core.Response;
import lombok.AllArgsConstructor;
-import org.drools.core.definitions.InternalKnowledgePackage;
-import org.drools.core.impl.KnowledgeBaseImpl;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.FactHandle;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
@@ -75,7 +69,6 @@ import org.onap.policy.drools.system.PolicyControllerConstants;
import org.onap.policy.drools.system.PolicyEngine;
import org.onap.policy.drools.system.PolicyEngineConstants;
import org.onap.policy.drools.utils.Pair;
-import org.onap.policy.drools.utils.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,7 +91,7 @@ public class FeatureServerPool
// used for JSON <-> String conversion
private static StandardCoder coder = new StandardCoder();
- private static final String configFile =
+ private static final String CONFIG_FILE =
"config/feature-server-pool.properties";
/*
@@ -142,6 +135,15 @@ public class FeatureServerPool
private static long droolsTimeoutMillis;
private static String timeToLiveSecond;
+ // HTTP query parameters
+ private static final String QP_KEYWORD = "keyword";
+ private static final String QP_SESSION = "session";
+ private static final String QP_BUCKET = "bucket";
+ private static final String QP_TTL = "ttl";
+ private static final String QP_CONTROLLER = "controller";
+ private static final String QP_PROTOCOL = "protocol";
+ private static final String QP_TOPIC = "topic";
+
/******************************/
/* 'OrderedService' interface */
/******************************/
@@ -166,7 +168,7 @@ public class FeatureServerPool
@Override
public boolean afterStart(PolicyEngine engine) {
logger.info("Starting FeatureServerPool");
- Server.startup(configFile);
+ Server.startup(CONFIG_FILE);
TargetLock.startup();
droolsTimeoutMillis =
getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT);
@@ -251,10 +253,10 @@ public class FeatureServerPool
+ session.getName();
return webTarget
- .queryParam("keyword", keyword)
- .queryParam("session", encodedSessionName)
- .queryParam("bucket", bucketNumber)
- .queryParam("ttl", timeToLiveSecond);
+ .queryParam(QP_KEYWORD, keyword)
+ .queryParam(QP_SESSION, encodedSessionName)
+ .queryParam(QP_BUCKET, bucketNumber)
+ .queryParam(QP_TTL, timeToLiveSecond);
}
@Override
@@ -320,6 +322,20 @@ public class FeatureServerPool
path[path.length - 1] = fieldName;
}
keyword = sco.getString(path);
+ if (keyword != null) {
+ if (conversionFunctionName == null) {
+ // We found a keyword -- we don't need to try other paths,
+ // so we should break out of the loop
+ break;
+ }
+
+ // we have post-processing to do
+ keyword = Keyword.convertKeyword(keyword, conversionFunctionName);
+ if (keyword != null) {
+ // conversion was successful
+ break;
+ }
+ }
}
if (keyword == null) {
@@ -451,13 +467,16 @@ public class FeatureServerPool
}
}
}
- } else if ((ttl -= 1) > 0) {
- /*
- * This host is not the intended destination -- this could happen
- * if it was sent from another site. Forward the message in the
- * same thread.
- */
- forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data);
+ } else {
+ ttl -= 1;
+ if (ttl > 0) {
+ /*
+ * This host is not the intended destination -- this could happen
+ * if it was sent from another site. Forward the message in the
+ * same thread.
+ */
+ forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data);
+ }
}
}
@@ -496,10 +515,10 @@ public class FeatureServerPool
Entity.entity(new String(data, StandardCharsets.UTF_8),
MediaType.APPLICATION_OCTET_STREAM_TYPE);
webTarget
- .queryParam("keyword", keyword)
- .queryParam("session", sessionName)
- .queryParam("bucket", bucket)
- .queryParam("ttl", ttl)
+ .queryParam(QP_KEYWORD, keyword)
+ .queryParam(QP_SESSION, sessionName)
+ .queryParam(QP_BUCKET, bucket)
+ .queryParam(QP_TTL, ttl)
.request().post(entity);
}
}
@@ -683,17 +702,22 @@ public class FeatureServerPool
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("bucket", bucketNumber)
- .queryParam("keyword", keyword)
- .queryParam("controller", controller.getName())
- .queryParam("protocol", protocol.toString())
- .queryParam("topic", topic);
+ .queryParam(QP_BUCKET, bucketNumber)
+ .queryParam(QP_KEYWORD, keyword)
+ .queryParam(QP_CONTROLLER, controller.getName())
+ .queryParam(QP_PROTOCOL, protocol.toString())
+ .queryParam(QP_TOPIC, topic);
}
@Override
public void response(Response response) {
- // TODO: eventually, we will want to do something different
- // based upon success/failure
+ // log a message indicating success/failure
+ int status = response.getStatus();
+ if (status >= 200 && status <= 299) {
+ logger.info("/bucket/topic response code = {}", status);
+ } else {
+ logger.error("/bucket/topic response code = {}", status);
+ }
}
});
}
@@ -749,30 +773,28 @@ public class FeatureServerPool
logger.info("{}: about to fetch data for session {}",
this, session.getFullName());
- kieSession.insert(new DroolsRunnable() {
- @Override
- public void run() {
- List<Object> droolsObjects = new ArrayList<>();
- for (FactHandle fh : kieSession.getFactHandles()) {
- Object obj = kieSession.getObject(fh);
- String keyword = Keyword.lookupKeyword(obj);
- if (keyword != null
- && Bucket.bucketNumber(keyword) == bucketNumber) {
- // bucket matches -- include this object
- droolsObjects.add(obj);
- /*
- * delete this factHandle from Drools memory
- * this classes are used in bucket migration,
- * so the delete is intentional.
- */
- kieSession.delete(fh);
- }
+ DroolsRunnable backupAndRemove = () -> {
+ List<Object> droolsObjects = new ArrayList<>();
+ for (FactHandle fh : kieSession.getFactHandles()) {
+ Object obj = kieSession.getObject(fh);
+ String keyword = Keyword.lookupKeyword(obj);
+ if (keyword != null
+ && Bucket.bucketNumber(keyword) == bucketNumber) {
+ // bucket matches -- include this object
+ droolsObjects.add(obj);
+ /*
+ * delete this factHandle from Drools memory
+ * this classes are used in bucket migration,
+ * so the delete is intentional.
+ */
+ kieSession.delete(fh);
}
-
- // send notification that object list is complete
- droolsObjectsWrapper.complete(droolsObjects);
}
- });
+
+ // send notification that object list is complete
+ droolsObjectsWrapper.complete(droolsObjects);
+ };
+ kieSession.insert(backupAndRemove);
// add pending operation to the list
pendingData.add(new Pair<>(droolsObjectsWrapper, session));
@@ -858,6 +880,7 @@ public class FeatureServerPool
}
} catch (InterruptedException e) {
logger.error("Exception in {}", this, e);
+ Thread.currentThread().interrupt();
}
}
}
@@ -957,24 +980,22 @@ public class FeatureServerPool
final KieSession kieSession = session.getKieSession();
// run the following within the Drools session thread
- kieSession.insert(new DroolsRunnable() {
- @Override
- public void run() {
- try {
- /*
- * Insert all of the objects -- note that this is running
- * in the session thread, so no other rules can fire
- * until all of the objects are inserted.
- */
- for (Object obj : droolsObjects) {
- kieSession.insert(obj);
- }
- } finally {
- // send notification that the inserts have completed
- sessionLatch.countDown();
+ DroolsRunnable doRestore = () -> {
+ try {
+ /*
+ * Insert all of the objects -- note that this is running
+ * in the session thread, so no other rules can fire
+ * until all of the objects are inserted.
+ */
+ for (Object droolsObj : droolsObjects) {
+ kieSession.insert(droolsObj);
}
+ } finally {
+ // send notification that the inserts have completed
+ sessionLatch.countDown();
}
- });
+ };
+ kieSession.insert(doRestore);
return sessionLatch;
} else {
logger.error("{}: Invalid session data for session={}, type={}",
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java
index 6c88ebd0..e0b97fda 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java
@@ -47,16 +47,11 @@ public class Keyword {
// this table can be used to map an object class into the method
// to invoke to do the lookup
- private static ConcurrentHashMap<Class, Lookup> classToLookup =
+ private static ConcurrentHashMap<Class<?>, Lookup> classToLookup =
new ConcurrentHashMap<>();
// this is a pre-defined 'Lookup' instance that always returns 'null'
- private static Lookup nullLookup = new Lookup() {
- @Override
- public String getKeyword(Object obj) {
- return null;
- }
- };
+ private static Lookup nullLookup = (Object obj) -> (String) null;
/**
* This method takes the object's class, looks it up in the 'classToLookup'
@@ -78,7 +73,7 @@ public class Keyword {
// try to locate a matching entry using 'inheritance' rules
Class<?> thisClass = obj.getClass();
Class<?> matchingClass = null;
- for (Map.Entry<Class, Lookup> entry : classToLookup.entrySet()) {
+ for (Map.Entry<Class<?>, Lookup> entry : classToLookup.entrySet()) {
if (entry.getKey().isAssignableFrom(thisClass)
&& (matchingClass == null
|| matchingClass.isAssignableFrom(entry.getKey()))) {
@@ -173,7 +168,14 @@ public class Keyword {
}
}
- return lookupClassByName(classNameToSequence, clazz);
+ Class<?> keyClass = buildReflectiveLookup_findKeyClass(clazz);
+
+ if (keyClass == null) {
+ // no matching class name found
+ return null;
+ }
+
+ return buildReflectiveLookup_build(clazz, keyClass);
}
/**
@@ -182,8 +184,7 @@ public class Keyword {
* interfaces. If no match is found, repeat with the superclass,
* and all the way up the superclass chain.
*/
- private static Lookup lookupClassByName(Map<String, String> classNameToSequence,
- Class<?> clazz) {
+ private static Class<?> buildReflectiveLookup_findKeyClass(Class<?> clazz) {
Class<?> keyClass = null;
for (Class<?> cl = clazz ; cl != null ; cl = cl.getSuperclass()) {
if (classNameToSequence.containsKey(cl.getName())) {
@@ -210,11 +211,10 @@ public class Keyword {
break;
}
}
+ return keyClass;
+ }
- if (keyClass == null) {
- // no matching class name found
- return null;
- }
+ private static Lookup buildReflectiveLookup_build(Class<?> clazz, Class<?> keyClass) {
// we found a matching key in the table -- now, process the values
Class<?> currentClass = keyClass;
@@ -443,13 +443,10 @@ public class Keyword {
static final int UUID_LENGTH = 36;
static {
- conversionFunction.put("uuid", new Function<String, String>() {
- @Override
- public String apply(String value) {
- // truncate strings to 36 characters
- return value != null && value.length() > UUID_LENGTH
- ? value.substring(0, UUID_LENGTH) : value;
- }
+ conversionFunction.put("uuid", value -> {
+ // truncate strings to 36 characters
+ return value != null && value.length() > UUID_LENGTH
+ ? value.substring(0, UUID_LENGTH) : value;
});
}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
index 9d864bd7..06b02527 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
@@ -83,6 +83,13 @@ class Leader {
private static int stableVotingCycles;
/**
+ * Hide implicit public constructor.
+ */
+ private Leader() {
+ // everything here is static -- no instances of this class are created
+ }
+
+ /**
* Invoked at startup, or after some events -- immediately start a new vote.
*/
static void startup() {
@@ -125,23 +132,18 @@ class Leader {
// decode base64 data
final byte[] packet = Base64.getDecoder().decode(data);
- MainLoop.queueWork(new Runnable() {
- /**
- * This method is running within the 'MainLoop' thread.
- */
- @Override
- public void run() {
- // create the 'VoteCycle' state machine, if needed
- if (voteCycle == null) {
- voteCycle = new VoteCycle();
- MainLoop.addBackgroundWork(voteCycle);
- }
- try {
- // pass data to 'VoteCycle' state machine
- voteCycle.packetReceived(packet);
- } catch (IOException e) {
- logger.error("Exception in 'Leader.voteData", e);
- }
+ MainLoop.queueWork(() -> {
+ // This runs within the 'MainLoop' thread --
+ // create the 'VoteCycle' state machine, if needed
+ if (voteCycle == null) {
+ voteCycle = new VoteCycle();
+ MainLoop.addBackgroundWork(voteCycle);
+ }
+ try {
+ // pass data to 'VoteCycle' state machine
+ voteCycle.packetReceived(packet);
+ } catch (IOException e) {
+ logger.error("Exception in 'Leader.voteData", e);
}
});
}
@@ -250,94 +252,107 @@ class Leader {
@Override
public void run() {
switch (state) {
- case STARTUP: {
- // 5-second grace period -- wait for things to stablize before
- // starting the vote
- if ((cycleCount -= 1) <= 0) {
- logger.info("VoteCycle: {} seconds have passed",
- stableIdleCycles);
- //MainLoop.removeBackgroundWork(this);
- updateMyVote();
- sendOutUpdates();
- state = State.VOTING;
- cycleCount = stableVotingCycles;
- }
+ case STARTUP:
+ startupState();
break;
- }
- case VOTING: {
- // need to be in the VOTING state without any vote changes
- // for 5 seconds -- once this happens, the leader is chosen
- if (sendOutUpdates()) {
- // changes have occurred -- set the grace period to 5 seconds
- cycleCount = stableVotingCycles;
- } else if ((cycleCount -= 1) <= 0) {
- // 5 second grace period has passed -- the leader is one with
- // the most votes, which is the first entry in 'voteData'
- Server oldLeader = leader;
- leader = Server.getServer(voteData.first().uuid);
- if (leader != oldLeader) {
- // the leader has changed -- send out notifications
- for (Events listener : Events.getListeners()) {
- listener.newLeader(leader);
- }
- } else {
- // the election is over, and the leader has been confirmed
- for (Events listener : Events.getListeners()) {
- listener.leaderConfirmed(leader);
- }
- }
- if (leader == Server.getThisServer()) {
- // this is the lead server --
- // make sure the 'Discovery' threads are running
- Discovery.startDiscovery();
- } else {
- // this is not the lead server -- stop 'Discovery' threads
- Discovery.stopDiscovery();
- }
-
- // we are done with voting -- clean up, and report results
- MainLoop.removeBackgroundWork(this);
- voteCycle = null;
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- PrintStream out = new PrintStream(bos);
-
- out.println("Voting results:");
-
- // x(36) xxxxx x(36)
- // UUID Votes Voter
- String format = "%-36s %5s %-36s\n";
- out.format(format, "UUID", "Votes", "Voter(s)");
- out.format(format, "----", "-----", "--------");
-
- for (VoteData vote : voteData) {
- if (vote.voters.isEmpty()) {
- out.format(format, vote.uuid, 0, "");
- } else {
- boolean headerNeeded = true;
- for (VoterData voter : vote.voters) {
- if (headerNeeded) {
- out.format(format, vote.uuid,
- vote.voters.size(), voter.uuid);
- headerNeeded = false;
- } else {
- out.format(format, "", "", voter.uuid);
- }
- }
- }
- }
-
- logger.info(bos.toString());
- }
+ case VOTING:
+ votingState();
break;
- }
+
default:
logger.error("Unknown state: {}", state);
break;
}
}
+ private void startupState() {
+ // 5-second grace period -- wait for things to stablize before
+ // starting the vote
+ cycleCount -= 1;
+ if (cycleCount <= 0) {
+ logger.info("VoteCycle: {} seconds have passed",
+ stableIdleCycles);
+ updateMyVote();
+ sendOutUpdates();
+ state = State.VOTING;
+ cycleCount = stableVotingCycles;
+ }
+ }
+
+ private void votingState() {
+ // need to be in the VOTING state without any vote changes
+ // for 5 seconds -- once this happens, the leader is chosen
+ if (sendOutUpdates()) {
+ // changes have occurred -- set the grace period to 5 seconds
+ cycleCount = stableVotingCycles;
+ return;
+ }
+
+ cycleCount -= 1;
+ if (cycleCount > 0) {
+ return;
+ }
+
+ // 5 second grace period has passed -- the leader is one with
+ // the most votes, which is the first entry in 'voteData'
+ Server oldLeader = leader;
+ leader = Server.getServer(voteData.first().uuid);
+ if (leader != oldLeader) {
+ // the leader has changed -- send out notifications
+ for (Events listener : Events.getListeners()) {
+ listener.newLeader(leader);
+ }
+ } else {
+ // the election is over, and the leader has been confirmed
+ for (Events listener : Events.getListeners()) {
+ listener.leaderConfirmed(leader);
+ }
+ }
+ if (leader == Server.getThisServer()) {
+ // this is the lead server --
+ // make sure the 'Discovery' threads are running
+ Discovery.startDiscovery();
+ } else {
+ // this is not the lead server -- stop 'Discovery' threads
+ Discovery.stopDiscovery();
+ }
+
+ // we are done with voting -- clean up, and report results
+ MainLoop.removeBackgroundWork(this);
+ voteCycle = null;
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ PrintStream out = new PrintStream(bos);
+
+ out.println("Voting results:");
+
+ // x(36) xxxxx x(36)
+ // UUID Votes Voter
+ String format = "%-36s %5s %-36s\n";
+ out.format(format, "UUID", "Votes", "Voter(s)");
+ out.format(format, "----", "-----", "--------");
+
+ for (VoteData vote : voteData) {
+ if (vote.voters.isEmpty()) {
+ out.format(format, vote.uuid, 0, "");
+ continue;
+ }
+ boolean headerNeeded = true;
+ for (VoterData voter : vote.voters) {
+ if (headerNeeded) {
+ out.format(format, vote.uuid,
+ vote.voters.size(), voter.uuid);
+ headerNeeded = false;
+ } else {
+ out.format(format, "", "", voter.uuid);
+ }
+ }
+ }
+
+ logger.info(bos.toString());
+ }
+
/**
* Process an incoming /vote REST message.
*
@@ -375,7 +390,7 @@ class Leader {
private void processVote(UUID voter, UUID vote, long timestamp) {
// fetch old data for this voter
VoterData voterData = uuidToVoterData.computeIfAbsent(voter,
- (key) -> new VoterData(voter, timestamp));
+ key -> new VoterData(voter, timestamp));
if (timestamp >= voterData.timestamp) {
// this is a new vote for this voter -- update the timestamp
voterData.timestamp = timestamp;
@@ -389,7 +404,7 @@ class Leader {
VoteData newVoteData = null;
if (vote != null) {
- newVoteData = uuidToVoteData.computeIfAbsent(vote, (key) -> new VoteData(vote));
+ newVoteData = uuidToVoteData.computeIfAbsent(vote, key -> new VoteData(vote));
}
if (oldVoteData != newVoteData) {
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java
index 1ed7ecb2..1c6281d9 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java
@@ -159,8 +159,8 @@ class MainLoop extends Thread {
/**
* Poll for and process incoming messages for up to 1 second.
*/
- static void handleIncomingWork() throws InterruptedException {
- long currentTime = System.currentTimeMillis();;
+ static void handleIncomingWork() {
+ long currentTime = System.currentTimeMillis();
long wakeUpTime = currentTime + cycleTime;
long timeDiff;
@@ -176,7 +176,8 @@ class MainLoop extends Thread {
work.run();
} catch (InterruptedException e) {
logger.error("Interrupted in MainLoop");
- throw(e);
+ Thread.currentThread().interrupt();
+ return;
} catch (Exception e) {
logger.error("Exception in MainLoop incoming work", e);
}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java
index 1c4cc7ba..8ece943e 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java
@@ -41,12 +41,6 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.onap.policy.drools.serverpool.Bucket;
-import org.onap.policy.drools.serverpool.FeatureServerPool;
-import org.onap.policy.drools.serverpool.Leader;
-import org.onap.policy.drools.serverpool.Server;
-import org.onap.policy.drools.serverpool.TargetLock;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
index 52e3d2dc..8ee0f2d2 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
@@ -62,7 +62,6 @@ import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Date;
@@ -81,15 +80,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import javax.servlet.ServletException;
-import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.eclipse.jetty.server.ServerConnector;
import org.glassfish.jersey.client.ClientProperties;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.http.client.HttpClient;
@@ -97,7 +93,6 @@ import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
-import org.onap.policy.drools.system.PolicyEngineConstants;
import org.onap.policy.drools.utils.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,10 +121,6 @@ public class Server implements Comparable<Server> {
// the current REST server
private static HttpServletServer restServer;
- // incoming packets from HTTP
- private static LinkedTransferQueue<byte[]> incomingPackets =
- new LinkedTransferQueue<>();
-
/*==================================================*/
/* Some properties extracted at initialization time */
/*==================================================*/
@@ -212,6 +203,9 @@ public class Server implements Comparable<Server> {
static final int SOCKET_ADDRESS_TAG = 1;
static final int SITE_SOCKET_ADDRESS_TAG = 2;
+ // 'pingHosts' error
+ static final String PINGHOSTS_ERROR = "Server.pingHosts error";
+
/*==============================*/
/* Comparable<Server> interface */
/*==============================*/
@@ -311,6 +305,7 @@ public class Server implements Comparable<Server> {
InetAddress address = InetAddress.getByName(ipAddressString);
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+ possibleError = "HTTP server initialization error";
restServer = HttpServletServerFactoryInstance.getServerFactory().build(
"SERVER-POOL", // name
useHttps, // https
@@ -332,7 +327,9 @@ public class Server implements Comparable<Server> {
}
// we may not know the port until after the server is started
+ possibleError = "HTTP server start error";
restServer.start();
+ possibleError = null;
// determine the address to use
if (DEFAULT_SERVER_IP_ADDRESS.contentEquals(address.getHostAddress())) {
@@ -346,13 +343,10 @@ public class Server implements Comparable<Server> {
// start background thread
MainLoop.startThread();
- MainLoop.queueWork(new Runnable() {
- @Override
- public void run() {
- // run this in the 'MainLoop' thread
- Leader.startup();
- Bucket.startup();
- }
+ MainLoop.queueWork(() -> {
+ // run this in the 'MainLoop' thread
+ Leader.startup();
+ Bucket.startup();
});
logger.info("Listening on port {}", port);
@@ -491,14 +485,12 @@ public class Server implements Comparable<Server> {
int tag;
while ((tag = is.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
switch (tag) {
- case SOCKET_ADDRESS_TAG: {
+ case SOCKET_ADDRESS_TAG:
socketAddress = readSocketAddress(is);
break;
- }
- case SITE_SOCKET_ADDRESS_TAG: {
+ case SITE_SOCKET_ADDRESS_TAG:
siteSocketAddress = readSocketAddress(is);
break;
- }
default:
// ignore tag
logger.error("Illegal tag: {}", tag);
@@ -513,8 +505,7 @@ public class Server implements Comparable<Server> {
* @param is the 'DataInputStream'
* @return the 'InetSocketAddress'
*/
- private static InetSocketAddress readSocketAddress(DataInputStream is)
- throws IOException, UnknownHostException {
+ private static InetSocketAddress readSocketAddress(DataInputStream is) throws IOException {
byte[] ipAddress = new byte[4];
is.read(ipAddress, 0, 4);
@@ -926,47 +917,45 @@ public class Server implements Comparable<Server> {
return;
}
- getThreadPool().execute(new Runnable() {
+ getThreadPool().execute(() -> {
/**
* This method is running within the 'MainLoop' thread.
*/
- @Override
- public void run() {
- try {
- WebTarget webTarget = target.path(path);
- if (responseCallback != null) {
- // give callback a chance to modify 'WebTarget'
- webTarget = responseCallback.webTarget(webTarget);
-
- // send the response to the callback
- Response response;
- if (entity == null) {
- response = webTarget.request().get();
- } else {
- response = webTarget.request().post(entity);
- }
- responseCallback.response(response);
+ try {
+ WebTarget webTarget = target.path(path);
+ if (responseCallback != null) {
+ // give callback a chance to modify 'WebTarget'
+ webTarget = responseCallback.webTarget(webTarget);
+
+ // send the response to the callback
+ Response response;
+ if (entity == null) {
+ response = webTarget.request().get();
} else {
- // just do the invoke, and ignore the response
- if (entity == null) {
- webTarget.request().get();
- } else {
- webTarget.request().post(entity);
- }
+ response = webTarget.request().post(entity);
}
- } catch (Exception e) {
- logger.error("Failed to send to {} ({}, {})",
- uuid, destSocketAddress, destName);
+ responseCallback.response(response);
+ } else {
+ // just do the invoke, and ignore the response
+ if (entity == null) {
+ webTarget.request().get();
+ } else {
+ webTarget.request().post(entity);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Failed to send to {} ({}, {})",
+ uuid, destSocketAddress, destName);
+ if (responseCallback != null) {
responseCallback.exceptionResponse(e);
- MainLoop.queueWork(new Runnable() {
- @Override
- public void run() {
- // the DNS cache may have been out-of-date when this server
- // was first contacted -- fix the problem, if needed
- checkServer();
- }
- });
}
+ MainLoop.queueWork(() -> {
+ // this runs in the 'MainLoop' thread
+
+ // the DNS cache may have been out-of-date when this server
+ // was first contacted -- fix the problem, if needed
+ checkServer();
+ });
}
});
}
@@ -1037,18 +1026,20 @@ public class Server implements Comparable<Server> {
* in 'notifyList' (may need to build or rebuild 'notifyList').
*/
static void sendOutData() throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
// include 'thisServer' in the data -- first, advance the count
- if ((thisServer.count += 1) == 0) {
+ thisServer.count += 1;
+ if (thisServer.count == 0) {
/*
- * counter wrapped (0 is a special case);
+ * counter wrapped (0 is a special case) --
* actually, we could probably leave this out, because it would take
* more than a century to wrap if the increment is 1 second
*/
thisServer.count = 1;
}
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
thisServer.lastUpdateTime = System.currentTimeMillis();
thisServer.writeServerData(dos);
@@ -1129,11 +1120,11 @@ public class Server implements Comparable<Server> {
}
} catch (NumberFormatException e) {
out.println(host + ": Invalid port value");
- logger.error("Server.pingHosts error", e);
+ logger.error(PINGHOSTS_ERROR, e);
error = true;
} catch (UnknownHostException e) {
out.println(host + ": Unknown host");
- logger.error("Server.pingHosts error", e);
+ logger.error(PINGHOSTS_ERROR, e);
error = true;
}
}
@@ -1152,58 +1143,58 @@ public class Server implements Comparable<Server> {
*/
static void pingHosts(final PrintStream out,
final Collection<InetSocketAddress> hosts) {
- FutureTask<Integer> ft = new FutureTask<>(new Callable<Integer>() {
- @Override
- public Integer call() {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
- // add information for this server only
- try {
- thisServer.writeServerData(dos);
-
- // create an 'Entity' that can be sent out to all hosts
- Entity<String> entity = Entity.entity(
- new String(Base64.getEncoder().encode(bos.toByteArray()),
- StandardCharsets.UTF_8),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
-
- // loop through hosts
- for (InetSocketAddress host : hosts) {
- HttpClient client = null;
-
- try {
- client = buildClient(host.toString(), host,
+ FutureTask<Integer> ft = new FutureTask<>(() -> {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+ // add information for this server only
+ try {
+ thisServer.writeServerData(dos);
+
+ // create an 'Entity' that can be sent out to all hosts
+ Entity<String> entity = Entity.entity(
+ new String(Base64.getEncoder().encode(bos.toByteArray()),
+ StandardCharsets.UTF_8),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
+
+ // loop through hosts
+ for (InetSocketAddress host : hosts) {
+ HttpClient httpClient = null;
+
+ try {
+ httpClient = buildClient(host.toString(), host,
socketAddressToName(host));
- getTarget(client).path("admin").request().post(entity);
- client.shutdown();
- client = null;
- } catch (KeyManagementException | NoSuchAlgorithmException e) {
- out.println(host + ": Unable to create client connection");
- logger.error("Server.pingHosts error", e);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- out.println(host + ": Unable to get link to target");
- logger.error("Server.pingHosts error", e);
- } catch (Exception e) {
- out.println(host + ": " + e);
- logger.error("Server.pingHosts error", e);
- }
- if (client != null) {
- client.shutdown();
- }
+ getTarget(httpClient).path("admin").request().post(entity);
+ httpClient.shutdown();
+ httpClient = null;
+ } catch (KeyManagementException | NoSuchAlgorithmException e) {
+ out.println(host + ": Unable to create client connection");
+ logger.error(PINGHOSTS_ERROR, e);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ out.println(host + ": Unable to get link to target");
+ logger.error(PINGHOSTS_ERROR, e);
+ } catch (Exception e) {
+ out.println(host + ": " + e);
+ logger.error(PINGHOSTS_ERROR, e);
+ }
+ if (httpClient != null) {
+ httpClient.shutdown();
}
- } catch (IOException e) {
- out.println("Unable to generate 'ping' data: " + e);
- logger.error("Server.pingHosts error", e);
}
- return 0;
+ } catch (IOException e) {
+ out.println("Unable to generate 'ping' data: " + e);
+ logger.error(PINGHOSTS_ERROR, e);
}
+ return 0;
});
MainLoop.queueWork(ft);
try {
ft.get(60, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ } catch (InterruptedException e) {
+ logger.error("Server.pingHosts: interrupted waiting for queued work", e);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException | TimeoutException e) {
logger.error("Server.pingHosts: error waiting for queued work", e);
}
}
@@ -1215,16 +1206,17 @@ public class Server implements Comparable<Server> {
* @param out the 'PrintStream' to dump the table to
*/
public static void dumpHosts(final PrintStream out) {
- FutureTask<Integer> ft = new FutureTask<Integer>(new Callable<Integer>() {
- public Integer call() {
- dumpHostsInternal(out);
- return 0;
- }
+ FutureTask<Integer> ft = new FutureTask<>(() -> {
+ dumpHostsInternal(out);
+ return 0;
});
MainLoop.queueWork(ft);
try {
ft.get(60, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ } catch (InterruptedException e) {
+ logger.error("Server.dumpHosts: interrupted waiting for queued work", e);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException | TimeoutException e) {
logger.error("Server.dumpHosts: error waiting for queued work", e);
}
}
@@ -1278,12 +1270,6 @@ public class Server implements Comparable<Server> {
} else if (localNotifyList.contains(server)) {
thisOne = "n";
}
- /*
- else if (newHosts.contains(server))
- {
- thisOne = "N";
- }
- */
if (siteData) {
String siteIp = "";
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java
index fb6a791e..61188e6b 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java
@@ -235,6 +235,13 @@ public class ServerPoolProperties {
private static Properties properties = new Properties();
/**
+ * Hide implicit public constructor.
+ */
+ private ServerPoolProperties() {
+ // everything here is static -- no instances of this class are created
+ }
+
+ /**
* Store the application properties values.
*
* @param properties the properties to save
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
index 7e4b795f..65804082 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
@@ -101,8 +101,8 @@ public class TargetLock implements Lock, Serializable {
private static ReferenceQueue<TargetLock> abandoned = new ReferenceQueue<>();
// some status codes
- static final int ACCEPTED = 202; //Response.Status.ACCEPTED.getStatusCode();
- static final int NO_CONTENT = 204; //Response.Status.NO_CONTENT.getStatusCode();
+ static final int ACCEPTED = 202; //Response.Status.ACCEPTED.getStatusCode()
+ static final int NO_CONTENT = 204; //Response.Status.NO_CONTENT.getStatusCode()
static final int LOCKED = 423;
// Values extracted from properties
@@ -131,13 +131,21 @@ public class TargetLock implements Lock, Serializable {
// this is used to notify the application when a lock is available,
// or if it is not available
- private LockCallback owner;
+ private volatile LockCallback owner;
// This is what is actually called by the infrastructure to do the owner
// notification. The owner may be running in a Drools session, in which case
// the actual notification should be done within that thread -- the 'context'
// object ensures that it happens this way.
- private LockCallback context;
+ private volatile LockCallback context;
+
+ // HTTP query parameters
+ private static final String QP_KEY = "key";
+ private static final String QP_OWNER = "owner";
+ private static final String QP_UUID = "uuid";
+ private static final String QP_WAIT = "wait";
+ private static final String QP_SERVER = "server";
+ private static final String QP_TTL = "ttl";
/**
* This method triggers registration of 'eventHandler', and also extracts
@@ -221,7 +229,7 @@ public class TargetLock implements Lock, Serializable {
if (session != null) {
// deliver through a 'PolicySessionContext' class
Object lcontext = session.getAdjunct(PolicySessionContext.class);
- if (lcontext == null || !(lcontext instanceof LockCallback)) {
+ if (!(lcontext instanceof LockCallback)) {
context = new PolicySessionContext(session);
session.setAdjunct(PolicySessionContext.class, context);
} else {
@@ -301,11 +309,11 @@ public class TargetLock implements Lock, Serializable {
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("key", key)
- .queryParam("owner", ownerKey)
- .queryParam("uuid", identity.uuid.toString())
- .queryParam("wait", waitForLock)
- .queryParam("ttl", timeToLive);
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, ownerKey)
+ .queryParam(QP_UUID, identity.uuid.toString())
+ .queryParam(QP_WAIT, waitForLock)
+ .queryParam(QP_TTL, timeToLive);
}
/**
@@ -323,14 +331,13 @@ public class TargetLock implements Lock, Serializable {
* 423 (Locked) - lock in use, and 'waitForLock' is 'false'
*/
switch (response.getStatus()) {
- case NO_CONTENT: {
+ case NO_CONTENT:
// lock successful
setState(State.ACTIVE);
context.lockAvailable(TargetLock.this);
break;
- }
- case LOCKED: {
+ case LOCKED:
// failed -- lock in use, and 'waitForLock == false'
setState(State.FREE);
synchronized (localLocks) {
@@ -340,13 +347,12 @@ public class TargetLock implements Lock, Serializable {
wr.clear();
context.lockUnavailable(TargetLock.this);
break;
- }
case ACCEPTED:
break;
default:
- logger.error("Unknown status: ", response.getStatus());
+ logger.error("Unknown status: {}", response.getStatus());
break;
}
}
@@ -434,6 +440,7 @@ public class TargetLock implements Lock, Serializable {
*/
@Override
public void extend(int holdSec, LockCallback callback) {
+ // not implemented yet
}
/********************/
@@ -473,7 +480,8 @@ public class TargetLock implements Lock, Serializable {
if (!Bucket.isKeyOnThisServer(key)) {
// this is the wrong server -- forward to the correct one
// (we can use this thread)
- if ((ttl -= 1) > 0) {
+ ttl -= 1;
+ if (ttl > 0) {
Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
if (server != null) {
WebTarget webTarget = server.getWebTarget("lock/lock");
@@ -483,11 +491,11 @@ public class TargetLock implements Lock, Serializable {
server.getUuid(), key, ownerKey, uuid,
waitForLock, ttl);
return webTarget
- .queryParam("key", key)
- .queryParam("owner", ownerKey)
- .queryParam("uuid", uuid.toString())
- .queryParam("wait", waitForLock)
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, ownerKey)
+ .queryParam(QP_UUID, uuid.toString())
+ .queryParam(QP_WAIT, waitForLock)
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().get();
}
}
@@ -527,7 +535,8 @@ public class TargetLock implements Lock, Serializable {
if (!Bucket.isKeyOnThisServer(key)) {
// this is the wrong server -- forward to the correct one
// (we can use this thread)
- if ((ttl -= 1) > 0) {
+ ttl -= 1;
+ if (ttl > 0) {
Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
if (server != null) {
WebTarget webTarget = server.getWebTarget("lock/free");
@@ -536,10 +545,10 @@ public class TargetLock implements Lock, Serializable {
+ "(key={},owner={},uuid={},ttl={})",
server.getUuid(), key, ownerKey, uuid, ttl);
return webTarget
- .queryParam("key", key)
- .queryParam("owner", ownerKey)
- .queryParam("uuid", uuid.toString())
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, ownerKey)
+ .queryParam(QP_UUID, uuid.toString())
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().get();
}
}
@@ -575,7 +584,8 @@ public class TargetLock implements Lock, Serializable {
if (!Bucket.isKeyOnThisServer(ownerKey)) {
// this is the wrong server -- forward to the correct one
// (we can use this thread)
- if ((ttl -= 1) > 0) {
+ ttl -= 1;
+ if (ttl > 0) {
Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
if (server != null) {
WebTarget webTarget = server.getWebTarget("lock/locked");
@@ -584,10 +594,10 @@ public class TargetLock implements Lock, Serializable {
+ "(key={},owner={},uuid={},ttl={})",
server.getUuid(), key, ownerKey, uuid, ttl);
return webTarget
- .queryParam("key", key)
- .queryParam("owner", ownerKey)
- .queryParam("uuid", uuid.toString())
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, ownerKey)
+ .queryParam(QP_UUID, uuid.toString())
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().get();
}
}
@@ -744,6 +754,7 @@ public class TargetLock implements Lock, Serializable {
*/
@Override
public void shutdown() {
+ // nothing needs to be done
}
/**
@@ -887,10 +898,10 @@ public class TargetLock implements Lock, Serializable {
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("key", key)
- .queryParam("owner", ownerKey)
- .queryParam("uuid", uuid.toString())
- .queryParam("ttl", timeToLive);
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, ownerKey)
+ .queryParam(QP_UUID, uuid.toString())
+ .queryParam(QP_TTL, timeToLive);
}
@Override
@@ -898,21 +909,19 @@ public class TargetLock implements Lock, Serializable {
logger.info("Free response={} (code={})",
response, response.getStatus());
switch (response.getStatus()) {
- case NO_CONTENT: {
+ case NO_CONTENT:
// free successful -- don't need to do anything
break;
- }
- case LOCKED: {
+ case LOCKED:
// free failed
logger.error("TargetLock free failed, "
+ "key={}, owner={}, uuid={}",
key, ownerKey, uuid);
break;
- }
default:
- logger.error("Unknown status: ", response.getStatus());
+ logger.error("Unknown status: {}", response.getStatus());
break;
}
}
@@ -986,12 +995,10 @@ public class TargetLock implements Lock, Serializable {
public void lockAvailable(final Lock lock) {
// Run 'owner.lockAvailable' within the Drools session
if (policySession != null) {
- policySession.getKieSession().insert(new DroolsRunnable() {
- @Override
- public void run() {
- ((TargetLock)lock).owner.lockAvailable(lock);
- }
- });
+ DroolsRunnable callback = () -> {
+ ((TargetLock)lock).owner.lockAvailable(lock);
+ };
+ policySession.getKieSession().insert(callback);
}
}
@@ -1002,12 +1009,10 @@ public class TargetLock implements Lock, Serializable {
public void lockUnavailable(Lock lock) {
// Run 'owner.unlockAvailable' within the Drools session
if (policySession != null) {
- policySession.getKieSession().insert(new DroolsRunnable() {
- @Override
- public void run() {
- ((TargetLock)lock).owner.lockUnavailable(lock);
- }
- });
+ DroolsRunnable callback = () -> {
+ ((TargetLock)lock).owner.lockUnavailable(lock);
+ };
+ policySession.getKieSession().insert(callback);
}
}
@@ -1218,16 +1223,16 @@ public class TargetLock implements Lock, Serializable {
*/
private static class LockEntry implements Serializable {
// string key identifying the lock
- String key;
+ private String key;
// string key identifying the owner
- String currentOwnerKey;
+ private String currentOwnerKey;
// UUID identifying the original 'TargetLock
- UUID currentOwnerUuid;
+ private UUID currentOwnerUuid;
// list of pending lock requests for this key
- Queue<Waiting> waitingList = new LinkedList<>();
+ private Queue<Waiting> waitingList = new LinkedList<>();
/**
* Constructor - initialize the 'LockEntry'.
@@ -1273,27 +1278,19 @@ public class TargetLock implements Lock, Serializable {
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("key", key)
- .queryParam("owner", currentOwnerKey)
- .queryParam("uuid", currentOwnerUuid.toString())
- .queryParam("ttl", timeToLive);
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, currentOwnerKey)
+ .queryParam(QP_UUID, currentOwnerUuid.toString())
+ .queryParam(QP_TTL, timeToLive);
}
@Override
public void response(Response response) {
logger.info("Locked response={} (code={})",
response, response.getStatus());
- switch (response.getStatus()) {
- case NO_CONTENT: {
- // successful -- we are done
- break;
- }
-
- default: {
- // notification failed -- free this one
- globalLocks.unlock(key, currentOwnerUuid);
- break;
- }
+ if (response.getStatus() != NO_CONTENT) {
+ // notification failed -- free this one
+ globalLocks.unlock(key, currentOwnerUuid);
}
}
});
@@ -1409,7 +1406,6 @@ public class TargetLock implements Lock, Serializable {
while (abandonedHandler != null) {
try {
Reference<? extends TargetLock> wr = abandoned.remove();
- TargetLock notify = null;
// At this point, we know that 'ref' is a
// 'WeakReference<TargetLock>' instance that has been abandoned,
@@ -1515,7 +1511,8 @@ public class TargetLock implements Lock, Serializable {
*/
static byte[] dumpLocksData(UUID serverUuid, int ttl) throws IOException {
if (!Server.getThisServer().getUuid().equals(serverUuid)) {
- if ((ttl -= 1) > 0) {
+ ttl -= 1;
+ if (ttl > 0) {
Server server = Server.getServer(serverUuid);
if (server != null) {
WebTarget webTarget =
@@ -1524,8 +1521,8 @@ public class TargetLock implements Lock, Serializable {
logger.info("Forwarding 'lock/dumpLocksData' to uuid {}",
serverUuid);
return webTarget
- .queryParam("server", serverUuid.toString())
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_SERVER, serverUuid.toString())
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().get(byte[].class);
}
}
@@ -1571,8 +1568,8 @@ public class TargetLock implements Lock, Serializable {
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("server", server.getUuid().toString())
- .queryParam("ttl", timeToLive);
+ .queryParam(QP_SERVER, server.getUuid().toString())
+ .queryParam(QP_TTL, timeToLive);
}
@Override
@@ -1621,128 +1618,144 @@ public class TargetLock implements Lock, Serializable {
// process the client-end data
for (ClientData clientData : hostData.clientDataList) {
- // 'true' if the bucket associated with this 'ClientData'
- // doesn't belong to the remote server, as far as we can tell
- boolean serverMismatch =
- Bucket.bucketToServer(clientData.bucketNumber) != server;
-
- // each 'ClientDataRecord' instance corresponds to an
- // active 'Identity' (TargetLock) instance
- for (ClientDataRecord cdr : clientData.clientDataRecords) {
- // update maximum 'key' and 'ownerKey' lengths
- updateKeyLength(cdr.identity.key);
- updateOwnerKeyLength(cdr.identity.ownerKey);
-
- // fetch UUID
- UUID uuid = cdr.identity.uuid;
-
- // fetch/generate 'MergeData' instance for this UUID
- MergedData md = mergedDataMap.get(uuid);
- if (md == null) {
- md = new MergedData(uuid);
- mergedDataMap.put(uuid, md);
- }
-
- // update 'MergedData.clientDataRecord'
- if (md.clientDataRecord == null) {
- md.clientDataRecord = cdr;
- } else {
- md.comment("Duplicate client entry for UUID");
- }
-
- if (serverMismatch) {
- // need to generate an additional error
- md.comment(server.toString()
- + "(client) does not own bucket "
- + clientData.bucketNumber);
- }
- }
+ populateLockData_clientData(clientData, server);
}
// process the server-end data
for (ServerData serverData : hostData.serverDataList) {
- // 'true' if the bucket associated with this 'ServerData'
- // doesn't belong to the remote server, as far as we can tell
- boolean serverMismatch =
- Bucket.bucketToServer(serverData.bucketNumber) != server;
-
- // each 'LockEntry' instance corresponds to the current holder
- // of a lock, and all requestors waiting for it to be freed
- for (LockEntry le : serverData.globalLocks.keyToEntry.values()) {
- // update maximum 'key' and 'ownerKey' lengths
- updateKeyLength(le.key);
- updateOwnerKeyLength(le.currentOwnerKey);
-
- // fetch uuid
- UUID uuid = le.currentOwnerUuid;
-
- // fetch/generate 'MergeData' instance for this UUID
- MergedData md = mergedDataMap.get(uuid);
- if (md == null) {
- md = new MergedData(uuid);
- mergedDataMap.put(uuid, md);
- }
+ populateLockData_serverData(serverData, server);
+ }
+ } else {
+ logger.error("TargetLock.DumpLocks.populateLockData: "
+ + "received data has class {}",
+ decodedData.getClass().getName());
+ }
+ }
- // update 'lockEntries' table entry
- if (lockEntries.get(le.key) != null) {
- md.comment("Duplicate server entry for key " + le.key);
- } else {
- lockEntries.put(le.key, le);
- }
+ private void populateLockData_clientData(ClientData clientData, Server server) {
+ // 'true' if the bucket associated with this 'ClientData'
+ // doesn't belong to the remote server, as far as we can tell
+ boolean serverMismatch =
+ Bucket.bucketToServer(clientData.bucketNumber) != server;
- // update 'MergedData.serverLockEntry'
- // (leave 'MergedData.serverWaiting' as 'null', because
- // this field is only used for waiting entries)
- if (md.serverLockEntry == null) {
- md.serverLockEntry = le;
- } else {
- md.comment("Duplicate server entry for UUID");
- }
+ // each 'ClientDataRecord' instance corresponds to an
+ // active 'Identity' (TargetLock) instance
+ for (ClientDataRecord cdr : clientData.clientDataRecords) {
+ // update maximum 'key' and 'ownerKey' lengths
+ updateKeyLength(cdr.identity.key);
+ updateOwnerKeyLength(cdr.identity.ownerKey);
- if (serverMismatch) {
- // need to generate an additional error
- md.comment(server.toString()
- + "(server) does not own bucket "
- + serverData.bucketNumber);
- }
+ // fetch UUID
+ UUID uuid = cdr.identity.uuid;
- // we need 'MergeData' entries for all waiting requests
- for (Waiting waiting : le.waitingList) {
- // update maximum 'ownerKey' length
- updateOwnerKeyLength(waiting.ownerKey);
+ // fetch/generate 'MergeData' instance for this UUID
+ MergedData md = mergedDataMap.get(uuid);
+ if (md == null) {
+ md = new MergedData(uuid);
+ mergedDataMap.put(uuid, md);
+ }
- // fetch uuid
- uuid = waiting.ownerUuid;
+ // update 'MergedData.clientDataRecord'
+ if (md.clientDataRecord == null) {
+ md.clientDataRecord = cdr;
+ } else {
+ md.comment("Duplicate client entry for UUID");
+ }
- // fetch/generate 'MergeData' instance for this UUID
- md = mergedDataMap.get(uuid);
- if (md == null) {
- md = new MergedData(uuid);
- mergedDataMap.put(uuid, md);
- }
+ if (serverMismatch) {
+ // need to generate an additional error
+ md.comment(server.toString()
+ + "(client) does not own bucket "
+ + clientData.bucketNumber);
+ }
+ }
+ }
- // update 'MergedData.serverLockEntry' and
- // 'MergedData.serverWaiting'
- if (md.serverLockEntry == null) {
- md.serverLockEntry = le;
- md.serverWaiting = waiting;
- } else {
- md.comment("Duplicate server entry for UUID");
- }
+ private void populateLockData_serverData(ServerData serverData, Server server) {
+ // 'true' if the bucket associated with this 'ServerData'
+ // doesn't belong to the remote server, as far as we can tell
+ boolean serverMismatch =
+ Bucket.bucketToServer(serverData.bucketNumber) != server;
- if (serverMismatch) {
- // need to generate an additional error
- md.comment(server.toString()
- + "(server) does not own bucket "
- + serverData.bucketNumber);
- }
- }
- }
+ // each 'LockEntry' instance corresponds to the current holder
+ // of a lock, and all requestors waiting for it to be freed
+ for (LockEntry le : serverData.globalLocks.keyToEntry.values()) {
+ // update maximum 'key' and 'ownerKey' lengths
+ updateKeyLength(le.key);
+ updateOwnerKeyLength(le.currentOwnerKey);
+
+ // fetch uuid
+ UUID uuid = le.currentOwnerUuid;
+
+ // fetch/generate 'MergeData' instance for this UUID
+ MergedData md = mergedDataMap.get(uuid);
+ if (md == null) {
+ md = new MergedData(uuid);
+ mergedDataMap.put(uuid, md);
+ }
+
+ // update 'lockEntries' table entry
+ if (lockEntries.get(le.key) != null) {
+ md.comment("Duplicate server entry for key " + le.key);
+ } else {
+ lockEntries.put(le.key, le);
+ }
+
+ // update 'MergedData.serverLockEntry'
+ // (leave 'MergedData.serverWaiting' as 'null', because
+ // this field is only used for waiting entries)
+ if (md.serverLockEntry == null) {
+ md.serverLockEntry = le;
+ } else {
+ md.comment("Duplicate server entry for UUID");
+ }
+
+ if (serverMismatch) {
+ // need to generate an additional error
+ md.comment(server.toString()
+ + "(server) does not own bucket "
+ + serverData.bucketNumber);
+ }
+
+ // we need 'MergeData' entries for all waiting requests
+ for (Waiting waiting : le.waitingList) {
+ populateLockData_serverData_waiting(
+ serverData, server, serverMismatch, le, waiting);
}
+ }
+ }
+
+ private void populateLockData_serverData_waiting(
+ ServerData serverData, Server server, boolean serverMismatch,
+ LockEntry le, Waiting waiting) {
+
+ // update maximum 'ownerKey' length
+ updateOwnerKeyLength(waiting.ownerKey);
+
+ // fetch uuid
+ UUID uuid = waiting.ownerUuid;
+
+ // fetch/generate 'MergeData' instance for this UUID
+ MergedData md = mergedDataMap.get(uuid);
+ if (md == null) {
+ md = new MergedData(uuid);
+ mergedDataMap.put(uuid, md);
+ }
+
+ // update 'MergedData.serverLockEntry' and
+ // 'MergedData.serverWaiting'
+ if (md.serverLockEntry == null) {
+ md.serverLockEntry = le;
+ md.serverWaiting = waiting;
} else {
- logger.error("TargetLock.DumpLocks.populateLockData: "
- + "received data has class "
- + decodedData.getClass().getName());
+ md.comment("Duplicate server entry for UUID");
+ }
+
+ if (serverMismatch) {
+ // need to generate an additional error
+ md.comment(server.toString()
+ + "(server) does not own bucket "
+ + serverData.bucketNumber);
}
}
@@ -1801,6 +1814,11 @@ public class TargetLock implements Lock, Serializable {
out.printf(format, "---", "---------", "----", "-----", "--------");
}
+ dump_serverTable(out);
+ dump_clientOnlyEntries(out);
+ }
+
+ private void dump_serverTable(PrintStream out) {
// iterate over the server table
for (LockEntry le : lockEntries.values()) {
// fetch merged data
@@ -1841,7 +1859,9 @@ public class TargetLock implements Lock, Serializable {
dumpMoreComments(out, md);
}
}
+ }
+ private void dump_clientOnlyEntries(PrintStream out) {
// client records that don't have matching server entries
for (MergedData md : clientOnlyEntries.values()) {
ClientDataRecord cdr = md.clientDataRecord;
@@ -2017,13 +2037,13 @@ public class TargetLock implements Lock, Serializable {
*/
static class HostData implements Serializable {
// the UUID of the host sending the data
- UUID hostUuid;
+ private UUID hostUuid;
// all of the information derived from the 'LocalLocks' data
- List<ClientData> clientDataList;
+ private List<ClientData> clientDataList;
// all of the information derived from the 'GlobalLocks' data
- List<ServerData> serverDataList;
+ private List<ServerData> serverDataList;
/**
* Constructor - this goes through all of the lock tables,
@@ -2086,10 +2106,10 @@ public class TargetLock implements Lock, Serializable {
*/
static class ClientData implements Serializable {
// number of the bucket
- int bucketNumber;
+ private int bucketNumber;
// all of the client locks within this bucket
- List<ClientDataRecord> clientDataRecords;
+ private List<ClientDataRecord> clientDataRecords;
/**
* Constructor - initially, there are no 'clientDataRecords'.
@@ -2108,11 +2128,11 @@ public class TargetLock implements Lock, Serializable {
*/
static class ClientDataRecord implements Serializable {
// contains key, ownerKey, uuid
- Identity identity;
+ private Identity identity;
// state field of 'TargetLock'
// (may be 'null' if there is no 'TargetLock')
- State state;
+ private State state;
/**
* Constructor - initialize the fields.
@@ -2132,10 +2152,10 @@ public class TargetLock implements Lock, Serializable {
*/
static class ServerData implements Serializable {
// number of the bucket
- int bucketNumber;
+ private int bucketNumber;
// server-side data associated with a single bucket
- GlobalLocks globalLocks;
+ private GlobalLocks globalLocks;
/**
* Constructor - initialize the fields.
@@ -2158,15 +2178,15 @@ public class TargetLock implements Lock, Serializable {
*/
static class AuditData implements Serializable {
// sending UUID
- UUID hostUuid;
+ private UUID hostUuid;
// client records that currently exist, or records to be cleared
// (depending upon message) -- client/server is from the senders side
- List<Identity> clientData;
+ private List<Identity> clientData;
// server records that currently exist, or records to be cleared
// (depending upon message) -- client/server is from the senders side
- List<Identity> serverData;
+ private List<Identity> serverData;
/**
* Constructor - set 'hostUuid' to the current host, and start with
@@ -2174,8 +2194,8 @@ public class TargetLock implements Lock, Serializable {
*/
AuditData() {
hostUuid = Server.getThisServer().getUuid();
- clientData = new ArrayList<Identity>();
- serverData = new ArrayList<Identity>();
+ clientData = new ArrayList<>();
+ serverData = new ArrayList<>();
}
/**
@@ -2191,8 +2211,17 @@ public class TargetLock implements Lock, Serializable {
AuditData response = new AuditData();
// compare remote servers client data with our server data
+ generateResponse_clientEnd(response, includeWarnings);
+
+ // test server data
+ generateResponse_serverEnd(response, includeWarnings);
+
+ return response;
+ }
+
+ private void generateResponse_clientEnd(AuditData response, boolean includeWarnings) {
for (Identity identity : clientData) {
- // we are the server in this case
+ // remote end is the client, and we are the server
Bucket bucket = Bucket.getBucket(identity.key);
GlobalLocks globalLocks =
bucket.getAdjunctDontCreate(GlobalLocks.class);
@@ -2240,10 +2269,11 @@ public class TargetLock implements Lock, Serializable {
// it was 'clientData' to the sender, but 'serverData' to us
response.serverData.add(identity);
}
+ }
- // test server data
+ private void generateResponse_serverEnd(AuditData response, boolean includeWarnings) {
for (Identity identity : serverData) {
- // we are the client in this case
+ // remote end is the server, and we are the client
Bucket bucket = Bucket.getBucket(identity.ownerKey);
LocalLocks localLocks =
bucket.getAdjunctDontCreate(LocalLocks.class);
@@ -2275,8 +2305,6 @@ public class TargetLock implements Lock, Serializable {
}
response.clientData.add(identity);
}
-
- return response;
}
/**
@@ -2400,19 +2428,14 @@ public class TargetLock implements Lock, Serializable {
* Run a single audit cycle.
*/
static void runAudit() {
- try {
- logger.info("Starting TargetLock audit");
- Audit audit = new Audit();
+ logger.info("Starting TargetLock audit");
+ Audit audit = new Audit();
- // populate 'auditMap' table
- audit.build();
+ // populate 'auditMap' table
+ audit.build();
- // send to all of the servers in 'auditMap' (may include this server)
- audit.send();
- } catch (InterruptedException e) {
- logger.error("TargetLock audit interrupted", e);
- Thread.currentThread().interrupt();
- }
+ // send to all of the servers in 'auditMap' (may include this server)
+ audit.send();
}
/**
@@ -2441,63 +2464,59 @@ public class TargetLock implements Lock, Serializable {
// this needs to run in the 'MainLoop' thread, because it is dependent
// upon the list of servers, and our position in this list
- MainLoop.queueWork(new Runnable() {
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
- // current list of servers
- Collection<Server> servers = Server.getServers();
+ MainLoop.queueWork(() -> {
+ // this runs in the 'MainLoop' thread
- // count of the number of servers
- int count = servers.size();
+ // current list of servers
+ Collection<Server> servers = Server.getServers();
- // will contain our position in this list
- int index = 0;
+ // count of the number of servers
+ int count = servers.size();
- // current server
- Server thisServer = Server.getThisServer();
+ // will contain our position in this list
+ int index = 0;
- for (Server server : servers) {
- if (server == thisServer) {
- break;
- }
- index += 1;
+ // current server
+ Server thisServer = Server.getThisServer();
+
+ for (Server server : servers) {
+ if (server == thisServer) {
+ break;
}
+ index += 1;
+ }
- // if index == count, we didn't find this server
- // (which shouldn't happen)
-
- if (index < count) {
- // The servers are ordered by UUID, and 'index' is this
- // server's position within the list. Suppose the period is
- // 60000 (60 seconds), and there are 5 servers -- the first one
- // will run the audit at 0 seconds after the minute, the next
- // at 12 seconds after the minute, then 24, 36, 48.
- long offset = (period * index) / count;
-
- // the earliest time we want the audit to run
- long time = System.currentTimeMillis() + gracePeriod;
- long startTime = time - (time % period) + offset;
- if (startTime <= time) {
- startTime += period;
+ // if index == count, we didn't find this server
+ // (which shouldn't happen)
+
+ if (index < count) {
+ // The servers are ordered by UUID, and 'index' is this
+ // server's position within the list. Suppose the period is
+ // 60000 (60 seconds), and there are 5 servers -- the first one
+ // will run the audit at 0 seconds after the minute, the next
+ // at 12 seconds after the minute, then 24, 36, 48.
+ long offset = (period * index) / count;
+
+ // the earliest time we want the audit to run
+ long time = System.currentTimeMillis() + gracePeriod;
+ long startTime = time - (time % period) + offset;
+ if (startTime <= time) {
+ startTime += period;
+ }
+ synchronized (Audit.class) {
+ if (timerTask != null) {
+ timerTask.cancel();
}
- synchronized (Audit.class) {
- if (timerTask != null) {
- timerTask.cancel();
+ timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ runAudit();
}
- timerTask = new TimerTask() {
- @Override
- public void run() {
- runAudit();
- }
- };
+ };
- // now, schedule the timer
- Util.timer.scheduleAtFixedRate(
- timerTask, new Date(startTime), period);
- }
+ // now, schedule the timer
+ Util.timer.scheduleAtFixedRate(
+ timerTask, new Date(startTime), period);
}
}
});
@@ -2514,7 +2533,8 @@ public class TargetLock implements Lock, Serializable {
*/
static byte[] incomingAudit(UUID serverUuid, int ttl, byte[] encodedData) {
if (!Server.getThisServer().getUuid().equals(serverUuid)) {
- if ((ttl -= 1) > 0) {
+ ttl -= 1;
+ if (ttl > 0) {
Server server = Server.getServer(serverUuid);
if (server != null) {
WebTarget webTarget = server.getWebTarget("lock/audit");
@@ -2525,8 +2545,8 @@ public class TargetLock implements Lock, Serializable {
Entity.entity(new String(encodedData),
MediaType.APPLICATION_OCTET_STREAM_TYPE);
return webTarget
- .queryParam("server", serverUuid.toString())
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_SERVER, serverUuid.toString())
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().post(entity, byte[].class);
}
}
@@ -2556,53 +2576,63 @@ public class TargetLock implements Lock, Serializable {
Bucket bucket = Bucket.getBucket(i);
// client data
- LocalLocks localLocks =
- bucket.getAdjunctDontCreate(LocalLocks.class);
- if (localLocks != null) {
- synchronized (localLocks) {
- // we have client data for this bucket
- for (Identity identity :
- localLocks.weakReferenceToIdentity.values()) {
- // find or create the 'AuditData' instance associated
- // with the server owning the 'key'
- AuditData auditData = getAuditData(identity.key);
- if (auditData != null) {
- auditData.clientData.add(identity);
- }
+ build_clientData(bucket);
+
+ // server data
+ build_serverData(bucket);
+ }
+ }
+
+ private void build_clientData(Bucket bucket) {
+ // client data
+ LocalLocks localLocks =
+ bucket.getAdjunctDontCreate(LocalLocks.class);
+ if (localLocks != null) {
+ synchronized (localLocks) {
+ // we have client data for this bucket
+ for (Identity identity :
+ localLocks.weakReferenceToIdentity.values()) {
+ // find or create the 'AuditData' instance associated
+ // with the server owning the 'key'
+ AuditData auditData = getAuditData(identity.key);
+ if (auditData != null) {
+ auditData.clientData.add(identity);
}
}
}
+ }
+ }
- // server data
- GlobalLocks globalLocks =
- bucket.getAdjunctDontCreate(GlobalLocks.class);
- if (globalLocks != null) {
- // we have server data for this bucket
- Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry;
- synchronized (keyToEntry) {
- for (LockEntry le : keyToEntry.values()) {
+ private void build_serverData(Bucket bucket) {
+ // server data
+ GlobalLocks globalLocks =
+ bucket.getAdjunctDontCreate(GlobalLocks.class);
+ if (globalLocks != null) {
+ // we have server data for this bucket
+ Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry;
+ synchronized (keyToEntry) {
+ for (LockEntry le : keyToEntry.values()) {
+ // find or create the 'AuditData' instance associated
+ // with the current 'ownerKey'
+ AuditData auditData = getAuditData(le.currentOwnerKey);
+ if (auditData != null) {
+ // create an 'Identity' entry, and add it to
+ // the list associated with the remote server
+ auditData.serverData.add(
+ new Identity(le.key, le.currentOwnerKey,
+ le.currentOwnerUuid));
+ }
+
+ for (Waiting waiting : le.waitingList) {
// find or create the 'AuditData' instance associated
- // with the current 'ownerKey'
- AuditData auditData = getAuditData(le.currentOwnerKey);
+ // with the waiting entry 'ownerKey'
+ auditData = getAuditData(waiting.ownerKey);
if (auditData != null) {
// create an 'Identity' entry, and add it to
// the list associated with the remote server
auditData.serverData.add(
- new Identity(le.key, le.currentOwnerKey,
- le.currentOwnerUuid));
- }
-
- for (Waiting waiting : le.waitingList) {
- // find or create the 'AuditData' instance associated
- // with the waiting entry 'ownerKey'
- auditData = getAuditData(waiting.ownerKey);
- if (auditData != null) {
- // create an 'Identity' entry, and add it to
- // the list associated with the remote server
- auditData.serverData.add(
- new Identity(le.key, waiting.ownerKey,
- waiting.ownerUuid));
- }
+ new Identity(le.key, waiting.ownerKey,
+ waiting.ownerUuid));
}
}
}
@@ -2618,12 +2648,8 @@ public class TargetLock implements Lock, Serializable {
// map 'key -> bucket number', and then 'bucket number' -> 'server'
Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
if (server != null) {
- AuditData auditData = auditMap.get(server);
- if (auditData == null) {
- // doesn't exist yet -- create it
- auditData = new AuditData();
- auditMap.put(server, auditData);
- }
+ AuditData auditData =
+ auditMap.computeIfAbsent(server, sk -> new AuditData());
return auditData;
}
@@ -2635,7 +2661,7 @@ public class TargetLock implements Lock, Serializable {
* Using the collected 'auditMap', send out the messages to all of the
* servers.
*/
- void send() throws InterruptedException {
+ void send() {
if (auditMap.isEmpty()) {
logger.info("TargetLock audit: no locks on this server");
} else {
@@ -2644,178 +2670,176 @@ public class TargetLock implements Lock, Serializable {
}
for (final Server server : auditMap.keySet()) {
- // fetch audit data
- AuditData auditData = auditMap.get(server);
+ send_server(server);
+ }
+ }
- if (server == Server.getThisServer()) {
- // process this locally
- final AuditData respData = auditData.generateResponse(true);
- if (respData.clientData.isEmpty()
- && respData.serverData.isEmpty()) {
- // no mismatches
- logger.info("TargetLock.Audit.send: "
- + "no errors from self ({})", server);
- continue;
+ private void send_server(final Server server) {
+ // fetch audit data
+ AuditData auditData = auditMap.get(server);
+
+ if (server == Server.getThisServer()) {
+ // process this locally
+ final AuditData respData = auditData.generateResponse(true);
+ if (respData.clientData.isEmpty()
+ && respData.serverData.isEmpty()) {
+ // no mismatches
+ logger.info("TargetLock.Audit.send: "
+ + "no errors from self ({})", server);
+ return;
+ }
+
+ // do the rest in a separate thread
+ server.getThreadPool().execute(() -> {
+ // wait a few seconds, and see if we still know of these
+ // errors
+ if (AuditPostResponse.responseSupport(
+ respData, "self (" + server + ")",
+ "TargetLock.Audit.send")) {
+ // a return falue of 'true' either indicates the
+ // mismatches were resolved after a retry, or we
+ // received an interrupt, and need to abort
+ return;
}
- // do the rest in a separate thread
- server.getThreadPool().execute(new Runnable() {
- @Override
- public void run() {
- // wait a few seconds, and see if we still know of these
- // errors
- logger.info("TargetLock.Audit.send: "
- + "mismatches from self ({})", server);
- try {
- Thread.sleep(auditRetryDelay);
- } catch (InterruptedException e) {
- logger.error("TargetLock.Audit.send: Interrupted "
- + "handling audit response from self ({})",
- server);
- // just abort
- Thread.currentThread().interrupt();
- return;
- }
+ // any mismatches left in 'respData' are still issues
+ respData.processResponse(server);
+ });
+ return;
+ }
- // This will check against our own data -- any mismatches
- // mean that things have changed since we sent out the
- // first message. We will remove any mismatches from
- // 'respData', and see if there are any left.
- AuditData mismatches = respData.generateResponse(false);
-
- respData.serverData.removeAll(mismatches.clientData);
- respData.clientData.removeAll(mismatches.serverData);
-
- if (respData.clientData.isEmpty()
- && respData.serverData.isEmpty()) {
- // no mismatches --
- // there must have been transient issues on our side
- logger.info("TargetLock.Audit.send: "
- + "no mismatches from self "
- + "({}) after retry", server);
- return;
- }
+ // serialize
+ byte[] encodedData = auditData.encode();
+ if (encodedData == null) {
+ // error has already been displayed
+ return;
+ }
- // any mismatches left in 'respData' are still issues
- respData.processResponse(server);
- }
- });
- continue;
- }
+ // generate entity
+ Entity<String> entity =
+ Entity.entity(new String(encodedData),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
- // serialize
- byte[] encodedData = auditData.encode();
- if (encodedData == null) {
- // error has already been displayed
- continue;
- }
+ server.post("lock/audit", entity, new AuditPostResponse(server));
+ }
+ }
- // generate entity
- Entity<String> entity =
- Entity.entity(new String(encodedData),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
+ static class AuditPostResponse implements Server.PostResponse {
+ private Server server;
- server.post("lock/audit", entity, new Server.PostResponse() {
- @Override
- public WebTarget webTarget(WebTarget webTarget) {
- // include the 'uuid' keyword
- return webTarget
- .queryParam("server", server.getUuid().toString())
- .queryParam("ttl", timeToLive);
- }
+ AuditPostResponse(Server server) {
+ this.server = server;
+ }
- @Override
- public void response(Response response) {
- // process the response here
- AuditData respData =
- AuditData.decode(response.readEntity(byte[].class));
- if (respData == null) {
- logger.error("TargetLock.Audit.send: "
- + "couldn't process response from {}",
- server);
- return;
- }
+ @Override
+ public WebTarget webTarget(WebTarget webTarget) {
+ // include the 'uuid' keyword
+ return webTarget
+ .queryParam(QP_SERVER, server.getUuid().toString())
+ .queryParam(QP_TTL, timeToLive);
+ }
- // if we reach this point, we got a response
- if (respData.clientData.isEmpty()
- && respData.serverData.isEmpty()) {
- // no mismatches
- logger.info("TargetLock.Audit.send: "
- + "no errors from {}", server);
- return;
- }
+ @Override
+ public void response(Response response) {
+ // process the response here
+ AuditData respData =
+ AuditData.decode(response.readEntity(byte[].class));
+ if (respData == null) {
+ logger.error("TargetLock.Audit.send: "
+ + "couldn't process response from {}",
+ server);
+ return;
+ }
- // wait a few seconds, and see if we still know of these
- // errors
- logger.info("TargetLock.Audit.send: mismatches from {}",
- server);
- try {
- Thread.sleep(auditRetryDelay);
- } catch (InterruptedException e) {
- logger.error("TargetLock.Audit.send: Interrupted "
- + "handling audit response from {}",
- server);
- // just abort
- Thread.currentThread().interrupt();
- return;
- }
+ // if we reach this point, we got a response
+ if (respData.clientData.isEmpty()
+ && respData.serverData.isEmpty()) {
+ // no mismatches
+ logger.info("TargetLock.Audit.send: "
+ + "no errors from {}", server);
+ return;
+ }
- // This will check against our own data -- any mismatches
- // mean that things have changed since we sent out the
- // first message. We will remove any mismatches from
- // 'respData', and see if there are any left.
- AuditData mismatches = respData.generateResponse(false);
-
- respData.serverData.removeAll(mismatches.clientData);
- respData.clientData.removeAll(mismatches.serverData);
-
- if (respData.clientData.isEmpty()
- && respData.serverData.isEmpty()) {
- // no mismatches --
- // there must have been transient issues on our side
- logger.info("TargetLock.Audit.send: no mismatches from "
- + "{} after retry", server);
- return;
- }
+ // wait a few seconds, and see if we still know of these
+ // errors
+ if (responseSupport(respData, server, "AuditPostResponse.response")) {
+ // a return falue of 'true' either indicates the mismatches
+ // were resolved after a retry, or we received an interrupt,
+ // and need to abort
+ return;
+ }
- // any mismatches left in 'respData' are still there --
- // hopefully, they are transient issues on the other side
- AuditData auditData = new AuditData();
- auditData.clientData = respData.serverData;
- auditData.serverData = respData.clientData;
-
- // serialize
- byte[] encodedData = auditData.encode();
- if (encodedData == null) {
- // error has already been displayed
- return;
- }
+ // any mismatches left in 'respData' are still there --
+ // hopefully, they are transient issues on the other side
+ AuditData auditData = new AuditData();
+ auditData.clientData = respData.serverData;
+ auditData.serverData = respData.clientData;
- // generate entity
- Entity<String> entity =
- Entity.entity(new String(encodedData),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
-
- // send new list to other end
- response = server
- .getWebTarget("lock/audit")
- .queryParam("server", server.getUuid().toString())
- .queryParam("ttl", timeToLive)
- .request().post(entity);
-
- respData = AuditData.decode(response.readEntity(byte[].class));
- if (respData == null) {
- logger.error("TargetLock.auditDataBuilder.send: "
- + "couldn't process response from {}",
- server);
- return;
- }
+ // serialize
+ byte[] encodedData = auditData.encode();
+ if (encodedData == null) {
+ // error has already been displayed
+ return;
+ }
- // if there are mismatches left, they are presumably real
- respData.processResponse(server);
- }
- });
+ // generate entity
+ Entity<String> entity =
+ Entity.entity(new String(encodedData),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
+
+ // send new list to other end
+ response = server
+ .getWebTarget("lock/audit")
+ .queryParam(QP_SERVER, server.getUuid().toString())
+ .queryParam(QP_TTL, timeToLive)
+ .request().post(entity);
+
+ respData = AuditData.decode(response.readEntity(byte[].class));
+ if (respData == null) {
+ logger.error("TargetLock.auditDataBuilder.send: "
+ + "couldn't process response from {}",
+ server);
+ return;
+ }
+
+ // if there are mismatches left, they are presumably real
+ respData.processResponse(server);
+ }
+
+ // Handle mismatches indicated by an audit response -- a return value of
+ // 'true' indicates that there were no mismatches after a retry, or
+ // we received an interrupt. In either case, the caller returns.
+ private static boolean responseSupport(AuditData respData, Object serverString, String caller) {
+ logger.info("{}: mismatches from {}", caller, serverString);
+ try {
+ Thread.sleep(auditRetryDelay);
+ } catch (InterruptedException e) {
+ logger.error("{}: Interrupted handling audit response from {}",
+ caller, serverString);
+ // just abort
+ Thread.currentThread().interrupt();
+ return true;
+ }
+
+ // This will check against our own data -- any mismatches
+ // mean that things have changed since we sent out the
+ // first message. We will remove any mismatches from
+ // 'respData', and see if there are any left.
+ AuditData mismatches = respData.generateResponse(false);
+
+ respData.serverData.removeAll(mismatches.clientData);
+ respData.clientData.removeAll(mismatches.serverData);
+
+ if (respData.clientData.isEmpty()
+ && respData.serverData.isEmpty()) {
+ // no mismatches --
+ // there must have been transient issues on our side
+ logger.info("{}: no mismatches from {} after retry",
+ caller, serverString);
+ return true;
}
+
+ return false;
}
}
}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java
index 2ad0a401..66a9eac3 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java
@@ -43,6 +43,13 @@ public class Util {
public static final Timer timer = new Timer("Server Pool Timer", true);
/**
+ * Hide implicit public constructor.
+ */
+ private Util() {
+ // everything here is static -- no instances of this class are created
+ }
+
+ /**
* Internally, UUID objects use two 'long' variables, and the default
* comparison is signed, which means the order for the first and 16th digit
* is: '89abcdef01234567', while the order for the rest is
@@ -50,21 +57,18 @@ public class Util {
* The following comparator uses the ordering '0123456789abcdef' for all
* digits.
*/
- public static final Comparator<UUID> uuidComparator =
- new Comparator<UUID>() {
- public int compare(UUID u1, UUID u2) {
- // compare most significant portion
- int rval = Long.compareUnsigned(u1.getMostSignificantBits(),
- u2.getMostSignificantBits());
- if (rval == 0) {
- // most significant portion matches --
- // compare least significant portion
- rval = Long.compareUnsigned(u1.getLeastSignificantBits(),
- u2.getLeastSignificantBits());
- }
- return rval;
- }
- };
+ public static final Comparator<UUID> uuidComparator = (UUID u1, UUID u2) -> {
+ // compare most significant portion
+ int rval = Long.compareUnsigned(u1.getMostSignificantBits(),
+ u2.getMostSignificantBits());
+ if (rval == 0) {
+ // most significant portion matches --
+ // compare least significant portion
+ rval = Long.compareUnsigned(u1.getLeastSignificantBits(),
+ u2.getLeastSignificantBits());
+ }
+ return rval;
+ };
/* ============================================================ */
@@ -104,7 +108,6 @@ public class Util {
try {
return IOUtils.toString(input, StandardCharsets.UTF_8);
} catch (IOException e) {
- // TODO Auto-generated catch block
logger.error("Util.inputStreamToString error", e);
return "";
}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
index 295194d2..60e740c5 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
@@ -20,7 +20,6 @@
package org.onap.policy.drools.serverpool.persistence;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
@@ -73,6 +72,12 @@ import org.slf4j.LoggerFactory;
public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
private static Logger logger = LoggerFactory.getLogger(Persistence.class);
+ // HTTP query parameters
+ private static final String QP_BUCKET = "bucket";
+ private static final String QP_SESSION = "session";
+ private static final String QP_COUNT = "count";
+ private static final String QP_DEST = "dest";
+
/***************************************/
/* 'PolicySessionFeatureApi' interface */
/***************************************/
@@ -209,7 +214,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
* @param bucket the bucket containing the 'GlobalLocks' adjunct
* @param globalLocks the 'GlobalLocks' adjunct
*/
- private static void sendLockDataToBackups(Bucket bucket, GlobalLocks globalLocks) {
+ private static void sendLockDataToBackups(final Bucket bucket, final GlobalLocks globalLocks) {
final int bucketNumber = bucket.getIndex();
SenderBucketData sbd = bucket.getAdjunct(SenderBucketData.class);
int lockCount = 0;
@@ -245,18 +250,15 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
for (final Server server : servers) {
if (server != null) {
// send out REST command
- server.getThreadPool().execute(new Runnable() {
- @Override
- public void run() {
- WebTarget webTarget =
- server.getWebTarget("persistence/lock");
- if (webTarget != null) {
- webTarget
- .queryParam("bucket", bucketNumber)
- .queryParam("count", count)
- .queryParam("dest", server.getUuid())
- .request().post(entity);
- }
+ server.getThreadPool().execute(() -> {
+ WebTarget webTarget =
+ server.getWebTarget("persistence/lock");
+ if (webTarget != null) {
+ webTarget
+ .queryParam(QP_BUCKET, bucketNumber)
+ .queryParam(QP_COUNT, count)
+ .queryParam(QP_DEST, server.getUuid())
+ .request().post(entity);
}
});
}
@@ -339,21 +341,18 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
for (final Server server : servers) {
if (server != null) {
// send out REST command
- server.getThreadPool().execute(new Runnable() {
- @Override
- public void run() {
- WebTarget webTarget =
- server.getWebTarget("persistence/session");
- if (webTarget != null) {
- webTarget
- .queryParam("bucket",
- bucket.getIndex())
- .queryParam("session",
- encodedSessionName)
- .queryParam("count", count)
- .queryParam("dest", server.getUuid())
- .request().post(entity);
- }
+ server.getThreadPool().execute(() -> {
+ WebTarget webTarget =
+ server.getWebTarget("persistence/session");
+ if (webTarget != null) {
+ webTarget
+ .queryParam(QP_BUCKET,
+ bucket.getIndex())
+ .queryParam(QP_SESSION,
+ encodedSessionName)
+ .queryParam(QP_COUNT, count)
+ .queryParam(QP_DEST, server.getUuid())
+ .request().post(entity);
}
});
}
@@ -552,14 +551,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
* @return the associated 'SenderSessionBucketData' instance
*/
synchronized SenderSessionBucketData getSessionData(PolicySession session) {
- // try to fetch the associated instance
- SenderSessionBucketData rval = sessionData.get(session);
- if (rval == null) {
- // it doesn't exist, so create one
- rval = new SenderSessionBucketData();
- sessionData.put(session, rval);
- }
- return rval;
+ return sessionData.computeIfAbsent(session, key -> new SenderSessionBucketData());
}
/**
@@ -596,6 +588,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
* bucket owner fails.
*/
public static class ReceiverBucketData {
+ static final String RESTORE_BUCKET_ERROR =
+ "Persistence.ReceiverBucketData.restoreBucket: ";
+
// maps session name into encoded data
Map<String, ReceiverSessionBucketData> sessionData = new HashMap<>();
@@ -672,8 +667,31 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
// one entry for each Drools session being restored --
// indicates when the restore is complete (restore runs within
// the Drools session thread)
+ List<CountDownLatch> sessionLatches = restoreBucket_droolsSessions();
+
+ // restore lock data
+ restoreBucket_locks(bucket);
+
+ // wait for all of the sessions to update
+ try {
+ for (CountDownLatch sessionLatch : sessionLatches) {
+ if (!sessionLatch.await(10000L, TimeUnit.MILLISECONDS)) {
+ logger.error("{}: timed out waiting for session latch",
+ this);
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.error("Exception in {}", this, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private List<CountDownLatch> restoreBucket_droolsSessions() {
List<CountDownLatch> sessionLatches = new LinkedList<>();
- for (String sessionName : sessionData.keySet()) {
+ for (Map.Entry<String, ReceiverSessionBucketData> entry : sessionData.entrySet()) {
+ String sessionName = entry.getKey();
+ ReceiverSessionBucketData rsbd = entry.getValue();
+
// [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>"
String[] nameSegments = sessionName.split(":");
PolicySession policySession = null;
@@ -693,7 +711,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
}
if (policySession == null) {
- logger.error("Persistence.ReceiverBucketData.restoreBucket: "
+ logger.error(RESTORE_BUCKET_ERROR
+ "Can't find PolicySession{}", sessionName);
continue;
}
@@ -701,11 +719,10 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
Object obj = null;
try {
// deserialization needs to use the correct 'ClassLoader'
- ReceiverSessionBucketData rsbd = sessionData.get(sessionName);
obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData),
policySession.getPolicyContainer().getClassLoader());
} catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
- logger.error("Persistence.ReceiverBucketData.restoreBucket: "
+ logger.error(RESTORE_BUCKET_ERROR
+ "Failed to read data for session '{}'",
sessionName, e);
@@ -714,7 +731,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
}
if (!(obj instanceof Map)) {
- logger.error("Persistence.ReceiverBucketData.restoreBucket: "
+ logger.error(RESTORE_BUCKET_ERROR
+ "Session '{}' data has class {}, expected 'Map'",
sessionName, obj.getClass().getName());
@@ -733,29 +750,26 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
final KieSession kieSession = policySession.getKieSession();
// run the following within the Drools session thread
- kieSession.insert(new DroolsRunnable() {
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
- try {
- // insert all of the Drools objects into the session
- for (Object obj : droolsObjects.keySet()) {
- kieSession.insert(obj);
- }
- } finally {
- // signal completion
- sessionLatch.countDown();
+ DroolsRunnable insertDroolsObjects = () -> {
+ try {
+ // insert all of the Drools objects into the session
+ for (Object droolsObj : droolsObjects.keySet()) {
+ kieSession.insert(droolsObj);
}
+ } finally {
+ // signal completion
+ sessionLatch.countDown();
}
- });
+ };
+ kieSession.insert(insertDroolsObjects);
// add this to the set of 'CountDownLatch's we are waiting for
sessionLatches.add(sessionLatch);
}
+ return sessionLatches;
+ }
- // restore lock data
+ private void restoreBucket_locks(Bucket bucket) {
if (lockData != null) {
Object obj = null;
try {
@@ -767,30 +781,17 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
// send out updated date
sendLockDataToBackups(bucket, (GlobalLocks)obj);
} else {
- logger.error("Persistence.ReceiverBucketData.restoreBucket: "
+ logger.error(RESTORE_BUCKET_ERROR
+ "Expected 'GlobalLocks', got '{}'",
obj.getClass().getName());
}
} catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
- logger.error("Persistence.ReceiverBucketData.restoreBucket: "
+ logger.error(RESTORE_BUCKET_ERROR
+ "Failed to read lock data", e);
// skip the lock data
}
}
-
- // wait for all of the sessions to update
- try {
- for (CountDownLatch sessionLatch : sessionLatches) {
- if (!sessionLatch.await(10000L, TimeUnit.MILLISECONDS)) {
- logger.error("{}: timed out waiting for session latch",
- this);
- }
- }
- } catch (InterruptedException e) {
- logger.error("Exception in {}", this, e);
- Thread.currentThread().interrupt();
- }
}
}
@@ -804,10 +805,10 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
@POST
@Path("/persistence/session")
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
- public void receiveSession(@QueryParam("bucket") int bucket,
- @QueryParam("session") String sessionName,
- @QueryParam("count") int count,
- @QueryParam("dest") UUID dest,
+ public void receiveSession(@QueryParam(QP_BUCKET) int bucket,
+ @QueryParam(QP_SESSION) String sessionName,
+ @QueryParam(QP_COUNT) int count,
+ @QueryParam(QP_DEST) UUID dest,
byte[] data) {
logger.debug("/persistence/session: (bucket={},session={},count={}) "
+ "got {} bytes of data",
@@ -829,9 +830,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
Entity.entity(new String(data),
MediaType.APPLICATION_OCTET_STREAM_TYPE);
webTarget
- .queryParam("bucket", bucket)
- .queryParam("session", sessionName)
- .queryParam("count", count)
+ .queryParam(QP_BUCKET, bucket)
+ .queryParam(QP_SESSION, sessionName)
+ .queryParam(QP_COUNT, count)
.request().post(entity);
}
}
@@ -843,9 +844,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
@POST
@Path("/persistence/lock")
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
- public void receiveLockData(@QueryParam("bucket") int bucket,
- @QueryParam("count") int count,
- @QueryParam("dest") UUID dest,
+ public void receiveLockData(@QueryParam(QP_BUCKET) int bucket,
+ @QueryParam(QP_COUNT) int count,
+ @QueryParam(QP_DEST) UUID dest,
byte[] data) {
logger.debug("/persistence/lock: (bucket={},count={}) "
+ "got {} bytes of data", bucket, count, data.length);
@@ -865,8 +866,8 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
Entity.entity(new String(data),
MediaType.APPLICATION_OCTET_STREAM_TYPE);
webTarget
- .queryParam("bucket", bucket)
- .queryParam("count", count)
+ .queryParam(QP_BUCKET, bucket)
+ .queryParam(QP_COUNT, count)
.request().post(entity);
}
}