summaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java')
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java958
1 files changed, 491 insertions, 467 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
index 7e4b795f..65804082 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
@@ -101,8 +101,8 @@ public class TargetLock implements Lock, Serializable {
private static ReferenceQueue<TargetLock> abandoned = new ReferenceQueue<>();
// some status codes
- static final int ACCEPTED = 202; //Response.Status.ACCEPTED.getStatusCode();
- static final int NO_CONTENT = 204; //Response.Status.NO_CONTENT.getStatusCode();
+ static final int ACCEPTED = 202; //Response.Status.ACCEPTED.getStatusCode()
+ static final int NO_CONTENT = 204; //Response.Status.NO_CONTENT.getStatusCode()
static final int LOCKED = 423;
// Values extracted from properties
@@ -131,13 +131,21 @@ public class TargetLock implements Lock, Serializable {
// this is used to notify the application when a lock is available,
// or if it is not available
- private LockCallback owner;
+ private volatile LockCallback owner;
// This is what is actually called by the infrastructure to do the owner
// notification. The owner may be running in a Drools session, in which case
// the actual notification should be done within that thread -- the 'context'
// object ensures that it happens this way.
- private LockCallback context;
+ private volatile LockCallback context;
+
+ // HTTP query parameters
+ private static final String QP_KEY = "key";
+ private static final String QP_OWNER = "owner";
+ private static final String QP_UUID = "uuid";
+ private static final String QP_WAIT = "wait";
+ private static final String QP_SERVER = "server";
+ private static final String QP_TTL = "ttl";
/**
* This method triggers registration of 'eventHandler', and also extracts
@@ -221,7 +229,7 @@ public class TargetLock implements Lock, Serializable {
if (session != null) {
// deliver through a 'PolicySessionContext' class
Object lcontext = session.getAdjunct(PolicySessionContext.class);
- if (lcontext == null || !(lcontext instanceof LockCallback)) {
+ if (!(lcontext instanceof LockCallback)) {
context = new PolicySessionContext(session);
session.setAdjunct(PolicySessionContext.class, context);
} else {
@@ -301,11 +309,11 @@ public class TargetLock implements Lock, Serializable {
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("key", key)
- .queryParam("owner", ownerKey)
- .queryParam("uuid", identity.uuid.toString())
- .queryParam("wait", waitForLock)
- .queryParam("ttl", timeToLive);
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, ownerKey)
+ .queryParam(QP_UUID, identity.uuid.toString())
+ .queryParam(QP_WAIT, waitForLock)
+ .queryParam(QP_TTL, timeToLive);
}
/**
@@ -323,14 +331,13 @@ public class TargetLock implements Lock, Serializable {
* 423 (Locked) - lock in use, and 'waitForLock' is 'false'
*/
switch (response.getStatus()) {
- case NO_CONTENT: {
+ case NO_CONTENT:
// lock successful
setState(State.ACTIVE);
context.lockAvailable(TargetLock.this);
break;
- }
- case LOCKED: {
+ case LOCKED:
// failed -- lock in use, and 'waitForLock == false'
setState(State.FREE);
synchronized (localLocks) {
@@ -340,13 +347,12 @@ public class TargetLock implements Lock, Serializable {
wr.clear();
context.lockUnavailable(TargetLock.this);
break;
- }
case ACCEPTED:
break;
default:
- logger.error("Unknown status: ", response.getStatus());
+ logger.error("Unknown status: {}", response.getStatus());
break;
}
}
@@ -434,6 +440,7 @@ public class TargetLock implements Lock, Serializable {
*/
@Override
public void extend(int holdSec, LockCallback callback) {
+ // not implemented yet
}
/********************/
@@ -473,7 +480,8 @@ public class TargetLock implements Lock, Serializable {
if (!Bucket.isKeyOnThisServer(key)) {
// this is the wrong server -- forward to the correct one
// (we can use this thread)
- if ((ttl -= 1) > 0) {
+ ttl -= 1;
+ if (ttl > 0) {
Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
if (server != null) {
WebTarget webTarget = server.getWebTarget("lock/lock");
@@ -483,11 +491,11 @@ public class TargetLock implements Lock, Serializable {
server.getUuid(), key, ownerKey, uuid,
waitForLock, ttl);
return webTarget
- .queryParam("key", key)
- .queryParam("owner", ownerKey)
- .queryParam("uuid", uuid.toString())
- .queryParam("wait", waitForLock)
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, ownerKey)
+ .queryParam(QP_UUID, uuid.toString())
+ .queryParam(QP_WAIT, waitForLock)
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().get();
}
}
@@ -527,7 +535,8 @@ public class TargetLock implements Lock, Serializable {
if (!Bucket.isKeyOnThisServer(key)) {
// this is the wrong server -- forward to the correct one
// (we can use this thread)
- if ((ttl -= 1) > 0) {
+ ttl -= 1;
+ if (ttl > 0) {
Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
if (server != null) {
WebTarget webTarget = server.getWebTarget("lock/free");
@@ -536,10 +545,10 @@ public class TargetLock implements Lock, Serializable {
+ "(key={},owner={},uuid={},ttl={})",
server.getUuid(), key, ownerKey, uuid, ttl);
return webTarget
- .queryParam("key", key)
- .queryParam("owner", ownerKey)
- .queryParam("uuid", uuid.toString())
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, ownerKey)
+ .queryParam(QP_UUID, uuid.toString())
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().get();
}
}
@@ -575,7 +584,8 @@ public class TargetLock implements Lock, Serializable {
if (!Bucket.isKeyOnThisServer(ownerKey)) {
// this is the wrong server -- forward to the correct one
// (we can use this thread)
- if ((ttl -= 1) > 0) {
+ ttl -= 1;
+ if (ttl > 0) {
Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
if (server != null) {
WebTarget webTarget = server.getWebTarget("lock/locked");
@@ -584,10 +594,10 @@ public class TargetLock implements Lock, Serializable {
+ "(key={},owner={},uuid={},ttl={})",
server.getUuid(), key, ownerKey, uuid, ttl);
return webTarget
- .queryParam("key", key)
- .queryParam("owner", ownerKey)
- .queryParam("uuid", uuid.toString())
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, ownerKey)
+ .queryParam(QP_UUID, uuid.toString())
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().get();
}
}
@@ -744,6 +754,7 @@ public class TargetLock implements Lock, Serializable {
*/
@Override
public void shutdown() {
+ // nothing needs to be done
}
/**
@@ -887,10 +898,10 @@ public class TargetLock implements Lock, Serializable {
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("key", key)
- .queryParam("owner", ownerKey)
- .queryParam("uuid", uuid.toString())
- .queryParam("ttl", timeToLive);
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, ownerKey)
+ .queryParam(QP_UUID, uuid.toString())
+ .queryParam(QP_TTL, timeToLive);
}
@Override
@@ -898,21 +909,19 @@ public class TargetLock implements Lock, Serializable {
logger.info("Free response={} (code={})",
response, response.getStatus());
switch (response.getStatus()) {
- case NO_CONTENT: {
+ case NO_CONTENT:
// free successful -- don't need to do anything
break;
- }
- case LOCKED: {
+ case LOCKED:
// free failed
logger.error("TargetLock free failed, "
+ "key={}, owner={}, uuid={}",
key, ownerKey, uuid);
break;
- }
default:
- logger.error("Unknown status: ", response.getStatus());
+ logger.error("Unknown status: {}", response.getStatus());
break;
}
}
@@ -986,12 +995,10 @@ public class TargetLock implements Lock, Serializable {
public void lockAvailable(final Lock lock) {
// Run 'owner.lockAvailable' within the Drools session
if (policySession != null) {
- policySession.getKieSession().insert(new DroolsRunnable() {
- @Override
- public void run() {
- ((TargetLock)lock).owner.lockAvailable(lock);
- }
- });
+ DroolsRunnable callback = () -> {
+ ((TargetLock)lock).owner.lockAvailable(lock);
+ };
+ policySession.getKieSession().insert(callback);
}
}
@@ -1002,12 +1009,10 @@ public class TargetLock implements Lock, Serializable {
public void lockUnavailable(Lock lock) {
// Run 'owner.unlockAvailable' within the Drools session
if (policySession != null) {
- policySession.getKieSession().insert(new DroolsRunnable() {
- @Override
- public void run() {
- ((TargetLock)lock).owner.lockUnavailable(lock);
- }
- });
+ DroolsRunnable callback = () -> {
+ ((TargetLock)lock).owner.lockUnavailable(lock);
+ };
+ policySession.getKieSession().insert(callback);
}
}
@@ -1218,16 +1223,16 @@ public class TargetLock implements Lock, Serializable {
*/
private static class LockEntry implements Serializable {
// string key identifying the lock
- String key;
+ private String key;
// string key identifying the owner
- String currentOwnerKey;
+ private String currentOwnerKey;
// UUID identifying the original 'TargetLock
- UUID currentOwnerUuid;
+ private UUID currentOwnerUuid;
// list of pending lock requests for this key
- Queue<Waiting> waitingList = new LinkedList<>();
+ private Queue<Waiting> waitingList = new LinkedList<>();
/**
* Constructor - initialize the 'LockEntry'.
@@ -1273,27 +1278,19 @@ public class TargetLock implements Lock, Serializable {
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("key", key)
- .queryParam("owner", currentOwnerKey)
- .queryParam("uuid", currentOwnerUuid.toString())
- .queryParam("ttl", timeToLive);
+ .queryParam(QP_KEY, key)
+ .queryParam(QP_OWNER, currentOwnerKey)
+ .queryParam(QP_UUID, currentOwnerUuid.toString())
+ .queryParam(QP_TTL, timeToLive);
}
@Override
public void response(Response response) {
logger.info("Locked response={} (code={})",
response, response.getStatus());
- switch (response.getStatus()) {
- case NO_CONTENT: {
- // successful -- we are done
- break;
- }
-
- default: {
- // notification failed -- free this one
- globalLocks.unlock(key, currentOwnerUuid);
- break;
- }
+ if (response.getStatus() != NO_CONTENT) {
+ // notification failed -- free this one
+ globalLocks.unlock(key, currentOwnerUuid);
}
}
});
@@ -1409,7 +1406,6 @@ public class TargetLock implements Lock, Serializable {
while (abandonedHandler != null) {
try {
Reference<? extends TargetLock> wr = abandoned.remove();
- TargetLock notify = null;
// At this point, we know that 'ref' is a
// 'WeakReference<TargetLock>' instance that has been abandoned,
@@ -1515,7 +1511,8 @@ public class TargetLock implements Lock, Serializable {
*/
static byte[] dumpLocksData(UUID serverUuid, int ttl) throws IOException {
if (!Server.getThisServer().getUuid().equals(serverUuid)) {
- if ((ttl -= 1) > 0) {
+ ttl -= 1;
+ if (ttl > 0) {
Server server = Server.getServer(serverUuid);
if (server != null) {
WebTarget webTarget =
@@ -1524,8 +1521,8 @@ public class TargetLock implements Lock, Serializable {
logger.info("Forwarding 'lock/dumpLocksData' to uuid {}",
serverUuid);
return webTarget
- .queryParam("server", serverUuid.toString())
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_SERVER, serverUuid.toString())
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().get(byte[].class);
}
}
@@ -1571,8 +1568,8 @@ public class TargetLock implements Lock, Serializable {
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("server", server.getUuid().toString())
- .queryParam("ttl", timeToLive);
+ .queryParam(QP_SERVER, server.getUuid().toString())
+ .queryParam(QP_TTL, timeToLive);
}
@Override
@@ -1621,128 +1618,144 @@ public class TargetLock implements Lock, Serializable {
// process the client-end data
for (ClientData clientData : hostData.clientDataList) {
- // 'true' if the bucket associated with this 'ClientData'
- // doesn't belong to the remote server, as far as we can tell
- boolean serverMismatch =
- Bucket.bucketToServer(clientData.bucketNumber) != server;
-
- // each 'ClientDataRecord' instance corresponds to an
- // active 'Identity' (TargetLock) instance
- for (ClientDataRecord cdr : clientData.clientDataRecords) {
- // update maximum 'key' and 'ownerKey' lengths
- updateKeyLength(cdr.identity.key);
- updateOwnerKeyLength(cdr.identity.ownerKey);
-
- // fetch UUID
- UUID uuid = cdr.identity.uuid;
-
- // fetch/generate 'MergeData' instance for this UUID
- MergedData md = mergedDataMap.get(uuid);
- if (md == null) {
- md = new MergedData(uuid);
- mergedDataMap.put(uuid, md);
- }
-
- // update 'MergedData.clientDataRecord'
- if (md.clientDataRecord == null) {
- md.clientDataRecord = cdr;
- } else {
- md.comment("Duplicate client entry for UUID");
- }
-
- if (serverMismatch) {
- // need to generate an additional error
- md.comment(server.toString()
- + "(client) does not own bucket "
- + clientData.bucketNumber);
- }
- }
+ populateLockData_clientData(clientData, server);
}
// process the server-end data
for (ServerData serverData : hostData.serverDataList) {
- // 'true' if the bucket associated with this 'ServerData'
- // doesn't belong to the remote server, as far as we can tell
- boolean serverMismatch =
- Bucket.bucketToServer(serverData.bucketNumber) != server;
-
- // each 'LockEntry' instance corresponds to the current holder
- // of a lock, and all requestors waiting for it to be freed
- for (LockEntry le : serverData.globalLocks.keyToEntry.values()) {
- // update maximum 'key' and 'ownerKey' lengths
- updateKeyLength(le.key);
- updateOwnerKeyLength(le.currentOwnerKey);
-
- // fetch uuid
- UUID uuid = le.currentOwnerUuid;
-
- // fetch/generate 'MergeData' instance for this UUID
- MergedData md = mergedDataMap.get(uuid);
- if (md == null) {
- md = new MergedData(uuid);
- mergedDataMap.put(uuid, md);
- }
+ populateLockData_serverData(serverData, server);
+ }
+ } else {
+ logger.error("TargetLock.DumpLocks.populateLockData: "
+ + "received data has class {}",
+ decodedData.getClass().getName());
+ }
+ }
- // update 'lockEntries' table entry
- if (lockEntries.get(le.key) != null) {
- md.comment("Duplicate server entry for key " + le.key);
- } else {
- lockEntries.put(le.key, le);
- }
+ private void populateLockData_clientData(ClientData clientData, Server server) {
+ // 'true' if the bucket associated with this 'ClientData'
+ // doesn't belong to the remote server, as far as we can tell
+ boolean serverMismatch =
+ Bucket.bucketToServer(clientData.bucketNumber) != server;
- // update 'MergedData.serverLockEntry'
- // (leave 'MergedData.serverWaiting' as 'null', because
- // this field is only used for waiting entries)
- if (md.serverLockEntry == null) {
- md.serverLockEntry = le;
- } else {
- md.comment("Duplicate server entry for UUID");
- }
+ // each 'ClientDataRecord' instance corresponds to an
+ // active 'Identity' (TargetLock) instance
+ for (ClientDataRecord cdr : clientData.clientDataRecords) {
+ // update maximum 'key' and 'ownerKey' lengths
+ updateKeyLength(cdr.identity.key);
+ updateOwnerKeyLength(cdr.identity.ownerKey);
- if (serverMismatch) {
- // need to generate an additional error
- md.comment(server.toString()
- + "(server) does not own bucket "
- + serverData.bucketNumber);
- }
+ // fetch UUID
+ UUID uuid = cdr.identity.uuid;
- // we need 'MergeData' entries for all waiting requests
- for (Waiting waiting : le.waitingList) {
- // update maximum 'ownerKey' length
- updateOwnerKeyLength(waiting.ownerKey);
+ // fetch/generate 'MergeData' instance for this UUID
+ MergedData md = mergedDataMap.get(uuid);
+ if (md == null) {
+ md = new MergedData(uuid);
+ mergedDataMap.put(uuid, md);
+ }
- // fetch uuid
- uuid = waiting.ownerUuid;
+ // update 'MergedData.clientDataRecord'
+ if (md.clientDataRecord == null) {
+ md.clientDataRecord = cdr;
+ } else {
+ md.comment("Duplicate client entry for UUID");
+ }
- // fetch/generate 'MergeData' instance for this UUID
- md = mergedDataMap.get(uuid);
- if (md == null) {
- md = new MergedData(uuid);
- mergedDataMap.put(uuid, md);
- }
+ if (serverMismatch) {
+ // need to generate an additional error
+ md.comment(server.toString()
+ + "(client) does not own bucket "
+ + clientData.bucketNumber);
+ }
+ }
+ }
- // update 'MergedData.serverLockEntry' and
- // 'MergedData.serverWaiting'
- if (md.serverLockEntry == null) {
- md.serverLockEntry = le;
- md.serverWaiting = waiting;
- } else {
- md.comment("Duplicate server entry for UUID");
- }
+ private void populateLockData_serverData(ServerData serverData, Server server) {
+ // 'true' if the bucket associated with this 'ServerData'
+ // doesn't belong to the remote server, as far as we can tell
+ boolean serverMismatch =
+ Bucket.bucketToServer(serverData.bucketNumber) != server;
- if (serverMismatch) {
- // need to generate an additional error
- md.comment(server.toString()
- + "(server) does not own bucket "
- + serverData.bucketNumber);
- }
- }
- }
+ // each 'LockEntry' instance corresponds to the current holder
+ // of a lock, and all requestors waiting for it to be freed
+ for (LockEntry le : serverData.globalLocks.keyToEntry.values()) {
+ // update maximum 'key' and 'ownerKey' lengths
+ updateKeyLength(le.key);
+ updateOwnerKeyLength(le.currentOwnerKey);
+
+ // fetch uuid
+ UUID uuid = le.currentOwnerUuid;
+
+ // fetch/generate 'MergeData' instance for this UUID
+ MergedData md = mergedDataMap.get(uuid);
+ if (md == null) {
+ md = new MergedData(uuid);
+ mergedDataMap.put(uuid, md);
+ }
+
+ // update 'lockEntries' table entry
+ if (lockEntries.get(le.key) != null) {
+ md.comment("Duplicate server entry for key " + le.key);
+ } else {
+ lockEntries.put(le.key, le);
+ }
+
+ // update 'MergedData.serverLockEntry'
+ // (leave 'MergedData.serverWaiting' as 'null', because
+ // this field is only used for waiting entries)
+ if (md.serverLockEntry == null) {
+ md.serverLockEntry = le;
+ } else {
+ md.comment("Duplicate server entry for UUID");
+ }
+
+ if (serverMismatch) {
+ // need to generate an additional error
+ md.comment(server.toString()
+ + "(server) does not own bucket "
+ + serverData.bucketNumber);
+ }
+
+ // we need 'MergeData' entries for all waiting requests
+ for (Waiting waiting : le.waitingList) {
+ populateLockData_serverData_waiting(
+ serverData, server, serverMismatch, le, waiting);
}
+ }
+ }
+
+ private void populateLockData_serverData_waiting(
+ ServerData serverData, Server server, boolean serverMismatch,
+ LockEntry le, Waiting waiting) {
+
+ // update maximum 'ownerKey' length
+ updateOwnerKeyLength(waiting.ownerKey);
+
+ // fetch uuid
+ UUID uuid = waiting.ownerUuid;
+
+ // fetch/generate 'MergeData' instance for this UUID
+ MergedData md = mergedDataMap.get(uuid);
+ if (md == null) {
+ md = new MergedData(uuid);
+ mergedDataMap.put(uuid, md);
+ }
+
+ // update 'MergedData.serverLockEntry' and
+ // 'MergedData.serverWaiting'
+ if (md.serverLockEntry == null) {
+ md.serverLockEntry = le;
+ md.serverWaiting = waiting;
} else {
- logger.error("TargetLock.DumpLocks.populateLockData: "
- + "received data has class "
- + decodedData.getClass().getName());
+ md.comment("Duplicate server entry for UUID");
+ }
+
+ if (serverMismatch) {
+ // need to generate an additional error
+ md.comment(server.toString()
+ + "(server) does not own bucket "
+ + serverData.bucketNumber);
}
}
@@ -1801,6 +1814,11 @@ public class TargetLock implements Lock, Serializable {
out.printf(format, "---", "---------", "----", "-----", "--------");
}
+ dump_serverTable(out);
+ dump_clientOnlyEntries(out);
+ }
+
+ private void dump_serverTable(PrintStream out) {
// iterate over the server table
for (LockEntry le : lockEntries.values()) {
// fetch merged data
@@ -1841,7 +1859,9 @@ public class TargetLock implements Lock, Serializable {
dumpMoreComments(out, md);
}
}
+ }
+ private void dump_clientOnlyEntries(PrintStream out) {
// client records that don't have matching server entries
for (MergedData md : clientOnlyEntries.values()) {
ClientDataRecord cdr = md.clientDataRecord;
@@ -2017,13 +2037,13 @@ public class TargetLock implements Lock, Serializable {
*/
static class HostData implements Serializable {
// the UUID of the host sending the data
- UUID hostUuid;
+ private UUID hostUuid;
// all of the information derived from the 'LocalLocks' data
- List<ClientData> clientDataList;
+ private List<ClientData> clientDataList;
// all of the information derived from the 'GlobalLocks' data
- List<ServerData> serverDataList;
+ private List<ServerData> serverDataList;
/**
* Constructor - this goes through all of the lock tables,
@@ -2086,10 +2106,10 @@ public class TargetLock implements Lock, Serializable {
*/
static class ClientData implements Serializable {
// number of the bucket
- int bucketNumber;
+ private int bucketNumber;
// all of the client locks within this bucket
- List<ClientDataRecord> clientDataRecords;
+ private List<ClientDataRecord> clientDataRecords;
/**
* Constructor - initially, there are no 'clientDataRecords'.
@@ -2108,11 +2128,11 @@ public class TargetLock implements Lock, Serializable {
*/
static class ClientDataRecord implements Serializable {
// contains key, ownerKey, uuid
- Identity identity;
+ private Identity identity;
// state field of 'TargetLock'
// (may be 'null' if there is no 'TargetLock')
- State state;
+ private State state;
/**
* Constructor - initialize the fields.
@@ -2132,10 +2152,10 @@ public class TargetLock implements Lock, Serializable {
*/
static class ServerData implements Serializable {
// number of the bucket
- int bucketNumber;
+ private int bucketNumber;
// server-side data associated with a single bucket
- GlobalLocks globalLocks;
+ private GlobalLocks globalLocks;
/**
* Constructor - initialize the fields.
@@ -2158,15 +2178,15 @@ public class TargetLock implements Lock, Serializable {
*/
static class AuditData implements Serializable {
// sending UUID
- UUID hostUuid;
+ private UUID hostUuid;
// client records that currently exist, or records to be cleared
// (depending upon message) -- client/server is from the senders side
- List<Identity> clientData;
+ private List<Identity> clientData;
// server records that currently exist, or records to be cleared
// (depending upon message) -- client/server is from the senders side
- List<Identity> serverData;
+ private List<Identity> serverData;
/**
* Constructor - set 'hostUuid' to the current host, and start with
@@ -2174,8 +2194,8 @@ public class TargetLock implements Lock, Serializable {
*/
AuditData() {
hostUuid = Server.getThisServer().getUuid();
- clientData = new ArrayList<Identity>();
- serverData = new ArrayList<Identity>();
+ clientData = new ArrayList<>();
+ serverData = new ArrayList<>();
}
/**
@@ -2191,8 +2211,17 @@ public class TargetLock implements Lock, Serializable {
AuditData response = new AuditData();
// compare remote servers client data with our server data
+ generateResponse_clientEnd(response, includeWarnings);
+
+ // test server data
+ generateResponse_serverEnd(response, includeWarnings);
+
+ return response;
+ }
+
+ private void generateResponse_clientEnd(AuditData response, boolean includeWarnings) {
for (Identity identity : clientData) {
- // we are the server in this case
+ // remote end is the client, and we are the server
Bucket bucket = Bucket.getBucket(identity.key);
GlobalLocks globalLocks =
bucket.getAdjunctDontCreate(GlobalLocks.class);
@@ -2240,10 +2269,11 @@ public class TargetLock implements Lock, Serializable {
// it was 'clientData' to the sender, but 'serverData' to us
response.serverData.add(identity);
}
+ }
- // test server data
+ private void generateResponse_serverEnd(AuditData response, boolean includeWarnings) {
for (Identity identity : serverData) {
- // we are the client in this case
+ // remote end is the server, and we are the client
Bucket bucket = Bucket.getBucket(identity.ownerKey);
LocalLocks localLocks =
bucket.getAdjunctDontCreate(LocalLocks.class);
@@ -2275,8 +2305,6 @@ public class TargetLock implements Lock, Serializable {
}
response.clientData.add(identity);
}
-
- return response;
}
/**
@@ -2400,19 +2428,14 @@ public class TargetLock implements Lock, Serializable {
* Run a single audit cycle.
*/
static void runAudit() {
- try {
- logger.info("Starting TargetLock audit");
- Audit audit = new Audit();
+ logger.info("Starting TargetLock audit");
+ Audit audit = new Audit();
- // populate 'auditMap' table
- audit.build();
+ // populate 'auditMap' table
+ audit.build();
- // send to all of the servers in 'auditMap' (may include this server)
- audit.send();
- } catch (InterruptedException e) {
- logger.error("TargetLock audit interrupted", e);
- Thread.currentThread().interrupt();
- }
+ // send to all of the servers in 'auditMap' (may include this server)
+ audit.send();
}
/**
@@ -2441,63 +2464,59 @@ public class TargetLock implements Lock, Serializable {
// this needs to run in the 'MainLoop' thread, because it is dependent
// upon the list of servers, and our position in this list
- MainLoop.queueWork(new Runnable() {
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
- // current list of servers
- Collection<Server> servers = Server.getServers();
+ MainLoop.queueWork(() -> {
+ // this runs in the 'MainLoop' thread
- // count of the number of servers
- int count = servers.size();
+ // current list of servers
+ Collection<Server> servers = Server.getServers();
- // will contain our position in this list
- int index = 0;
+ // count of the number of servers
+ int count = servers.size();
- // current server
- Server thisServer = Server.getThisServer();
+ // will contain our position in this list
+ int index = 0;
- for (Server server : servers) {
- if (server == thisServer) {
- break;
- }
- index += 1;
+ // current server
+ Server thisServer = Server.getThisServer();
+
+ for (Server server : servers) {
+ if (server == thisServer) {
+ break;
}
+ index += 1;
+ }
- // if index == count, we didn't find this server
- // (which shouldn't happen)
-
- if (index < count) {
- // The servers are ordered by UUID, and 'index' is this
- // server's position within the list. Suppose the period is
- // 60000 (60 seconds), and there are 5 servers -- the first one
- // will run the audit at 0 seconds after the minute, the next
- // at 12 seconds after the minute, then 24, 36, 48.
- long offset = (period * index) / count;
-
- // the earliest time we want the audit to run
- long time = System.currentTimeMillis() + gracePeriod;
- long startTime = time - (time % period) + offset;
- if (startTime <= time) {
- startTime += period;
+ // if index == count, we didn't find this server
+ // (which shouldn't happen)
+
+ if (index < count) {
+ // The servers are ordered by UUID, and 'index' is this
+ // server's position within the list. Suppose the period is
+ // 60000 (60 seconds), and there are 5 servers -- the first one
+ // will run the audit at 0 seconds after the minute, the next
+ // at 12 seconds after the minute, then 24, 36, 48.
+ long offset = (period * index) / count;
+
+ // the earliest time we want the audit to run
+ long time = System.currentTimeMillis() + gracePeriod;
+ long startTime = time - (time % period) + offset;
+ if (startTime <= time) {
+ startTime += period;
+ }
+ synchronized (Audit.class) {
+ if (timerTask != null) {
+ timerTask.cancel();
}
- synchronized (Audit.class) {
- if (timerTask != null) {
- timerTask.cancel();
+ timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ runAudit();
}
- timerTask = new TimerTask() {
- @Override
- public void run() {
- runAudit();
- }
- };
+ };
- // now, schedule the timer
- Util.timer.scheduleAtFixedRate(
- timerTask, new Date(startTime), period);
- }
+ // now, schedule the timer
+ Util.timer.scheduleAtFixedRate(
+ timerTask, new Date(startTime), period);
}
}
});
@@ -2514,7 +2533,8 @@ public class TargetLock implements Lock, Serializable {
*/
static byte[] incomingAudit(UUID serverUuid, int ttl, byte[] encodedData) {
if (!Server.getThisServer().getUuid().equals(serverUuid)) {
- if ((ttl -= 1) > 0) {
+ ttl -= 1;
+ if (ttl > 0) {
Server server = Server.getServer(serverUuid);
if (server != null) {
WebTarget webTarget = server.getWebTarget("lock/audit");
@@ -2525,8 +2545,8 @@ public class TargetLock implements Lock, Serializable {
Entity.entity(new String(encodedData),
MediaType.APPLICATION_OCTET_STREAM_TYPE);
return webTarget
- .queryParam("server", serverUuid.toString())
- .queryParam("ttl", String.valueOf(ttl))
+ .queryParam(QP_SERVER, serverUuid.toString())
+ .queryParam(QP_TTL, String.valueOf(ttl))
.request().post(entity, byte[].class);
}
}
@@ -2556,53 +2576,63 @@ public class TargetLock implements Lock, Serializable {
Bucket bucket = Bucket.getBucket(i);
// client data
- LocalLocks localLocks =
- bucket.getAdjunctDontCreate(LocalLocks.class);
- if (localLocks != null) {
- synchronized (localLocks) {
- // we have client data for this bucket
- for (Identity identity :
- localLocks.weakReferenceToIdentity.values()) {
- // find or create the 'AuditData' instance associated
- // with the server owning the 'key'
- AuditData auditData = getAuditData(identity.key);
- if (auditData != null) {
- auditData.clientData.add(identity);
- }
+ build_clientData(bucket);
+
+ // server data
+ build_serverData(bucket);
+ }
+ }
+
+ private void build_clientData(Bucket bucket) {
+ // client data
+ LocalLocks localLocks =
+ bucket.getAdjunctDontCreate(LocalLocks.class);
+ if (localLocks != null) {
+ synchronized (localLocks) {
+ // we have client data for this bucket
+ for (Identity identity :
+ localLocks.weakReferenceToIdentity.values()) {
+ // find or create the 'AuditData' instance associated
+ // with the server owning the 'key'
+ AuditData auditData = getAuditData(identity.key);
+ if (auditData != null) {
+ auditData.clientData.add(identity);
}
}
}
+ }
+ }
- // server data
- GlobalLocks globalLocks =
- bucket.getAdjunctDontCreate(GlobalLocks.class);
- if (globalLocks != null) {
- // we have server data for this bucket
- Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry;
- synchronized (keyToEntry) {
- for (LockEntry le : keyToEntry.values()) {
+ private void build_serverData(Bucket bucket) {
+ // server data
+ GlobalLocks globalLocks =
+ bucket.getAdjunctDontCreate(GlobalLocks.class);
+ if (globalLocks != null) {
+ // we have server data for this bucket
+ Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry;
+ synchronized (keyToEntry) {
+ for (LockEntry le : keyToEntry.values()) {
+ // find or create the 'AuditData' instance associated
+ // with the current 'ownerKey'
+ AuditData auditData = getAuditData(le.currentOwnerKey);
+ if (auditData != null) {
+ // create an 'Identity' entry, and add it to
+ // the list associated with the remote server
+ auditData.serverData.add(
+ new Identity(le.key, le.currentOwnerKey,
+ le.currentOwnerUuid));
+ }
+
+ for (Waiting waiting : le.waitingList) {
// find or create the 'AuditData' instance associated
- // with the current 'ownerKey'
- AuditData auditData = getAuditData(le.currentOwnerKey);
+ // with the waiting entry 'ownerKey'
+ auditData = getAuditData(waiting.ownerKey);
if (auditData != null) {
// create an 'Identity' entry, and add it to
// the list associated with the remote server
auditData.serverData.add(
- new Identity(le.key, le.currentOwnerKey,
- le.currentOwnerUuid));
- }
-
- for (Waiting waiting : le.waitingList) {
- // find or create the 'AuditData' instance associated
- // with the waiting entry 'ownerKey'
- auditData = getAuditData(waiting.ownerKey);
- if (auditData != null) {
- // create an 'Identity' entry, and add it to
- // the list associated with the remote server
- auditData.serverData.add(
- new Identity(le.key, waiting.ownerKey,
- waiting.ownerUuid));
- }
+ new Identity(le.key, waiting.ownerKey,
+ waiting.ownerUuid));
}
}
}
@@ -2618,12 +2648,8 @@ public class TargetLock implements Lock, Serializable {
// map 'key -> bucket number', and then 'bucket number' -> 'server'
Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
if (server != null) {
- AuditData auditData = auditMap.get(server);
- if (auditData == null) {
- // doesn't exist yet -- create it
- auditData = new AuditData();
- auditMap.put(server, auditData);
- }
+ AuditData auditData =
+ auditMap.computeIfAbsent(server, sk -> new AuditData());
return auditData;
}
@@ -2635,7 +2661,7 @@ public class TargetLock implements Lock, Serializable {
* Using the collected 'auditMap', send out the messages to all of the
* servers.
*/
- void send() throws InterruptedException {
+ void send() {
if (auditMap.isEmpty()) {
logger.info("TargetLock audit: no locks on this server");
} else {
@@ -2644,178 +2670,176 @@ public class TargetLock implements Lock, Serializable {
}
for (final Server server : auditMap.keySet()) {
- // fetch audit data
- AuditData auditData = auditMap.get(server);
+ send_server(server);
+ }
+ }
- if (server == Server.getThisServer()) {
- // process this locally
- final AuditData respData = auditData.generateResponse(true);
- if (respData.clientData.isEmpty()
- && respData.serverData.isEmpty()) {
- // no mismatches
- logger.info("TargetLock.Audit.send: "
- + "no errors from self ({})", server);
- continue;
+ private void send_server(final Server server) {
+ // fetch audit data
+ AuditData auditData = auditMap.get(server);
+
+ if (server == Server.getThisServer()) {
+ // process this locally
+ final AuditData respData = auditData.generateResponse(true);
+ if (respData.clientData.isEmpty()
+ && respData.serverData.isEmpty()) {
+ // no mismatches
+ logger.info("TargetLock.Audit.send: "
+ + "no errors from self ({})", server);
+ return;
+ }
+
+ // do the rest in a separate thread
+ server.getThreadPool().execute(() -> {
+ // wait a few seconds, and see if we still know of these
+ // errors
+ if (AuditPostResponse.responseSupport(
+ respData, "self (" + server + ")",
+ "TargetLock.Audit.send")) {
+ // a return falue of 'true' either indicates the
+ // mismatches were resolved after a retry, or we
+ // received an interrupt, and need to abort
+ return;
}
- // do the rest in a separate thread
- server.getThreadPool().execute(new Runnable() {
- @Override
- public void run() {
- // wait a few seconds, and see if we still know of these
- // errors
- logger.info("TargetLock.Audit.send: "
- + "mismatches from self ({})", server);
- try {
- Thread.sleep(auditRetryDelay);
- } catch (InterruptedException e) {
- logger.error("TargetLock.Audit.send: Interrupted "
- + "handling audit response from self ({})",
- server);
- // just abort
- Thread.currentThread().interrupt();
- return;
- }
+ // any mismatches left in 'respData' are still issues
+ respData.processResponse(server);
+ });
+ return;
+ }
- // This will check against our own data -- any mismatches
- // mean that things have changed since we sent out the
- // first message. We will remove any mismatches from
- // 'respData', and see if there are any left.
- AuditData mismatches = respData.generateResponse(false);
-
- respData.serverData.removeAll(mismatches.clientData);
- respData.clientData.removeAll(mismatches.serverData);
-
- if (respData.clientData.isEmpty()
- && respData.serverData.isEmpty()) {
- // no mismatches --
- // there must have been transient issues on our side
- logger.info("TargetLock.Audit.send: "
- + "no mismatches from self "
- + "({}) after retry", server);
- return;
- }
+ // serialize
+ byte[] encodedData = auditData.encode();
+ if (encodedData == null) {
+ // error has already been displayed
+ return;
+ }
- // any mismatches left in 'respData' are still issues
- respData.processResponse(server);
- }
- });
- continue;
- }
+ // generate entity
+ Entity<String> entity =
+ Entity.entity(new String(encodedData),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
- // serialize
- byte[] encodedData = auditData.encode();
- if (encodedData == null) {
- // error has already been displayed
- continue;
- }
+ server.post("lock/audit", entity, new AuditPostResponse(server));
+ }
+ }
- // generate entity
- Entity<String> entity =
- Entity.entity(new String(encodedData),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
+ static class AuditPostResponse implements Server.PostResponse {
+ private Server server;
- server.post("lock/audit", entity, new Server.PostResponse() {
- @Override
- public WebTarget webTarget(WebTarget webTarget) {
- // include the 'uuid' keyword
- return webTarget
- .queryParam("server", server.getUuid().toString())
- .queryParam("ttl", timeToLive);
- }
+ AuditPostResponse(Server server) {
+ this.server = server;
+ }
- @Override
- public void response(Response response) {
- // process the response here
- AuditData respData =
- AuditData.decode(response.readEntity(byte[].class));
- if (respData == null) {
- logger.error("TargetLock.Audit.send: "
- + "couldn't process response from {}",
- server);
- return;
- }
+ @Override
+ public WebTarget webTarget(WebTarget webTarget) {
+ // include the 'uuid' keyword
+ return webTarget
+ .queryParam(QP_SERVER, server.getUuid().toString())
+ .queryParam(QP_TTL, timeToLive);
+ }
- // if we reach this point, we got a response
- if (respData.clientData.isEmpty()
- && respData.serverData.isEmpty()) {
- // no mismatches
- logger.info("TargetLock.Audit.send: "
- + "no errors from {}", server);
- return;
- }
+ @Override
+ public void response(Response response) {
+ // process the response here
+ AuditData respData =
+ AuditData.decode(response.readEntity(byte[].class));
+ if (respData == null) {
+ logger.error("TargetLock.Audit.send: "
+ + "couldn't process response from {}",
+ server);
+ return;
+ }
- // wait a few seconds, and see if we still know of these
- // errors
- logger.info("TargetLock.Audit.send: mismatches from {}",
- server);
- try {
- Thread.sleep(auditRetryDelay);
- } catch (InterruptedException e) {
- logger.error("TargetLock.Audit.send: Interrupted "
- + "handling audit response from {}",
- server);
- // just abort
- Thread.currentThread().interrupt();
- return;
- }
+ // if we reach this point, we got a response
+ if (respData.clientData.isEmpty()
+ && respData.serverData.isEmpty()) {
+ // no mismatches
+ logger.info("TargetLock.Audit.send: "
+ + "no errors from {}", server);
+ return;
+ }
- // This will check against our own data -- any mismatches
- // mean that things have changed since we sent out the
- // first message. We will remove any mismatches from
- // 'respData', and see if there are any left.
- AuditData mismatches = respData.generateResponse(false);
-
- respData.serverData.removeAll(mismatches.clientData);
- respData.clientData.removeAll(mismatches.serverData);
-
- if (respData.clientData.isEmpty()
- && respData.serverData.isEmpty()) {
- // no mismatches --
- // there must have been transient issues on our side
- logger.info("TargetLock.Audit.send: no mismatches from "
- + "{} after retry", server);
- return;
- }
+ // wait a few seconds, and see if we still know of these
+ // errors
+ if (responseSupport(respData, server, "AuditPostResponse.response")) {
+ // a return falue of 'true' either indicates the mismatches
+ // were resolved after a retry, or we received an interrupt,
+ // and need to abort
+ return;
+ }
- // any mismatches left in 'respData' are still there --
- // hopefully, they are transient issues on the other side
- AuditData auditData = new AuditData();
- auditData.clientData = respData.serverData;
- auditData.serverData = respData.clientData;
-
- // serialize
- byte[] encodedData = auditData.encode();
- if (encodedData == null) {
- // error has already been displayed
- return;
- }
+ // any mismatches left in 'respData' are still there --
+ // hopefully, they are transient issues on the other side
+ AuditData auditData = new AuditData();
+ auditData.clientData = respData.serverData;
+ auditData.serverData = respData.clientData;
- // generate entity
- Entity<String> entity =
- Entity.entity(new String(encodedData),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
-
- // send new list to other end
- response = server
- .getWebTarget("lock/audit")
- .queryParam("server", server.getUuid().toString())
- .queryParam("ttl", timeToLive)
- .request().post(entity);
-
- respData = AuditData.decode(response.readEntity(byte[].class));
- if (respData == null) {
- logger.error("TargetLock.auditDataBuilder.send: "
- + "couldn't process response from {}",
- server);
- return;
- }
+ // serialize
+ byte[] encodedData = auditData.encode();
+ if (encodedData == null) {
+ // error has already been displayed
+ return;
+ }
- // if there are mismatches left, they are presumably real
- respData.processResponse(server);
- }
- });
+ // generate entity
+ Entity<String> entity =
+ Entity.entity(new String(encodedData),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
+
+ // send new list to other end
+ response = server
+ .getWebTarget("lock/audit")
+ .queryParam(QP_SERVER, server.getUuid().toString())
+ .queryParam(QP_TTL, timeToLive)
+ .request().post(entity);
+
+ respData = AuditData.decode(response.readEntity(byte[].class));
+ if (respData == null) {
+ logger.error("TargetLock.auditDataBuilder.send: "
+ + "couldn't process response from {}",
+ server);
+ return;
+ }
+
+ // if there are mismatches left, they are presumably real
+ respData.processResponse(server);
+ }
+
+ // Handle mismatches indicated by an audit response -- a return value of
+ // 'true' indicates that there were no mismatches after a retry, or
+ // we received an interrupt. In either case, the caller returns.
+ private static boolean responseSupport(AuditData respData, Object serverString, String caller) {
+ logger.info("{}: mismatches from {}", caller, serverString);
+ try {
+ Thread.sleep(auditRetryDelay);
+ } catch (InterruptedException e) {
+ logger.error("{}: Interrupted handling audit response from {}",
+ caller, serverString);
+ // just abort
+ Thread.currentThread().interrupt();
+ return true;
+ }
+
+ // This will check against our own data -- any mismatches
+ // mean that things have changed since we sent out the
+ // first message. We will remove any mismatches from
+ // 'respData', and see if there are any left.
+ AuditData mismatches = respData.generateResponse(false);
+
+ respData.serverData.removeAll(mismatches.clientData);
+ respData.clientData.removeAll(mismatches.serverData);
+
+ if (respData.clientData.isEmpty()
+ && respData.serverData.isEmpty()) {
+ // no mismatches --
+ // there must have been transient issues on our side
+ logger.info("{}: no mismatches from {} after retry",
+ caller, serverString);
+ return true;
}
+
+ return false;
}
}
}