diff options
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java')
-rw-r--r-- | feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java | 958 |
1 files changed, 491 insertions, 467 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java index 7e4b795f..65804082 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java @@ -101,8 +101,8 @@ public class TargetLock implements Lock, Serializable { private static ReferenceQueue<TargetLock> abandoned = new ReferenceQueue<>(); // some status codes - static final int ACCEPTED = 202; //Response.Status.ACCEPTED.getStatusCode(); - static final int NO_CONTENT = 204; //Response.Status.NO_CONTENT.getStatusCode(); + static final int ACCEPTED = 202; //Response.Status.ACCEPTED.getStatusCode() + static final int NO_CONTENT = 204; //Response.Status.NO_CONTENT.getStatusCode() static final int LOCKED = 423; // Values extracted from properties @@ -131,13 +131,21 @@ public class TargetLock implements Lock, Serializable { // this is used to notify the application when a lock is available, // or if it is not available - private LockCallback owner; + private volatile LockCallback owner; // This is what is actually called by the infrastructure to do the owner // notification. The owner may be running in a Drools session, in which case // the actual notification should be done within that thread -- the 'context' // object ensures that it happens this way. - private LockCallback context; + private volatile LockCallback context; + + // HTTP query parameters + private static final String QP_KEY = "key"; + private static final String QP_OWNER = "owner"; + private static final String QP_UUID = "uuid"; + private static final String QP_WAIT = "wait"; + private static final String QP_SERVER = "server"; + private static final String QP_TTL = "ttl"; /** * This method triggers registration of 'eventHandler', and also extracts @@ -221,7 +229,7 @@ public class TargetLock implements Lock, Serializable { if (session != null) { // deliver through a 'PolicySessionContext' class Object lcontext = session.getAdjunct(PolicySessionContext.class); - if (lcontext == null || !(lcontext instanceof LockCallback)) { + if (!(lcontext instanceof LockCallback)) { context = new PolicySessionContext(session); session.setAdjunct(PolicySessionContext.class, context); } else { @@ -301,11 +309,11 @@ public class TargetLock implements Lock, Serializable { @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("key", key) - .queryParam("owner", ownerKey) - .queryParam("uuid", identity.uuid.toString()) - .queryParam("wait", waitForLock) - .queryParam("ttl", timeToLive); + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, ownerKey) + .queryParam(QP_UUID, identity.uuid.toString()) + .queryParam(QP_WAIT, waitForLock) + .queryParam(QP_TTL, timeToLive); } /** @@ -323,14 +331,13 @@ public class TargetLock implements Lock, Serializable { * 423 (Locked) - lock in use, and 'waitForLock' is 'false' */ switch (response.getStatus()) { - case NO_CONTENT: { + case NO_CONTENT: // lock successful setState(State.ACTIVE); context.lockAvailable(TargetLock.this); break; - } - case LOCKED: { + case LOCKED: // failed -- lock in use, and 'waitForLock == false' setState(State.FREE); synchronized (localLocks) { @@ -340,13 +347,12 @@ public class TargetLock implements Lock, Serializable { wr.clear(); context.lockUnavailable(TargetLock.this); break; - } case ACCEPTED: break; default: - logger.error("Unknown status: ", response.getStatus()); + logger.error("Unknown status: {}", response.getStatus()); break; } } @@ -434,6 +440,7 @@ public class TargetLock implements Lock, Serializable { */ @Override public void extend(int holdSec, LockCallback callback) { + // not implemented yet } /********************/ @@ -473,7 +480,8 @@ public class TargetLock implements Lock, Serializable { if (!Bucket.isKeyOnThisServer(key)) { // this is the wrong server -- forward to the correct one // (we can use this thread) - if ((ttl -= 1) > 0) { + ttl -= 1; + if (ttl > 0) { Server server = Bucket.bucketToServer(Bucket.bucketNumber(key)); if (server != null) { WebTarget webTarget = server.getWebTarget("lock/lock"); @@ -483,11 +491,11 @@ public class TargetLock implements Lock, Serializable { server.getUuid(), key, ownerKey, uuid, waitForLock, ttl); return webTarget - .queryParam("key", key) - .queryParam("owner", ownerKey) - .queryParam("uuid", uuid.toString()) - .queryParam("wait", waitForLock) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, ownerKey) + .queryParam(QP_UUID, uuid.toString()) + .queryParam(QP_WAIT, waitForLock) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().get(); } } @@ -527,7 +535,8 @@ public class TargetLock implements Lock, Serializable { if (!Bucket.isKeyOnThisServer(key)) { // this is the wrong server -- forward to the correct one // (we can use this thread) - if ((ttl -= 1) > 0) { + ttl -= 1; + if (ttl > 0) { Server server = Bucket.bucketToServer(Bucket.bucketNumber(key)); if (server != null) { WebTarget webTarget = server.getWebTarget("lock/free"); @@ -536,10 +545,10 @@ public class TargetLock implements Lock, Serializable { + "(key={},owner={},uuid={},ttl={})", server.getUuid(), key, ownerKey, uuid, ttl); return webTarget - .queryParam("key", key) - .queryParam("owner", ownerKey) - .queryParam("uuid", uuid.toString()) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, ownerKey) + .queryParam(QP_UUID, uuid.toString()) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().get(); } } @@ -575,7 +584,8 @@ public class TargetLock implements Lock, Serializable { if (!Bucket.isKeyOnThisServer(ownerKey)) { // this is the wrong server -- forward to the correct one // (we can use this thread) - if ((ttl -= 1) > 0) { + ttl -= 1; + if (ttl > 0) { Server server = Bucket.bucketToServer(Bucket.bucketNumber(key)); if (server != null) { WebTarget webTarget = server.getWebTarget("lock/locked"); @@ -584,10 +594,10 @@ public class TargetLock implements Lock, Serializable { + "(key={},owner={},uuid={},ttl={})", server.getUuid(), key, ownerKey, uuid, ttl); return webTarget - .queryParam("key", key) - .queryParam("owner", ownerKey) - .queryParam("uuid", uuid.toString()) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, ownerKey) + .queryParam(QP_UUID, uuid.toString()) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().get(); } } @@ -744,6 +754,7 @@ public class TargetLock implements Lock, Serializable { */ @Override public void shutdown() { + // nothing needs to be done } /** @@ -887,10 +898,10 @@ public class TargetLock implements Lock, Serializable { @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("key", key) - .queryParam("owner", ownerKey) - .queryParam("uuid", uuid.toString()) - .queryParam("ttl", timeToLive); + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, ownerKey) + .queryParam(QP_UUID, uuid.toString()) + .queryParam(QP_TTL, timeToLive); } @Override @@ -898,21 +909,19 @@ public class TargetLock implements Lock, Serializable { logger.info("Free response={} (code={})", response, response.getStatus()); switch (response.getStatus()) { - case NO_CONTENT: { + case NO_CONTENT: // free successful -- don't need to do anything break; - } - case LOCKED: { + case LOCKED: // free failed logger.error("TargetLock free failed, " + "key={}, owner={}, uuid={}", key, ownerKey, uuid); break; - } default: - logger.error("Unknown status: ", response.getStatus()); + logger.error("Unknown status: {}", response.getStatus()); break; } } @@ -986,12 +995,10 @@ public class TargetLock implements Lock, Serializable { public void lockAvailable(final Lock lock) { // Run 'owner.lockAvailable' within the Drools session if (policySession != null) { - policySession.getKieSession().insert(new DroolsRunnable() { - @Override - public void run() { - ((TargetLock)lock).owner.lockAvailable(lock); - } - }); + DroolsRunnable callback = () -> { + ((TargetLock)lock).owner.lockAvailable(lock); + }; + policySession.getKieSession().insert(callback); } } @@ -1002,12 +1009,10 @@ public class TargetLock implements Lock, Serializable { public void lockUnavailable(Lock lock) { // Run 'owner.unlockAvailable' within the Drools session if (policySession != null) { - policySession.getKieSession().insert(new DroolsRunnable() { - @Override - public void run() { - ((TargetLock)lock).owner.lockUnavailable(lock); - } - }); + DroolsRunnable callback = () -> { + ((TargetLock)lock).owner.lockUnavailable(lock); + }; + policySession.getKieSession().insert(callback); } } @@ -1218,16 +1223,16 @@ public class TargetLock implements Lock, Serializable { */ private static class LockEntry implements Serializable { // string key identifying the lock - String key; + private String key; // string key identifying the owner - String currentOwnerKey; + private String currentOwnerKey; // UUID identifying the original 'TargetLock - UUID currentOwnerUuid; + private UUID currentOwnerUuid; // list of pending lock requests for this key - Queue<Waiting> waitingList = new LinkedList<>(); + private Queue<Waiting> waitingList = new LinkedList<>(); /** * Constructor - initialize the 'LockEntry'. @@ -1273,27 +1278,19 @@ public class TargetLock implements Lock, Serializable { @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("key", key) - .queryParam("owner", currentOwnerKey) - .queryParam("uuid", currentOwnerUuid.toString()) - .queryParam("ttl", timeToLive); + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, currentOwnerKey) + .queryParam(QP_UUID, currentOwnerUuid.toString()) + .queryParam(QP_TTL, timeToLive); } @Override public void response(Response response) { logger.info("Locked response={} (code={})", response, response.getStatus()); - switch (response.getStatus()) { - case NO_CONTENT: { - // successful -- we are done - break; - } - - default: { - // notification failed -- free this one - globalLocks.unlock(key, currentOwnerUuid); - break; - } + if (response.getStatus() != NO_CONTENT) { + // notification failed -- free this one + globalLocks.unlock(key, currentOwnerUuid); } } }); @@ -1409,7 +1406,6 @@ public class TargetLock implements Lock, Serializable { while (abandonedHandler != null) { try { Reference<? extends TargetLock> wr = abandoned.remove(); - TargetLock notify = null; // At this point, we know that 'ref' is a // 'WeakReference<TargetLock>' instance that has been abandoned, @@ -1515,7 +1511,8 @@ public class TargetLock implements Lock, Serializable { */ static byte[] dumpLocksData(UUID serverUuid, int ttl) throws IOException { if (!Server.getThisServer().getUuid().equals(serverUuid)) { - if ((ttl -= 1) > 0) { + ttl -= 1; + if (ttl > 0) { Server server = Server.getServer(serverUuid); if (server != null) { WebTarget webTarget = @@ -1524,8 +1521,8 @@ public class TargetLock implements Lock, Serializable { logger.info("Forwarding 'lock/dumpLocksData' to uuid {}", serverUuid); return webTarget - .queryParam("server", serverUuid.toString()) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_SERVER, serverUuid.toString()) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().get(byte[].class); } } @@ -1571,8 +1568,8 @@ public class TargetLock implements Lock, Serializable { @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("server", server.getUuid().toString()) - .queryParam("ttl", timeToLive); + .queryParam(QP_SERVER, server.getUuid().toString()) + .queryParam(QP_TTL, timeToLive); } @Override @@ -1621,128 +1618,144 @@ public class TargetLock implements Lock, Serializable { // process the client-end data for (ClientData clientData : hostData.clientDataList) { - // 'true' if the bucket associated with this 'ClientData' - // doesn't belong to the remote server, as far as we can tell - boolean serverMismatch = - Bucket.bucketToServer(clientData.bucketNumber) != server; - - // each 'ClientDataRecord' instance corresponds to an - // active 'Identity' (TargetLock) instance - for (ClientDataRecord cdr : clientData.clientDataRecords) { - // update maximum 'key' and 'ownerKey' lengths - updateKeyLength(cdr.identity.key); - updateOwnerKeyLength(cdr.identity.ownerKey); - - // fetch UUID - UUID uuid = cdr.identity.uuid; - - // fetch/generate 'MergeData' instance for this UUID - MergedData md = mergedDataMap.get(uuid); - if (md == null) { - md = new MergedData(uuid); - mergedDataMap.put(uuid, md); - } - - // update 'MergedData.clientDataRecord' - if (md.clientDataRecord == null) { - md.clientDataRecord = cdr; - } else { - md.comment("Duplicate client entry for UUID"); - } - - if (serverMismatch) { - // need to generate an additional error - md.comment(server.toString() - + "(client) does not own bucket " - + clientData.bucketNumber); - } - } + populateLockData_clientData(clientData, server); } // process the server-end data for (ServerData serverData : hostData.serverDataList) { - // 'true' if the bucket associated with this 'ServerData' - // doesn't belong to the remote server, as far as we can tell - boolean serverMismatch = - Bucket.bucketToServer(serverData.bucketNumber) != server; - - // each 'LockEntry' instance corresponds to the current holder - // of a lock, and all requestors waiting for it to be freed - for (LockEntry le : serverData.globalLocks.keyToEntry.values()) { - // update maximum 'key' and 'ownerKey' lengths - updateKeyLength(le.key); - updateOwnerKeyLength(le.currentOwnerKey); - - // fetch uuid - UUID uuid = le.currentOwnerUuid; - - // fetch/generate 'MergeData' instance for this UUID - MergedData md = mergedDataMap.get(uuid); - if (md == null) { - md = new MergedData(uuid); - mergedDataMap.put(uuid, md); - } + populateLockData_serverData(serverData, server); + } + } else { + logger.error("TargetLock.DumpLocks.populateLockData: " + + "received data has class {}", + decodedData.getClass().getName()); + } + } - // update 'lockEntries' table entry - if (lockEntries.get(le.key) != null) { - md.comment("Duplicate server entry for key " + le.key); - } else { - lockEntries.put(le.key, le); - } + private void populateLockData_clientData(ClientData clientData, Server server) { + // 'true' if the bucket associated with this 'ClientData' + // doesn't belong to the remote server, as far as we can tell + boolean serverMismatch = + Bucket.bucketToServer(clientData.bucketNumber) != server; - // update 'MergedData.serverLockEntry' - // (leave 'MergedData.serverWaiting' as 'null', because - // this field is only used for waiting entries) - if (md.serverLockEntry == null) { - md.serverLockEntry = le; - } else { - md.comment("Duplicate server entry for UUID"); - } + // each 'ClientDataRecord' instance corresponds to an + // active 'Identity' (TargetLock) instance + for (ClientDataRecord cdr : clientData.clientDataRecords) { + // update maximum 'key' and 'ownerKey' lengths + updateKeyLength(cdr.identity.key); + updateOwnerKeyLength(cdr.identity.ownerKey); - if (serverMismatch) { - // need to generate an additional error - md.comment(server.toString() - + "(server) does not own bucket " - + serverData.bucketNumber); - } + // fetch UUID + UUID uuid = cdr.identity.uuid; - // we need 'MergeData' entries for all waiting requests - for (Waiting waiting : le.waitingList) { - // update maximum 'ownerKey' length - updateOwnerKeyLength(waiting.ownerKey); + // fetch/generate 'MergeData' instance for this UUID + MergedData md = mergedDataMap.get(uuid); + if (md == null) { + md = new MergedData(uuid); + mergedDataMap.put(uuid, md); + } - // fetch uuid - uuid = waiting.ownerUuid; + // update 'MergedData.clientDataRecord' + if (md.clientDataRecord == null) { + md.clientDataRecord = cdr; + } else { + md.comment("Duplicate client entry for UUID"); + } - // fetch/generate 'MergeData' instance for this UUID - md = mergedDataMap.get(uuid); - if (md == null) { - md = new MergedData(uuid); - mergedDataMap.put(uuid, md); - } + if (serverMismatch) { + // need to generate an additional error + md.comment(server.toString() + + "(client) does not own bucket " + + clientData.bucketNumber); + } + } + } - // update 'MergedData.serverLockEntry' and - // 'MergedData.serverWaiting' - if (md.serverLockEntry == null) { - md.serverLockEntry = le; - md.serverWaiting = waiting; - } else { - md.comment("Duplicate server entry for UUID"); - } + private void populateLockData_serverData(ServerData serverData, Server server) { + // 'true' if the bucket associated with this 'ServerData' + // doesn't belong to the remote server, as far as we can tell + boolean serverMismatch = + Bucket.bucketToServer(serverData.bucketNumber) != server; - if (serverMismatch) { - // need to generate an additional error - md.comment(server.toString() - + "(server) does not own bucket " - + serverData.bucketNumber); - } - } - } + // each 'LockEntry' instance corresponds to the current holder + // of a lock, and all requestors waiting for it to be freed + for (LockEntry le : serverData.globalLocks.keyToEntry.values()) { + // update maximum 'key' and 'ownerKey' lengths + updateKeyLength(le.key); + updateOwnerKeyLength(le.currentOwnerKey); + + // fetch uuid + UUID uuid = le.currentOwnerUuid; + + // fetch/generate 'MergeData' instance for this UUID + MergedData md = mergedDataMap.get(uuid); + if (md == null) { + md = new MergedData(uuid); + mergedDataMap.put(uuid, md); + } + + // update 'lockEntries' table entry + if (lockEntries.get(le.key) != null) { + md.comment("Duplicate server entry for key " + le.key); + } else { + lockEntries.put(le.key, le); + } + + // update 'MergedData.serverLockEntry' + // (leave 'MergedData.serverWaiting' as 'null', because + // this field is only used for waiting entries) + if (md.serverLockEntry == null) { + md.serverLockEntry = le; + } else { + md.comment("Duplicate server entry for UUID"); + } + + if (serverMismatch) { + // need to generate an additional error + md.comment(server.toString() + + "(server) does not own bucket " + + serverData.bucketNumber); + } + + // we need 'MergeData' entries for all waiting requests + for (Waiting waiting : le.waitingList) { + populateLockData_serverData_waiting( + serverData, server, serverMismatch, le, waiting); } + } + } + + private void populateLockData_serverData_waiting( + ServerData serverData, Server server, boolean serverMismatch, + LockEntry le, Waiting waiting) { + + // update maximum 'ownerKey' length + updateOwnerKeyLength(waiting.ownerKey); + + // fetch uuid + UUID uuid = waiting.ownerUuid; + + // fetch/generate 'MergeData' instance for this UUID + MergedData md = mergedDataMap.get(uuid); + if (md == null) { + md = new MergedData(uuid); + mergedDataMap.put(uuid, md); + } + + // update 'MergedData.serverLockEntry' and + // 'MergedData.serverWaiting' + if (md.serverLockEntry == null) { + md.serverLockEntry = le; + md.serverWaiting = waiting; } else { - logger.error("TargetLock.DumpLocks.populateLockData: " - + "received data has class " - + decodedData.getClass().getName()); + md.comment("Duplicate server entry for UUID"); + } + + if (serverMismatch) { + // need to generate an additional error + md.comment(server.toString() + + "(server) does not own bucket " + + serverData.bucketNumber); } } @@ -1801,6 +1814,11 @@ public class TargetLock implements Lock, Serializable { out.printf(format, "---", "---------", "----", "-----", "--------"); } + dump_serverTable(out); + dump_clientOnlyEntries(out); + } + + private void dump_serverTable(PrintStream out) { // iterate over the server table for (LockEntry le : lockEntries.values()) { // fetch merged data @@ -1841,7 +1859,9 @@ public class TargetLock implements Lock, Serializable { dumpMoreComments(out, md); } } + } + private void dump_clientOnlyEntries(PrintStream out) { // client records that don't have matching server entries for (MergedData md : clientOnlyEntries.values()) { ClientDataRecord cdr = md.clientDataRecord; @@ -2017,13 +2037,13 @@ public class TargetLock implements Lock, Serializable { */ static class HostData implements Serializable { // the UUID of the host sending the data - UUID hostUuid; + private UUID hostUuid; // all of the information derived from the 'LocalLocks' data - List<ClientData> clientDataList; + private List<ClientData> clientDataList; // all of the information derived from the 'GlobalLocks' data - List<ServerData> serverDataList; + private List<ServerData> serverDataList; /** * Constructor - this goes through all of the lock tables, @@ -2086,10 +2106,10 @@ public class TargetLock implements Lock, Serializable { */ static class ClientData implements Serializable { // number of the bucket - int bucketNumber; + private int bucketNumber; // all of the client locks within this bucket - List<ClientDataRecord> clientDataRecords; + private List<ClientDataRecord> clientDataRecords; /** * Constructor - initially, there are no 'clientDataRecords'. @@ -2108,11 +2128,11 @@ public class TargetLock implements Lock, Serializable { */ static class ClientDataRecord implements Serializable { // contains key, ownerKey, uuid - Identity identity; + private Identity identity; // state field of 'TargetLock' // (may be 'null' if there is no 'TargetLock') - State state; + private State state; /** * Constructor - initialize the fields. @@ -2132,10 +2152,10 @@ public class TargetLock implements Lock, Serializable { */ static class ServerData implements Serializable { // number of the bucket - int bucketNumber; + private int bucketNumber; // server-side data associated with a single bucket - GlobalLocks globalLocks; + private GlobalLocks globalLocks; /** * Constructor - initialize the fields. @@ -2158,15 +2178,15 @@ public class TargetLock implements Lock, Serializable { */ static class AuditData implements Serializable { // sending UUID - UUID hostUuid; + private UUID hostUuid; // client records that currently exist, or records to be cleared // (depending upon message) -- client/server is from the senders side - List<Identity> clientData; + private List<Identity> clientData; // server records that currently exist, or records to be cleared // (depending upon message) -- client/server is from the senders side - List<Identity> serverData; + private List<Identity> serverData; /** * Constructor - set 'hostUuid' to the current host, and start with @@ -2174,8 +2194,8 @@ public class TargetLock implements Lock, Serializable { */ AuditData() { hostUuid = Server.getThisServer().getUuid(); - clientData = new ArrayList<Identity>(); - serverData = new ArrayList<Identity>(); + clientData = new ArrayList<>(); + serverData = new ArrayList<>(); } /** @@ -2191,8 +2211,17 @@ public class TargetLock implements Lock, Serializable { AuditData response = new AuditData(); // compare remote servers client data with our server data + generateResponse_clientEnd(response, includeWarnings); + + // test server data + generateResponse_serverEnd(response, includeWarnings); + + return response; + } + + private void generateResponse_clientEnd(AuditData response, boolean includeWarnings) { for (Identity identity : clientData) { - // we are the server in this case + // remote end is the client, and we are the server Bucket bucket = Bucket.getBucket(identity.key); GlobalLocks globalLocks = bucket.getAdjunctDontCreate(GlobalLocks.class); @@ -2240,10 +2269,11 @@ public class TargetLock implements Lock, Serializable { // it was 'clientData' to the sender, but 'serverData' to us response.serverData.add(identity); } + } - // test server data + private void generateResponse_serverEnd(AuditData response, boolean includeWarnings) { for (Identity identity : serverData) { - // we are the client in this case + // remote end is the server, and we are the client Bucket bucket = Bucket.getBucket(identity.ownerKey); LocalLocks localLocks = bucket.getAdjunctDontCreate(LocalLocks.class); @@ -2275,8 +2305,6 @@ public class TargetLock implements Lock, Serializable { } response.clientData.add(identity); } - - return response; } /** @@ -2400,19 +2428,14 @@ public class TargetLock implements Lock, Serializable { * Run a single audit cycle. */ static void runAudit() { - try { - logger.info("Starting TargetLock audit"); - Audit audit = new Audit(); + logger.info("Starting TargetLock audit"); + Audit audit = new Audit(); - // populate 'auditMap' table - audit.build(); + // populate 'auditMap' table + audit.build(); - // send to all of the servers in 'auditMap' (may include this server) - audit.send(); - } catch (InterruptedException e) { - logger.error("TargetLock audit interrupted", e); - Thread.currentThread().interrupt(); - } + // send to all of the servers in 'auditMap' (may include this server) + audit.send(); } /** @@ -2441,63 +2464,59 @@ public class TargetLock implements Lock, Serializable { // this needs to run in the 'MainLoop' thread, because it is dependent // upon the list of servers, and our position in this list - MainLoop.queueWork(new Runnable() { - /** - * {@inheritDoc} - */ - @Override - public void run() { - // current list of servers - Collection<Server> servers = Server.getServers(); + MainLoop.queueWork(() -> { + // this runs in the 'MainLoop' thread - // count of the number of servers - int count = servers.size(); + // current list of servers + Collection<Server> servers = Server.getServers(); - // will contain our position in this list - int index = 0; + // count of the number of servers + int count = servers.size(); - // current server - Server thisServer = Server.getThisServer(); + // will contain our position in this list + int index = 0; - for (Server server : servers) { - if (server == thisServer) { - break; - } - index += 1; + // current server + Server thisServer = Server.getThisServer(); + + for (Server server : servers) { + if (server == thisServer) { + break; } + index += 1; + } - // if index == count, we didn't find this server - // (which shouldn't happen) - - if (index < count) { - // The servers are ordered by UUID, and 'index' is this - // server's position within the list. Suppose the period is - // 60000 (60 seconds), and there are 5 servers -- the first one - // will run the audit at 0 seconds after the minute, the next - // at 12 seconds after the minute, then 24, 36, 48. - long offset = (period * index) / count; - - // the earliest time we want the audit to run - long time = System.currentTimeMillis() + gracePeriod; - long startTime = time - (time % period) + offset; - if (startTime <= time) { - startTime += period; + // if index == count, we didn't find this server + // (which shouldn't happen) + + if (index < count) { + // The servers are ordered by UUID, and 'index' is this + // server's position within the list. Suppose the period is + // 60000 (60 seconds), and there are 5 servers -- the first one + // will run the audit at 0 seconds after the minute, the next + // at 12 seconds after the minute, then 24, 36, 48. + long offset = (period * index) / count; + + // the earliest time we want the audit to run + long time = System.currentTimeMillis() + gracePeriod; + long startTime = time - (time % period) + offset; + if (startTime <= time) { + startTime += period; + } + synchronized (Audit.class) { + if (timerTask != null) { + timerTask.cancel(); } - synchronized (Audit.class) { - if (timerTask != null) { - timerTask.cancel(); + timerTask = new TimerTask() { + @Override + public void run() { + runAudit(); } - timerTask = new TimerTask() { - @Override - public void run() { - runAudit(); - } - }; + }; - // now, schedule the timer - Util.timer.scheduleAtFixedRate( - timerTask, new Date(startTime), period); - } + // now, schedule the timer + Util.timer.scheduleAtFixedRate( + timerTask, new Date(startTime), period); } } }); @@ -2514,7 +2533,8 @@ public class TargetLock implements Lock, Serializable { */ static byte[] incomingAudit(UUID serverUuid, int ttl, byte[] encodedData) { if (!Server.getThisServer().getUuid().equals(serverUuid)) { - if ((ttl -= 1) > 0) { + ttl -= 1; + if (ttl > 0) { Server server = Server.getServer(serverUuid); if (server != null) { WebTarget webTarget = server.getWebTarget("lock/audit"); @@ -2525,8 +2545,8 @@ public class TargetLock implements Lock, Serializable { Entity.entity(new String(encodedData), MediaType.APPLICATION_OCTET_STREAM_TYPE); return webTarget - .queryParam("server", serverUuid.toString()) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_SERVER, serverUuid.toString()) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().post(entity, byte[].class); } } @@ -2556,53 +2576,63 @@ public class TargetLock implements Lock, Serializable { Bucket bucket = Bucket.getBucket(i); // client data - LocalLocks localLocks = - bucket.getAdjunctDontCreate(LocalLocks.class); - if (localLocks != null) { - synchronized (localLocks) { - // we have client data for this bucket - for (Identity identity : - localLocks.weakReferenceToIdentity.values()) { - // find or create the 'AuditData' instance associated - // with the server owning the 'key' - AuditData auditData = getAuditData(identity.key); - if (auditData != null) { - auditData.clientData.add(identity); - } + build_clientData(bucket); + + // server data + build_serverData(bucket); + } + } + + private void build_clientData(Bucket bucket) { + // client data + LocalLocks localLocks = + bucket.getAdjunctDontCreate(LocalLocks.class); + if (localLocks != null) { + synchronized (localLocks) { + // we have client data for this bucket + for (Identity identity : + localLocks.weakReferenceToIdentity.values()) { + // find or create the 'AuditData' instance associated + // with the server owning the 'key' + AuditData auditData = getAuditData(identity.key); + if (auditData != null) { + auditData.clientData.add(identity); } } } + } + } - // server data - GlobalLocks globalLocks = - bucket.getAdjunctDontCreate(GlobalLocks.class); - if (globalLocks != null) { - // we have server data for this bucket - Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry; - synchronized (keyToEntry) { - for (LockEntry le : keyToEntry.values()) { + private void build_serverData(Bucket bucket) { + // server data + GlobalLocks globalLocks = + bucket.getAdjunctDontCreate(GlobalLocks.class); + if (globalLocks != null) { + // we have server data for this bucket + Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry; + synchronized (keyToEntry) { + for (LockEntry le : keyToEntry.values()) { + // find or create the 'AuditData' instance associated + // with the current 'ownerKey' + AuditData auditData = getAuditData(le.currentOwnerKey); + if (auditData != null) { + // create an 'Identity' entry, and add it to + // the list associated with the remote server + auditData.serverData.add( + new Identity(le.key, le.currentOwnerKey, + le.currentOwnerUuid)); + } + + for (Waiting waiting : le.waitingList) { // find or create the 'AuditData' instance associated - // with the current 'ownerKey' - AuditData auditData = getAuditData(le.currentOwnerKey); + // with the waiting entry 'ownerKey' + auditData = getAuditData(waiting.ownerKey); if (auditData != null) { // create an 'Identity' entry, and add it to // the list associated with the remote server auditData.serverData.add( - new Identity(le.key, le.currentOwnerKey, - le.currentOwnerUuid)); - } - - for (Waiting waiting : le.waitingList) { - // find or create the 'AuditData' instance associated - // with the waiting entry 'ownerKey' - auditData = getAuditData(waiting.ownerKey); - if (auditData != null) { - // create an 'Identity' entry, and add it to - // the list associated with the remote server - auditData.serverData.add( - new Identity(le.key, waiting.ownerKey, - waiting.ownerUuid)); - } + new Identity(le.key, waiting.ownerKey, + waiting.ownerUuid)); } } } @@ -2618,12 +2648,8 @@ public class TargetLock implements Lock, Serializable { // map 'key -> bucket number', and then 'bucket number' -> 'server' Server server = Bucket.bucketToServer(Bucket.bucketNumber(key)); if (server != null) { - AuditData auditData = auditMap.get(server); - if (auditData == null) { - // doesn't exist yet -- create it - auditData = new AuditData(); - auditMap.put(server, auditData); - } + AuditData auditData = + auditMap.computeIfAbsent(server, sk -> new AuditData()); return auditData; } @@ -2635,7 +2661,7 @@ public class TargetLock implements Lock, Serializable { * Using the collected 'auditMap', send out the messages to all of the * servers. */ - void send() throws InterruptedException { + void send() { if (auditMap.isEmpty()) { logger.info("TargetLock audit: no locks on this server"); } else { @@ -2644,178 +2670,176 @@ public class TargetLock implements Lock, Serializable { } for (final Server server : auditMap.keySet()) { - // fetch audit data - AuditData auditData = auditMap.get(server); + send_server(server); + } + } - if (server == Server.getThisServer()) { - // process this locally - final AuditData respData = auditData.generateResponse(true); - if (respData.clientData.isEmpty() - && respData.serverData.isEmpty()) { - // no mismatches - logger.info("TargetLock.Audit.send: " - + "no errors from self ({})", server); - continue; + private void send_server(final Server server) { + // fetch audit data + AuditData auditData = auditMap.get(server); + + if (server == Server.getThisServer()) { + // process this locally + final AuditData respData = auditData.generateResponse(true); + if (respData.clientData.isEmpty() + && respData.serverData.isEmpty()) { + // no mismatches + logger.info("TargetLock.Audit.send: " + + "no errors from self ({})", server); + return; + } + + // do the rest in a separate thread + server.getThreadPool().execute(() -> { + // wait a few seconds, and see if we still know of these + // errors + if (AuditPostResponse.responseSupport( + respData, "self (" + server + ")", + "TargetLock.Audit.send")) { + // a return falue of 'true' either indicates the + // mismatches were resolved after a retry, or we + // received an interrupt, and need to abort + return; } - // do the rest in a separate thread - server.getThreadPool().execute(new Runnable() { - @Override - public void run() { - // wait a few seconds, and see if we still know of these - // errors - logger.info("TargetLock.Audit.send: " - + "mismatches from self ({})", server); - try { - Thread.sleep(auditRetryDelay); - } catch (InterruptedException e) { - logger.error("TargetLock.Audit.send: Interrupted " - + "handling audit response from self ({})", - server); - // just abort - Thread.currentThread().interrupt(); - return; - } + // any mismatches left in 'respData' are still issues + respData.processResponse(server); + }); + return; + } - // This will check against our own data -- any mismatches - // mean that things have changed since we sent out the - // first message. We will remove any mismatches from - // 'respData', and see if there are any left. - AuditData mismatches = respData.generateResponse(false); - - respData.serverData.removeAll(mismatches.clientData); - respData.clientData.removeAll(mismatches.serverData); - - if (respData.clientData.isEmpty() - && respData.serverData.isEmpty()) { - // no mismatches -- - // there must have been transient issues on our side - logger.info("TargetLock.Audit.send: " - + "no mismatches from self " - + "({}) after retry", server); - return; - } + // serialize + byte[] encodedData = auditData.encode(); + if (encodedData == null) { + // error has already been displayed + return; + } - // any mismatches left in 'respData' are still issues - respData.processResponse(server); - } - }); - continue; - } + // generate entity + Entity<String> entity = + Entity.entity(new String(encodedData), + MediaType.APPLICATION_OCTET_STREAM_TYPE); - // serialize - byte[] encodedData = auditData.encode(); - if (encodedData == null) { - // error has already been displayed - continue; - } + server.post("lock/audit", entity, new AuditPostResponse(server)); + } + } - // generate entity - Entity<String> entity = - Entity.entity(new String(encodedData), - MediaType.APPLICATION_OCTET_STREAM_TYPE); + static class AuditPostResponse implements Server.PostResponse { + private Server server; - server.post("lock/audit", entity, new Server.PostResponse() { - @Override - public WebTarget webTarget(WebTarget webTarget) { - // include the 'uuid' keyword - return webTarget - .queryParam("server", server.getUuid().toString()) - .queryParam("ttl", timeToLive); - } + AuditPostResponse(Server server) { + this.server = server; + } - @Override - public void response(Response response) { - // process the response here - AuditData respData = - AuditData.decode(response.readEntity(byte[].class)); - if (respData == null) { - logger.error("TargetLock.Audit.send: " - + "couldn't process response from {}", - server); - return; - } + @Override + public WebTarget webTarget(WebTarget webTarget) { + // include the 'uuid' keyword + return webTarget + .queryParam(QP_SERVER, server.getUuid().toString()) + .queryParam(QP_TTL, timeToLive); + } - // if we reach this point, we got a response - if (respData.clientData.isEmpty() - && respData.serverData.isEmpty()) { - // no mismatches - logger.info("TargetLock.Audit.send: " - + "no errors from {}", server); - return; - } + @Override + public void response(Response response) { + // process the response here + AuditData respData = + AuditData.decode(response.readEntity(byte[].class)); + if (respData == null) { + logger.error("TargetLock.Audit.send: " + + "couldn't process response from {}", + server); + return; + } - // wait a few seconds, and see if we still know of these - // errors - logger.info("TargetLock.Audit.send: mismatches from {}", - server); - try { - Thread.sleep(auditRetryDelay); - } catch (InterruptedException e) { - logger.error("TargetLock.Audit.send: Interrupted " - + "handling audit response from {}", - server); - // just abort - Thread.currentThread().interrupt(); - return; - } + // if we reach this point, we got a response + if (respData.clientData.isEmpty() + && respData.serverData.isEmpty()) { + // no mismatches + logger.info("TargetLock.Audit.send: " + + "no errors from {}", server); + return; + } - // This will check against our own data -- any mismatches - // mean that things have changed since we sent out the - // first message. We will remove any mismatches from - // 'respData', and see if there are any left. - AuditData mismatches = respData.generateResponse(false); - - respData.serverData.removeAll(mismatches.clientData); - respData.clientData.removeAll(mismatches.serverData); - - if (respData.clientData.isEmpty() - && respData.serverData.isEmpty()) { - // no mismatches -- - // there must have been transient issues on our side - logger.info("TargetLock.Audit.send: no mismatches from " - + "{} after retry", server); - return; - } + // wait a few seconds, and see if we still know of these + // errors + if (responseSupport(respData, server, "AuditPostResponse.response")) { + // a return falue of 'true' either indicates the mismatches + // were resolved after a retry, or we received an interrupt, + // and need to abort + return; + } - // any mismatches left in 'respData' are still there -- - // hopefully, they are transient issues on the other side - AuditData auditData = new AuditData(); - auditData.clientData = respData.serverData; - auditData.serverData = respData.clientData; - - // serialize - byte[] encodedData = auditData.encode(); - if (encodedData == null) { - // error has already been displayed - return; - } + // any mismatches left in 'respData' are still there -- + // hopefully, they are transient issues on the other side + AuditData auditData = new AuditData(); + auditData.clientData = respData.serverData; + auditData.serverData = respData.clientData; - // generate entity - Entity<String> entity = - Entity.entity(new String(encodedData), - MediaType.APPLICATION_OCTET_STREAM_TYPE); - - // send new list to other end - response = server - .getWebTarget("lock/audit") - .queryParam("server", server.getUuid().toString()) - .queryParam("ttl", timeToLive) - .request().post(entity); - - respData = AuditData.decode(response.readEntity(byte[].class)); - if (respData == null) { - logger.error("TargetLock.auditDataBuilder.send: " - + "couldn't process response from {}", - server); - return; - } + // serialize + byte[] encodedData = auditData.encode(); + if (encodedData == null) { + // error has already been displayed + return; + } - // if there are mismatches left, they are presumably real - respData.processResponse(server); - } - }); + // generate entity + Entity<String> entity = + Entity.entity(new String(encodedData), + MediaType.APPLICATION_OCTET_STREAM_TYPE); + + // send new list to other end + response = server + .getWebTarget("lock/audit") + .queryParam(QP_SERVER, server.getUuid().toString()) + .queryParam(QP_TTL, timeToLive) + .request().post(entity); + + respData = AuditData.decode(response.readEntity(byte[].class)); + if (respData == null) { + logger.error("TargetLock.auditDataBuilder.send: " + + "couldn't process response from {}", + server); + return; + } + + // if there are mismatches left, they are presumably real + respData.processResponse(server); + } + + // Handle mismatches indicated by an audit response -- a return value of + // 'true' indicates that there were no mismatches after a retry, or + // we received an interrupt. In either case, the caller returns. + private static boolean responseSupport(AuditData respData, Object serverString, String caller) { + logger.info("{}: mismatches from {}", caller, serverString); + try { + Thread.sleep(auditRetryDelay); + } catch (InterruptedException e) { + logger.error("{}: Interrupted handling audit response from {}", + caller, serverString); + // just abort + Thread.currentThread().interrupt(); + return true; + } + + // This will check against our own data -- any mismatches + // mean that things have changed since we sent out the + // first message. We will remove any mismatches from + // 'respData', and see if there are any left. + AuditData mismatches = respData.generateResponse(false); + + respData.serverData.removeAll(mismatches.clientData); + respData.clientData.removeAll(mismatches.serverData); + + if (respData.clientData.isEmpty() + && respData.serverData.isEmpty()) { + // no mismatches -- + // there must have been transient issues on our side + logger.info("{}: no mismatches from {} after retry", + caller, serverString); + return true; } + + return false; } } } |