diff options
Diffstat (limited to 'feature-server-pool/src/main/java')
8 files changed, 473 insertions, 345 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java index b82f2e1d..6241a297 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java @@ -360,63 +360,31 @@ public class Bucket { case OWNER_NULL: // <OWNER_NULL> -- owner UUID should be set to 'null' - if (bucket.getOwner() != null) { - logger.info("Bucket {} owner: {}->null", - index, bucket.getOwner()); - bucketChanges = true; - synchronized (bucket) { - bucket.setOwner(null); - bucket.setState(null); - } - } + bucketChanges = nullifyOwner(index, bucket, bucketChanges); break; case PRIMARY_BACKUP_UPDATE: // <PRIMARY_BACKUP_UPDATE> <primary-backup-uuid> -- // primary backup UUID specified - Server newPrimaryBackup = - Server.getServer(Util.readUuid(dis)); - if (bucket.primaryBackup != newPrimaryBackup) { - logger.info("Bucket {} primary backup: {}->{}", index, - bucket.primaryBackup, newPrimaryBackup); - bucketChanges = true; - bucket.primaryBackup = newPrimaryBackup; - } + bucketChanges = updatePrimaryBackup(dis, index, bucket, bucketChanges); break; case PRIMARY_BACKUP_NULL: // <PRIMARY_BACKUP_NULL> -- // primary backup should be set to 'null' - if (bucket.primaryBackup != null) { - logger.info("Bucket {} primary backup: {}->null", - index, bucket.primaryBackup); - bucketChanges = true; - bucket.primaryBackup = null; - } + bucketChanges = nullifyPrimaryBackup(index, bucket, bucketChanges); break; case SECONDARY_BACKUP_UPDATE: // <SECONDARY_BACKUP_UPDATE> <secondary-backup-uuid> -- // secondary backup UUID specified - Server newSecondaryBackup = - Server.getServer(Util.readUuid(dis)); - if (bucket.secondaryBackup != newSecondaryBackup) { - logger.info("Bucket {} secondary backup: {}->{}", index, - bucket.secondaryBackup, newSecondaryBackup); - bucketChanges = true; - bucket.secondaryBackup = newSecondaryBackup; - } + bucketChanges = updateSecondaryBackup(dis, index, bucket, bucketChanges); break; case SECONDARY_BACKUP_NULL: // <SECONDARY_BACKUP_NULL> -- // secondary backup should be set to 'null' - if (bucket.secondaryBackup != null) { - logger.info("Bucket {} secondary backup: {}->null", - index, bucket.secondaryBackup); - bucketChanges = true; - bucket.secondaryBackup = null; - } + bucketChanges = nullifySecondaryBackup(index, bucket, bucketChanges); break; default: @@ -433,6 +401,65 @@ public class Bucket { return changes; } + private static boolean nullifyOwner(int index, Bucket bucket, boolean bucketChanges) { + if (bucket.getOwner() != null) { + logger.info("Bucket {} owner: {}->null", + index, bucket.getOwner()); + bucketChanges = true; + synchronized (bucket) { + bucket.setOwner(null); + bucket.setState(null); + } + } + return bucketChanges; + } + + private static boolean updatePrimaryBackup(DataInputStream dis, int index, Bucket bucket, boolean bucketChanges) + throws IOException { + Server newPrimaryBackup = + Server.getServer(Util.readUuid(dis)); + if (bucket.primaryBackup != newPrimaryBackup) { + logger.info("Bucket {} primary backup: {}->{}", index, + bucket.primaryBackup, newPrimaryBackup); + bucketChanges = true; + bucket.primaryBackup = newPrimaryBackup; + } + return bucketChanges; + } + + private static boolean nullifyPrimaryBackup(int index, Bucket bucket, boolean bucketChanges) { + if (bucket.primaryBackup != null) { + logger.info("Bucket {} primary backup: {}->null", + index, bucket.primaryBackup); + bucketChanges = true; + bucket.primaryBackup = null; + } + return bucketChanges; + } + + private static boolean updateSecondaryBackup(DataInputStream dis, int index, Bucket bucket, boolean bucketChanges) + throws IOException { + Server newSecondaryBackup = + Server.getServer(Util.readUuid(dis)); + if (bucket.secondaryBackup != newSecondaryBackup) { + logger.info("Bucket {} secondary backup: {}->{}", index, + bucket.secondaryBackup, newSecondaryBackup); + bucketChanges = true; + bucket.secondaryBackup = newSecondaryBackup; + } + return bucketChanges; + } + + private static boolean nullifySecondaryBackup(int index, Bucket bucket, boolean bucketChanges) { + if (bucket.secondaryBackup != null) { + logger.info("Bucket {} secondary backup: {}->null", + index, bucket.secondaryBackup); + bucketChanges = true; + bucket.secondaryBackup = null; + } + return bucketChanges; + } + /** * Update bucket owner information. * @@ -768,19 +795,7 @@ public class Bucket { out.println("Moving bucket " + bucketNumber + " from " + oldHost + " to " + newHost); - /* - * update the owner, and ensure that the primary and secondary backup - * remain different from the owner. - */ - bucket.setOwner(newHost); - if (newHost == bucket.primaryBackup) { - out.println("Moving primary back from " + newHost + " to " + oldHost); - bucket.setPrimaryBackup(oldHost); - } else if (newHost == bucket.secondaryBackup) { - out.println("Moving secondary back from " + newHost - + " to " + oldHost); - bucket.setSecondaryBackup(oldHost); - } + updateOwner(out, bucket, oldHost, newHost); try { /* @@ -796,6 +811,22 @@ public class Bucket { } } + private static void updateOwner(PrintStream out, TestBucket bucket, TestServer oldHost, TestServer newHost) { + /* + * update the owner, and ensure that the primary and secondary backup + * remain different from the owner. + */ + bucket.setOwner(newHost); + if (newHost == bucket.primaryBackup) { + out.println("Moving primary back from " + newHost + " to " + oldHost); + bucket.setPrimaryBackup(oldHost); + } else if (newHost == bucket.secondaryBackup) { + out.println("Moving secondary back from " + newHost + + " to " + oldHost); + bucket.setSecondaryBackup(oldHost); + } + } + /** * This method is called when an incoming /bucket/sessionData message is * received from the old owner of the bucket, which presumably means that @@ -1485,6 +1516,10 @@ public class Bucket { return; } + makeTestBucket(bucketSnapshot); + } + + private void makeTestBucket(final Bucket[] bucketSnapshot) { /* * Now, create a 'TestBucket' table that mirrors the 'Bucket' table. * Unlike the standard 'Bucket' and 'Server' tables, the 'TestServer' @@ -1796,28 +1831,13 @@ public class Bucket { dos.writeShort(i); // 'owner' field - if (newOwner != null) { - dos.writeByte(OWNER_UPDATE); - Util.writeUuid(dos, newOwner); - } else { - dos.writeByte(OWNER_NULL); - } + writeOwner(dos, newOwner); // 'primaryBackup' field - if (newPrimary != null) { - dos.writeByte(PRIMARY_BACKUP_UPDATE); - Util.writeUuid(dos, newPrimary); - } else { - dos.writeByte(PRIMARY_BACKUP_NULL); - } + writePrimary(dos, newPrimary); // 'secondaryBackup' field - if (newSecondary != null) { - dos.writeByte(SECONDARY_BACKUP_UPDATE); - Util.writeUuid(dos, newSecondary); - } else { - dos.writeByte(SECONDARY_BACKUP_NULL); - } + writeSecondary(dos, newSecondary); dos.writeByte(END_OF_PARAMETERS_TAG); } @@ -1851,6 +1871,33 @@ public class Bucket { MainLoop.queueWork(task); } + private void writeOwner(DataOutputStream dos, UUID newOwner) throws IOException { + if (newOwner != null) { + dos.writeByte(OWNER_UPDATE); + Util.writeUuid(dos, newOwner); + } else { + dos.writeByte(OWNER_NULL); + } + } + + private void writePrimary(DataOutputStream dos, UUID newPrimary) throws IOException { + if (newPrimary != null) { + dos.writeByte(PRIMARY_BACKUP_UPDATE); + Util.writeUuid(dos, newPrimary); + } else { + dos.writeByte(PRIMARY_BACKUP_NULL); + } + } + + private void writeSecondary(DataOutputStream dos, UUID newSecondary) throws IOException { + if (newSecondary != null) { + dos.writeByte(SECONDARY_BACKUP_UPDATE); + Util.writeUuid(dos, newSecondary); + } else { + dos.writeByte(SECONDARY_BACKUP_NULL); + } + } + /** * Supports the '/cmd/dumpBuckets' REST message -- this isn't part of * a 'rebalance' operation, but it turned out to be a convenient way @@ -2304,22 +2351,7 @@ public class Bucket { message = messages.poll(); if (message == null) { // no messages left - if (state == this) { - if (owner == Server.getThisServer()) { - // we can now exit the state - state = null; - stateChanged(); - } else { - /* - * We need a grace period before we can - * remove the 'state' value (this can happen - * if we receive and process the bulk data - * before receiving official confirmation - * that we are owner of the bucket. - */ - messages = null; - } - } + noMoreMessages(); break; } } @@ -2330,25 +2362,48 @@ public class Bucket { } if (messages == null) { // this indicates we need to enter a grace period before cleanup, - try { - logger.info("{}: entering grace period before terminating", - this); - Thread.sleep(unconfirmedGracePeriod); - } catch (InterruptedException e) { - // we are exiting in any case - Thread.currentThread().interrupt(); - } finally { - synchronized (Bucket.this) { - // Do we need to confirm that we really are the owner? - // What does it mean if we are not? - if (state == this) { - state = null; - stateChanged(); - } + sleepBeforeCleanup(); + } + logger.info("{}: exiting cleanup state", this); + } + + private void noMoreMessages() { + if (state == this) { + if (owner == Server.getThisServer()) { + // we can now exit the state + state = null; + stateChanged(); + } else { + /* + * We need a grace period before we can + * remove the 'state' value (this can happen + * if we receive and process the bulk data + * before receiving official confirmation + * that we are owner of the bucket. + */ + messages = null; + } + } + } + + private void sleepBeforeCleanup() { + try { + logger.info("{}: entering grace period before terminating", + this); + Thread.sleep(unconfirmedGracePeriod); + } catch (InterruptedException e) { + // we are exiting in any case + Thread.currentThread().interrupt(); + } finally { + synchronized (Bucket.this) { + // Do we need to confirm that we really are the owner? + // What does it mean if we are not? + if (state == this) { + state = null; + stateChanged(); } } } - logger.info("{}: exiting cleanup state", this); } /** diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java index 2d643a34..a53fb4d1 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java @@ -180,14 +180,14 @@ public class Discovery implements TopicListener { * same format base64-encoded message that 'Server' * instances periodically exchange */ - LinkedHashMap<String, String> map = new LinkedHashMap<>(); try { - map = coder.decode(event, LinkedHashMap.class); + @SuppressWarnings("unchecked") + LinkedHashMap<String, String> map = coder.decode(event, LinkedHashMap.class); String message = map.get("pingData"); Server.adminRequest(message.getBytes(StandardCharsets.UTF_8)); logger.info("Received a message, server count={}", Server.getServerCount()); } catch (CoderException e) { - logger.error("Can't decode message: {}", e); + logger.error("Can't decode message", e); } } @@ -332,6 +332,7 @@ public class Discovery implements TopicListener { publisher.send(jsonString); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); logger.error("Exception in Discovery.Publisher.run():", e); return; } catch (Exception e) { @@ -340,6 +341,7 @@ public class Discovery implements TopicListener { try { Thread.sleep(15000); } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); logger.error("Discovery.Publisher sleep interrupted"); } return; diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java index dd1c7c32..5f93d2a3 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java @@ -168,17 +168,25 @@ public class FeatureServerPool logger.info("Starting FeatureServerPool"); Server.startup(CONFIG_FILE); TargetLock.startup(); - droolsTimeoutMillis = - getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT); + setDroolsTimeoutMillis( + getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT)); int intTimeToLive = getProperty(BUCKET_TIME_TO_LIVE, DEFAULT_BUCKET_TIME_TO_LIVE); - timeToLiveSecond = String.valueOf(intTimeToLive); + setTimeToLiveSecond(String.valueOf(intTimeToLive)); buildKeywordTable(); Bucket.Backup.register(new DroolsSessionBackup()); Bucket.Backup.register(new TargetLock.LockBackup()); return false; } + private static void setDroolsTimeoutMillis(long timeoutMs) { + droolsTimeoutMillis = timeoutMs; + } + + private static void setTimeToLiveSecond(String ttlSec) { + timeToLiveSecond = ttlSec; + } + /** * {@inheritDoc} */ @@ -319,7 +327,7 @@ public class FeatureServerPool path = Arrays.copyOf(path, path.length); path[path.length - 1] = fieldName; } - keyword = sco.getString(path); + keyword = sco.getString((Object[]) path); if (keyword != null) { if (conversionFunctionName != null) { @@ -554,21 +562,11 @@ public class FeatureServerPool continue; } - int beginIndex = begin.length(); - int endIndex = name.length() - end.length(); - if (beginIndex < endIndex) { - // <topic> is specified, so this table is limited to this - // specific topic - topic = name.substring(beginIndex, endIndex); - } + topic = detmTopic(name, begin, end); // now, process the value // Example: requestID,CommonHeader.RequestID - String[] commaSeparatedEntries = prop.getProperty(name).split(","); - String[][] paths = new String[commaSeparatedEntries.length][]; - for (int i = 0; i < commaSeparatedEntries.length; i += 1) { - paths[i] = commaSeparatedEntries[i].split("\\."); - } + String[][] paths = splitPaths(prop, name); if (topic == null) { // these paths are used for any topics not explicitly @@ -581,6 +579,28 @@ public class FeatureServerPool } } + private static String detmTopic(String name, String begin, String end) { + int beginIndex = begin.length(); + int endIndex = name.length() - end.length(); + if (beginIndex < endIndex) { + // <topic> is specified, so this table is limited to this + // specific topic + return name.substring(beginIndex, endIndex); + } + + return null; + } + + private static String[][] splitPaths(Properties prop, String name) { + String[] commaSeparatedEntries = prop.getProperty(name).split(","); + String[][] paths = new String[commaSeparatedEntries.length][]; + for (int i = 0; i < commaSeparatedEntries.length; i += 1) { + paths[i] = commaSeparatedEntries[i].split("\\."); + } + + return paths; + } + /*======================================*/ /* 'DroolsPdpStateControlApi' interface */ /*======================================*/ @@ -993,8 +1013,10 @@ public class FeatureServerPool } }; kieSession.insert(doRestore); + ois.close(); return sessionLatch; } else { + ois.close(); logger.error("{}: Invalid session data for session={}, type={}", this, session.getFullName(), obj.getClass().getName()); } diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java index c9f4c782..0059a4b9 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java @@ -26,7 +26,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; +import java.util.function.UnaryOperator; import lombok.AllArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,33 +183,26 @@ public class Keyword { * and all the way up the superclass chain. */ private static Class<?> buildReflectiveLookupFindKeyClass(Class<?> clazz) { - Class<?> keyClass = null; for (Class<?> cl = clazz; cl != null; cl = cl.getSuperclass()) { if (classNameToSequence.containsKey(cl.getName())) { // matches the class - keyClass = cl; - break; + return cl; } for (Class<?> intf : cl.getInterfaces()) { if (classNameToSequence.containsKey(intf.getName())) { // matches one of the interfaces - keyClass = intf; - break; + return intf; } // interface can have superclass for (Class<?> cla = clazz; cla != null; cla = intf.getSuperclass()) { if (classNameToSequence.containsKey(cla.getName())) { // matches the class - keyClass = cla; - break; + return cla; } } } - if (keyClass != null) { - break; - } } - return keyClass; + return null; } private static Lookup buildReflectiveLookupBuild(Class<?> clazz, Class<?> keyClass) { @@ -438,7 +431,7 @@ public class Keyword { */ // used to lookup optional conversion functions - private static Map<String, Function<String, String>> conversionFunction = + private static Map<String, UnaryOperator<String>> conversionFunction = new ConcurrentHashMap<>(); // conversion function 'uuid': @@ -459,7 +452,7 @@ public class Keyword { * @param name the conversion function name * @param function the object that does the transformation */ - public static void addConversionFunction(String name, Function<String, String> function) { + public static void addConversionFunction(String name, UnaryOperator<String> function) { conversionFunction.put(name, function); } @@ -478,7 +471,7 @@ public class Keyword { } // look up the function - Function<String, String> function = conversionFunction.get(functionName); + UnaryOperator<String> function = conversionFunction.get(functionName); if (function == null) { logger.error("{}: conversion function not found", functionName); return null; diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java index 96b6598b..1a596455 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java @@ -41,6 +41,7 @@ import java.util.UUID; import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; import lombok.EqualsAndHashCode; +import lombok.Setter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,9 +65,11 @@ class Leader { } // Server currently in the leader roll + @Setter private static Server leaderLocal = null; // Vote state machine -- it is null, unless a vote is in progress + @Setter private static VoteCycle voteCycle = null; private static UUID emptyUUID = new UUID(0L, 0L); @@ -172,7 +175,7 @@ class Leader { if (server == leaderLocal) { // the lead server has failed -- // start/restart the VoteCycle state machine - leaderLocal = null; + setLeaderLocal(null); startVoting(); // send out a notification that the lead server has failed @@ -296,7 +299,7 @@ class Leader { // 5 second grace period has passed -- the leader is one with // the most votes, which is the first entry in 'voteData' Server oldLeader = leaderLocal; - leaderLocal = Server.getServer(voteData.first().uuid); + setLeaderLocal(Server.getServer(voteData.first().uuid)); if (leaderLocal != oldLeader) { // the leader has changed -- send out notifications for (Events listener : Events.getListeners()) { @@ -319,7 +322,7 @@ class Leader { // we are done with voting -- clean up, and report results MainLoop.removeBackgroundWork(this); - voteCycle = null; + setVoteCycle(null); ByteArrayOutputStream bos = new ByteArrayOutputStream(); PrintStream out = new PrintStream(bos); @@ -332,6 +335,11 @@ class Leader { out.format(format, "UUID", "Votes", "Voter(s)"); out.format(format, "----", "-----", "--------"); + outputVotes(out, format); + logger.info("Output - {}", bos); + } + + private void outputVotes(PrintStream out, String format) { for (VoteData vote : voteData) { if (vote.voters.isEmpty()) { out.format(format, vote.uuid, 0, ""); @@ -348,7 +356,6 @@ class Leader { } } } - logger.info("Output - {}", bos); } /** diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java index d310805a..80b5891a 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java @@ -81,6 +81,7 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import lombok.Setter; import org.glassfish.jersey.client.ClientProperties; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.http.client.HttpClient; @@ -105,6 +106,7 @@ public class Server implements Comparable<Server> { new TreeMap<>(Util.uuidComparator); // subset of servers to be notified (null means it needs to be rebuilt) + @Setter private static LinkedList<Server> notifyList = null; // data to be sent out to notify list @@ -320,7 +322,6 @@ public class Server implements Comparable<Server> { InetAddress address = InetAddress.getByName(ipAddressString); InetSocketAddress socketAddress = new InetSocketAddress(address, port); - possibleError = "HTTP server initialization error"; restServer = HttpServletServerFactoryInstance.getServerFactory().build( "SERVER-POOL", // name useHttps, // https @@ -342,7 +343,6 @@ public class Server implements Comparable<Server> { } // we may not know the port until after the server is started - possibleError = "HTTP server start error"; restServer.start(); possibleError = null; @@ -625,7 +625,7 @@ public class Server implements Comparable<Server> { updatedList.add(this); // notify list will need to be rebuilt - notifyList = null; + setNotifyList(null); if (socketAddress != null && this != thisServer) { // initialize 'client' and 'target' fields @@ -769,60 +769,63 @@ public class Server implements Comparable<Server> { // The 'notifyList' value is initially 'null', and it is reset to 'null' // every time a new host joins, or one leaves. That way, it is marked for // recalculation, but only when needed. - if (notifyList == null) { - // next index we are looking for - int dest = 1; - - // our current position in the Server table -- starting at 'thisServer' - UUID current = thisServer.uuid; - - // site socket address of 'current' - InetSocketAddress thisSiteSocketAddress = thisServer.siteSocketAddress; - - // hash set of all site socket addresses located - HashSet<InetSocketAddress> siteSocketAddresses = new HashSet<>(); - siteSocketAddresses.add(thisSiteSocketAddress); - - // the list we are building - notifyList = new LinkedList<>(); - - int index = 1; - for ( ; ; ) { - // move to the next key (UUID) -- if we hit the end of the table, - // wrap to the beginning - current = servers.higherKey(current); - if (current == null) { - current = servers.firstKey(); - } - if (current.equals(thisServer.uuid)) { - // we have looped through the entire list - break; - } + if (notifyList != null) { + return notifyList; + } - // fetch associated server & site socket address - Server server = servers.get(current); - InetSocketAddress currentSiteSocketAddress = - server.siteSocketAddress; - - if (Objects.equals(thisSiteSocketAddress, - currentSiteSocketAddress)) { - // same site -- see if we should add this one - if (index == dest) { - // this is the next index we are looking for -- - // add the server - notifyList.add(server); - - // advance to the next offset (current-offset * 2) - dest = dest << 1; - } - index += 1; - } else if (!siteSocketAddresses.contains(currentSiteSocketAddress)) { - // we need at least one member from each site + // next index we are looking for + int dest = 1; + + // our current position in the Server table -- starting at 'thisServer' + UUID current = thisServer.uuid; + + // site socket address of 'current' + InetSocketAddress thisSiteSocketAddress = thisServer.siteSocketAddress; + + // hash set of all site socket addresses located + HashSet<InetSocketAddress> siteSocketAddresses = new HashSet<>(); + siteSocketAddresses.add(thisSiteSocketAddress); + + // the list we are building + notifyList = new LinkedList<>(); + + int index = 1; + for ( ; ; ) { + // move to the next key (UUID) -- if we hit the end of the table, + // wrap to the beginning + current = servers.higherKey(current); + if (current == null) { + current = servers.firstKey(); + } + if (current.equals(thisServer.uuid)) { + // we have looped through the entire list + break; + } + + // fetch associated server & site socket address + Server server = servers.get(current); + InetSocketAddress currentSiteSocketAddress = + server.siteSocketAddress; + + if (Objects.equals(thisSiteSocketAddress, + currentSiteSocketAddress)) { + // same site -- see if we should add this one + if (index == dest) { + // this is the next index we are looking for -- + // add the server notifyList.add(server); - siteSocketAddresses.add(currentSiteSocketAddress); + + // advance to the next offset (current-offset * 2) + dest = dest << 1; } + index += 1; + } else if (!siteSocketAddresses.contains(currentSiteSocketAddress)) { + // we need at least one member from each site + notifyList.add(server); + siteSocketAddresses.add(currentSiteSocketAddress); } } + return notifyList; } @@ -932,27 +935,8 @@ public class Server implements Comparable<Server> { * This method is running within the 'MainLoop' thread. */ try { - WebTarget webTarget = target.path(path); - if (responseCallback != null) { - // give callback a chance to modify 'WebTarget' - webTarget = responseCallback.webTarget(webTarget); + invokeWebTarget(path, entity, responseCallback); - // send the response to the callback - Response response; - if (entity == null) { - response = webTarget.request().get(); - } else { - response = webTarget.request().post(entity); - } - responseCallback.response(response); - } else { - // just do the invoke, and ignore the response - if (entity == null) { - webTarget.request().get(); - } else { - webTarget.request().post(entity); - } - } } catch (Exception e) { logger.error("Failed to send to {} ({}, {})", uuid, destSocketAddress, destName); @@ -968,6 +952,30 @@ public class Server implements Comparable<Server> { }); } + private void invokeWebTarget(final String path, final Entity<?> entity, PostResponse responseCallback) { + WebTarget webTarget = target.path(path); + if (responseCallback != null) { + // give callback a chance to modify 'WebTarget' + webTarget = responseCallback.webTarget(webTarget); + + // send the response to the callback + Response response; + if (entity == null) { + response = webTarget.request().get(); + } else { + response = webTarget.request().post(entity); + } + responseCallback.response(response); + } else { + // just do the invoke, and ignore the response + if (entity == null) { + webTarget.request().get(); + } else { + webTarget.request().post(entity); + } + } + } + /** * This method may be invoked from any thread. * diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java index 1637e9ef..470801e2 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java @@ -83,6 +83,8 @@ import org.slf4j.LoggerFactory; * owned by the host containing the entry. */ public class TargetLock implements Lock, Serializable { + private static final String LOCK_MSG = "(key={},owner={},uuid={},ttl={})"; + private static final long serialVersionUID = 1L; private static Logger logger = LoggerFactory.getLogger(TargetLock.class); @@ -550,7 +552,7 @@ public class TargetLock implements Lock, Serializable { WebTarget webTarget = server.getWebTarget("lock/free"); if (webTarget != null) { logger.warn("Forwarding 'lock/free' to uuid {} " - + "(key={},owner={},uuid={},ttl={})", + + LOCK_MSG, server.getUuid(), key, ownerKey, uuid, ttl); return webTarget .queryParam(QP_KEY, key) @@ -565,7 +567,7 @@ public class TargetLock implements Lock, Serializable { // if we reach this point, we didn't forward for some reason -- // return failure by indicating it is locked and unavailable logger.error("Couldn't forward 'lock/free' " - + "(key={},owner={},uuid={},ttl={})", + + LOCK_MSG, key, ownerKey, uuid, ttl); return null; } @@ -599,7 +601,7 @@ public class TargetLock implements Lock, Serializable { WebTarget webTarget = server.getWebTarget("lock/locked"); if (webTarget != null) { logger.warn("Forwarding 'lock/locked' to uuid {} " - + "(key={},owner={},uuid={},ttl={})", + + LOCK_MSG, server.getUuid(), key, ownerKey, uuid, ttl); return webTarget .queryParam(QP_KEY, key) @@ -614,42 +616,15 @@ public class TargetLock implements Lock, Serializable { // if we reach this point, we didn't forward for some reason -- // return failure by indicating it is locked and unavailable logger.error("Couldn't forward 'lock/locked' " - + "(key={},owner={},uuid={},ttl={})", + + LOCK_MSG, key, ownerKey, uuid, ttl); return Response.noContent().status(LOCKED).build(); } - TargetLock targetLock = null; + TargetLock targetLock; LocalLocks localLocks = LocalLocks.get(ownerKey); synchronized (localLocks) { - WeakReference<TargetLock> wr = - localLocks.uuidToWeakReference.get(uuid); - - if (wr != null) { - targetLock = wr.get(); - if (targetLock == null) { - // lock has been abandoned - // (AbandonedHandler should usually find this first) - localLocks.weakReferenceToIdentity.remove(wr); - localLocks.uuidToWeakReference.remove(uuid); - } else { - // the lock has been made available -- update the state - // TBD: This could be outside of 'synchronized (localLocks)' - synchronized (targetLock) { - if (targetLock.state == State.WAITING) { - targetLock.state = State.ACTIVE; - } else { - // will return a failure -- not sure how this happened - logger.error("incomingLocked: {} is in state {}", - targetLock, targetLock.state); - targetLock = null; - } - } - } - } else { - // clean up what we can - localLocks.uuidToWeakReference.remove(uuid); - } + targetLock = grabLock(uuid, localLocks); } if (targetLock == null) { // We can't locate the target lock @@ -661,6 +636,39 @@ public class TargetLock implements Lock, Serializable { } } + private static TargetLock grabLock(UUID uuid, LocalLocks localLocks) { + WeakReference<TargetLock> wr = + localLocks.uuidToWeakReference.get(uuid); + + if (wr != null) { + TargetLock targetLock = wr.get(); + if (targetLock == null) { + // lock has been abandoned + // (AbandonedHandler should usually find this first) + localLocks.weakReferenceToIdentity.remove(wr); + localLocks.uuidToWeakReference.remove(uuid); + } else { + // the lock has been made available -- update the state + // TBD: This could be outside of 'synchronized (localLocks)' + synchronized (targetLock) { + if (targetLock.state == State.WAITING) { + targetLock.state = State.ACTIVE; + return targetLock; + } else { + // will return a failure -- not sure how this happened + logger.error("incomingLocked: {} is in state {}", + targetLock, targetLock.state); + } + } + } + } else { + // clean up what we can + localLocks.uuidToWeakReference.remove(uuid); + } + + return null; + } + /** * This is called when the state of a bucket has changed, but is currently * stable. Note that this method is called while being synchronized on the @@ -1778,6 +1786,12 @@ public class TargetLock implements Lock, Serializable { } } + dumpMergeData(out); + dumpServerTable(out); + dumpClientOnlyEntries(out); + } + + private void dumpMergeData(PrintStream out) { if (detail) { // generate format based upon the maximum key length, maximum // owner key length, and whether comments are included anywhere @@ -1801,9 +1815,6 @@ public class TargetLock implements Lock, Serializable { out.printf(format, "Key", "Owner Key", "UUID", "State", "Comments"); out.printf(format, "---", PRINTOUT_DASHES, "----", "-----", "--------"); } - - dumpServerTable(out); - dumpClientOnlyEntries(out); } private void dumpServerTable(PrintStream out) { @@ -2060,23 +2071,7 @@ public class TargetLock implements Lock, Serializable { clientDataList.add(clientData); synchronized (localLocks) { - for (WeakReference<TargetLock> wr : - localLocks.weakReferenceToIdentity.keySet()) { - // Note: 'targetLock' may be 'null' if it has - // been abandoned, and garbage collected - TargetLock targetLock = wr.get(); - - // fetch associated 'identity' - Identity identity = - localLocks.weakReferenceToIdentity.get(wr); - if (identity != null) { - // add a new 'ClientDataRecord' for this bucket - clientData.clientDataRecords.add( - new ClientDataRecord(identity, - (targetLock == null ? null : - targetLock.getState()))); - } - } + generateClientLockData(localLocks, clientData); } } @@ -2089,6 +2084,26 @@ public class TargetLock implements Lock, Serializable { } } } + + private void generateClientLockData(LocalLocks localLocks, ClientData clientData) { + for (WeakReference<TargetLock> wr : + localLocks.weakReferenceToIdentity.keySet()) { + // Note: 'targetLock' may be 'null' if it has + // been abandoned, and garbage collected + TargetLock targetLock = wr.get(); + + // fetch associated 'identity' + Identity identity = + localLocks.weakReferenceToIdentity.get(wr); + if (identity != null) { + // add a new 'ClientDataRecord' for this bucket + clientData.clientDataRecords.add( + new ClientDataRecord(identity, + (targetLock == null ? null : + targetLock.getState()))); + } + } + } } /** @@ -2223,28 +2238,8 @@ public class TargetLock implements Lock, Serializable { if (globalLocks != null) { Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry; synchronized (keyToEntry) { - LockEntry le = keyToEntry.get(identity.key); - if (le != null) { - if (identity.uuid.equals(le.currentOwnerUuid) - && identity.ownerKey.equals(le.currentOwnerKey)) { - // we found a match - continue; - } - - // check the waiting list - boolean match = false; - for (Waiting waiting : le.waitingList) { - if (identity.uuid.equals(waiting.ownerUuid) - && identity.ownerKey.equals(waiting.ownerKey)) { - // we found a match on the waiting list - match = true; - break; - } - } - if (match) { - // there was a match on the waiting list - continue; - } + if (matchIdentity(keyToEntry, identity)) { + continue; } } } @@ -2265,6 +2260,28 @@ public class TargetLock implements Lock, Serializable { } } + private boolean matchIdentity(Map<String, LockEntry> keyToEntry, Identity identity) { + LockEntry le = keyToEntry.get(identity.key); + if (le != null) { + if (identity.uuid.equals(le.currentOwnerUuid) + && identity.ownerKey.equals(le.currentOwnerKey)) { + // we found a match + return true; + } + + // check the waiting list + for (Waiting waiting : le.waitingList) { + if (identity.uuid.equals(waiting.ownerUuid) + && identity.ownerKey.equals(waiting.ownerKey)) { + // we found a match on the waiting list + return true; + } + } + } + + return false; + } + private void generateResponseServerEnd(AuditData response, boolean includeWarnings) { for (Identity identity : serverData) { // remote end is the server, and we are the client @@ -2779,7 +2796,7 @@ public class TargetLock implements Lock, Serializable { // send new list to other end response = server - .getWebTarget("lock/audit") + .getWebTarget(LOCK_AUDIT) .queryParam(QP_SERVER, server.getUuid().toString()) .queryParam(QP_TTL, timeToLive) .request().post(entity); diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java index 48bd1c1a..e8121f3a 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java @@ -245,6 +245,11 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { servers.add(bucket.getPrimaryBackup()); servers.add(bucket.getSecondaryBackup()); } + sendLocksToBackupServers(bucketNumber, entity, count, servers); + } + + private static void sendLocksToBackupServers(final int bucketNumber, final Entity<String> entity, final int count, + Set<Server> servers) { for (final Server server : servers) { if (server != null) { // send out REST command @@ -336,25 +341,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { servers.add(bucket.getPrimaryBackup()); servers.add(bucket.getSecondaryBackup()); } - for (final Server server : servers) { - if (server != null) { - // send out REST command - server.getThreadPool().execute(() -> { - WebTarget webTarget = - server.getWebTarget("persistence/session"); - if (webTarget != null) { - webTarget - .queryParam(QP_BUCKET, - bucket.getIndex()) - .queryParam(QP_SESSION, - encodedSessionName) - .queryParam(QP_COUNT, count) - .queryParam(QP_DEST, server.getUuid()) - .request().post(entity); - } - }); - } - } + sendBucketToBackupServers(bucket, count, entity, servers); } } } catch (Exception e) { @@ -362,6 +349,30 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { } } + private void sendBucketToBackupServers(Bucket bucket, final int count, final Entity<String> entity, + Set<Server> servers) { + + for (final Server server : servers) { + if (server != null) { + // send out REST command + server.getThreadPool().execute(() -> { + WebTarget webTarget = + server.getWebTarget("persistence/session"); + if (webTarget != null) { + webTarget + .queryParam(QP_BUCKET, + bucket.getIndex()) + .queryParam(QP_SESSION, + encodedSessionName) + .queryParam(QP_COUNT, count) + .queryParam(QP_DEST, server.getUuid()) + .request().post(entity); + } + }); + } + } + } + /* ************************************** */ /* 'RuleRuntimeEventListener' interface */ /* ************************************** */ @@ -690,57 +701,20 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { String sessionName = entry.getKey(); ReceiverSessionBucketData rsbd = entry.getValue(); - // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>" - String[] nameSegments = sessionName.split(":"); - PolicySession policySession = null; - - // locate the 'PolicyContainer' and 'PolicySession' - if (nameSegments.length == 3) { - // step through all 'PolicyContainer' instances looking - // for a matching 'artifactId' & 'groupId' - for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) { - if (nameSegments[1].equals(pc.getArtifactId()) - && nameSegments[0].equals(pc.getGroupId())) { - // 'PolicyContainer' matches -- try to fetch the session - policySession = pc.getPolicySession(nameSegments[2]); - break; - } - } - } - + PolicySession policySession = detmPolicySession(sessionName); if (policySession == null) { logger.error(RESTORE_BUCKET_ERROR + "Can't find PolicySession{}", sessionName); continue; } - Object obj = null; - try { - // deserialization needs to use the correct 'ClassLoader' - obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData), - policySession.getPolicyContainer().getClassLoader()); - } catch (IOException | ClassNotFoundException | IllegalArgumentException e) { - logger.error(RESTORE_BUCKET_ERROR - + "Failed to read data for session '{}'", - sessionName, e); - - // can't decode -- skip this session - continue; - } - - if (!(obj instanceof Map)) { - logger.error(RESTORE_BUCKET_ERROR - + "Session '{}' data has class {}, expected 'Map'", - sessionName, obj.getClass().getName()); - - // wrong object type decoded -- skip this session + final Map<?, ?> droolsObjects = deserializeMap(sessionName, rsbd, policySession); + if (droolsObjects == null) { continue; } // if we reach this point, we have decoded the persistent data - final Map<?, ?> droolsObjects = (Map<?, ?>) obj; - // signal when restore is complete final CountDownLatch sessionLatch = new CountDownLatch(1); @@ -767,6 +741,56 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { return sessionLatches; } + private PolicySession detmPolicySession(String sessionName) { + // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>" + String[] nameSegments = sessionName.split(":"); + + // locate the 'PolicyContainer' and 'PolicySession' + if (nameSegments.length == 3) { + // step through all 'PolicyContainer' instances looking + // for a matching 'artifactId' & 'groupId' + for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) { + if (nameSegments[1].equals(pc.getArtifactId()) + && nameSegments[0].equals(pc.getGroupId())) { + // 'PolicyContainer' matches -- try to fetch the session + return pc.getPolicySession(nameSegments[2]); + } + } + } + return null; + } + + private Map<?, ?> deserializeMap(String sessionName, ReceiverSessionBucketData rsbd, + PolicySession policySession) { + Object obj; + + try { + // deserialization needs to use the correct 'ClassLoader' + obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData), + policySession.getPolicyContainer().getClassLoader()); + } catch (IOException | ClassNotFoundException | IllegalArgumentException e) { + logger.error(RESTORE_BUCKET_ERROR + + "Failed to read data for session '{}'", + sessionName, e); + + // can't decode -- skip this session + return null; + } + + if (!(obj instanceof Map)) { + logger.error(RESTORE_BUCKET_ERROR + + "Session '{}' data has class {}, expected 'Map'", + sessionName, obj.getClass().getName()); + + // wrong object type decoded -- skip this session + return null; + } + + // if we reach this point, we have decoded the persistent data + + return (Map<?, ?>) obj; + } + private void restoreBucketLocks(Bucket bucket) { if (lockData != null) { Object obj = null; |