diff options
author | Straubs, Ralph (rs8887) <rs8887@att.com> | 2020-05-08 05:35:27 -0500 |
---|---|---|
committer | Straubs, Ralph (rs8887) <rs8887@att.com> | 2020-05-15 06:17:29 -0500 |
commit | 36310ceb534c3c3fb10e963096bcf6065f686a1d (patch) | |
tree | 5f54a85c065c264815edbf221dc227c9c4f58828 /feature-server-pool/src/main/java/org/onap | |
parent | 3f5c5289f7a57745bbfc70298574733ce4df8a7e (diff) |
Reduce Sonar complaints in 'feature-server-pool'
Issue-ID: POLICY-2546
Change-Id: Ibddde78d705349b3c8510678f25dfce817b1a091
Signed-off-by: Straubs, Ralph (rs8887) <rs8887@att.com>
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap')
12 files changed, 1149 insertions, 1098 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java index 2236506e..b949134f 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java @@ -174,7 +174,13 @@ public class Bucket { private volatile State state = null; // storage for additional data - private Map<Class<?>, Object> adjuncts = new HashMap<Class<?>, Object>(); + private Map<Class<?>, Object> adjuncts = new HashMap<>(); + + // HTTP query parameters + private static final String QP_BUCKET = "bucket"; + private static final String QP_KEYWORD = "keyword"; + private static final String QP_DEST = "dest"; + private static final String QP_TTL = "ttl"; // BACKUP data (only buckets for where we are the owner, or a backup) @@ -287,92 +293,6 @@ public class Bucket { } /** - * This method is called to start a 'rebalance' operation in a background - * thread, but it only does this on the lead server. Being balanced means - * the following: - * 1) Each server owns approximately the same number of buckets - * 2) If any server were to fail, and the designated primaries take over - * for all of that server's buckets, all remaining servers would still - * own approximately the same number of buckets. - * 3) If any two servers were to fail, and the designated primaries were - * to take over for the failed server's buckets (secondaries would take - * for buckets where the owner and primary are OOS), all remaining - * servers would still own approximately the same number of buckets. - * 4) Each server should have approximately the same number of - * (primary-backup + secondary-backup) buckets that it is responsible for. - * 5) The primary backup for each bucket must be on the same site as the - * owner, and the secondary backup must be on a different site. - */ - private static void rebalance() { - if (Leader.getLeader() == Server.getThisServer()) { - Rebalance rb = new Rebalance(); - synchronized (rebalanceLock) { - // the most recent 'Rebalance' instance is the only valid one - rebalance = rb; - } - - new Thread("BUCKET REBALANCER") { - @Override - public void run() { - /* - * copy bucket and host data, - * generating a temporary internal table. - */ - rb.copyData(); - - /* - * allocate owners for all buckets without an owner, - * and rebalance bucket owners, if necessary -- - * this takes card of item #1, above. - */ - rb.allocateBuckets(); - - /* - * make sure that primary backups always have the same site - * as the owner, and secondary backups always have a different - * site -- this takes care of #5, above. - */ - rb.checkSiteValues(); - - /* - * adjust primary backup lists to take care of item #2, above - * (taking #5 into account). - */ - rb.rebalancePrimaryBackups(); - - /* - * allocate secondary backups, and take care of items - * #3 and #4, above (taking #5 into account). - */ - rb.rebalanceSecondaryBackups(); - - try { - synchronized (rebalanceLock) { - /* - * if another 'Rebalance' instance has started in the - * mean time, don't do the update. - */ - if (rebalance == rb) { - /* - * build a message containing all of the updated bucket - * information, process it internally in this host - * (lead server), and send it out to others in the - * "notify list". - */ - rb.generateBucketMessage(); - rebalance = null; - } - } - } catch (IOException e) { - logger.error("Exception in Rebalance.generateBucketMessage", - e); - } - } - }.start(); - } - } - - /** * Handle an incoming /bucket/update REST message. * * @param data base64-encoded data, containing all bucket updates @@ -433,13 +353,12 @@ public class Bucket { int tag; while ((tag = dis.readUnsignedByte()) != END_OF_PARAMETERS_TAG) { switch (tag) { - case OWNER_UPDATE: { + case OWNER_UPDATE: // <OWNER_UPDATE> <owner-uuid> -- owner UUID specified bucketChanges = updateBucketInternalOwnerUpdate(bucket, dis, index); break; - } - case OWNER_NULL: { + case OWNER_NULL: // <OWNER_NULL> -- owner UUID should be set to 'null' if (bucket.getOwner() != null) { logger.info("Bucket {} owner: {}->null", @@ -451,9 +370,8 @@ public class Bucket { } } break; - } - case PRIMARY_BACKUP_UPDATE: { + case PRIMARY_BACKUP_UPDATE: // <PRIMARY_BACKUP_UPDATE> <primary-backup-uuid> -- // primary backup UUID specified Server newPrimaryBackup = @@ -465,9 +383,8 @@ public class Bucket { bucket.primaryBackup = newPrimaryBackup; } break; - } - case PRIMARY_BACKUP_NULL: { + case PRIMARY_BACKUP_NULL: // <PRIMARY_BACKUP_NULL> -- // primary backup should be set to 'null' if (bucket.primaryBackup != null) { @@ -477,9 +394,8 @@ public class Bucket { bucket.primaryBackup = null; } break; - } - case SECONDARY_BACKUP_UPDATE: { + case SECONDARY_BACKUP_UPDATE: // <SECONDARY_BACKUP_UPDATE> <secondary-backup-uuid> -- // secondary backup UUID specified Server newSecondaryBackup = @@ -491,9 +407,8 @@ public class Bucket { bucket.secondaryBackup = newSecondaryBackup; } break; - } - case SECONDARY_BACKUP_NULL: { + case SECONDARY_BACKUP_NULL: // <SECONDARY_BACKUP_NULL> -- // secondary backup should be set to 'null' if (bucket.secondaryBackup != null) { @@ -503,7 +418,6 @@ public class Bucket { bucket.secondaryBackup = null; } break; - } default: logger.error("Illegal tag: {}", tag); @@ -550,7 +464,6 @@ public class Bucket { bucket.state = bucket.new NewOwner(true, oldOwner); } else { // new owner has been confirmed - // orig bucket.state.newOwner(); bucket.state.newOwner(); } } @@ -685,22 +598,23 @@ public class Bucket { * selected bucket has no server assigned -- this should only be a * transient situation, until 'rebalance' is run. */ - out.println("Bucket is " + bucketNumber + ", which has no owner"); + out.format("Bucket is %d, which has no owner\n", bucketNumber); } else if (server == Server.getThisServer()) { /* * the selected bucket is associated with this particular server -- * no forwarding is needed. */ - out.println("Bucket is " + bucketNumber - + ", which is owned by this server: " + server.getUuid()); + out.format("Bucket is %d, which is owned by this server: %s\n", + bucketNumber, server.getUuid()); } else { /* * the selected bucket is assigned to a different server -- forward * the message. */ - out.println("Bucket is " + bucketNumber + ": sending from\n" - + " " + Server.getThisServer().getUuid() + " to \n" - + " " + server.getUuid()); + out.format("Bucket is %d: sending from\n" + + " %s to\n" + + " %s\n", + bucketNumber, Server.getThisServer().getUuid(), server.getUuid()); // do a POST call of /bucket/bucketResponse to the remoote server Entity<String> entity = @@ -723,8 +637,8 @@ public class Bucket { // we need to include the 'bucket' and 'keyword' parameters // in the POST that we are sending out return webTarget - .queryParam("bucket", bucketNumber) - .queryParam("keyword", keyword); + .queryParam(QP_BUCKET, bucketNumber) + .queryParam(QP_KEYWORD, keyword); } /** @@ -744,12 +658,12 @@ public class Bucket { if (response == null) { out.println("Timed out waiting for a response"); } else { - out.println("Received response code " + response.getStatus()); - out.println("Entity = " + response.readEntity(String.class)); + out.format("Received response code %s\nEntity = %s\n", + response.getStatus(), response.readEntity(String.class)); } } catch (InterruptedException e) { out.println(e); - throw new IOException(e); + Thread.currentThread().interrupt(); } } } @@ -905,7 +819,8 @@ public class Bucket { Server server; WebTarget webTarget; - if ((ttl -= 1) > 0 + ttl -= 1; + if (ttl > 0 && (server = Server.getServer(dest)) != null && (webTarget = server.getWebTarget("bucket/sessionData")) != null) { logger.info("Forwarding 'bucket/sessionData' to uuid {}", @@ -915,9 +830,9 @@ public class Bucket { MediaType.APPLICATION_OCTET_STREAM_TYPE); Response response = webTarget - .queryParam("bucket", bucketNumber) - .queryParam("dest", dest) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_BUCKET, bucketNumber) + .queryParam(QP_DEST, dest) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().post(entity); logger.info("/bucket/sessionData response code = {}", response.getStatus()); @@ -977,23 +892,16 @@ public class Bucket { * the 'newInstance' method is unable to create the adjunct) */ public <T> T getAdjunct(Class<T> clazz) { - synchronized (adjuncts) { - // look up the adjunct in the table - Object adj = adjuncts.get(clazz); - if (adj == null) { - // lookup failed -- create one - try { - // create the adjunct (may trigger an exception) - adj = clazz.newInstance(); - - // update the table - adjuncts.put(clazz, adj); - } catch (Exception e) { - logger.error("Can't create adjunct of {}", clazz, e); - } + Object adj = adjuncts.computeIfAbsent(clazz, key -> { + try { + // create the adjunct, if needed + return clazz.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + logger.error("Can't create adjunct of {}", clazz, e); + return null; } - return clazz.cast(adj); - } + }); + return clazz.cast(adj); } /** @@ -1019,7 +927,7 @@ public class Bucket { */ public Object putAdjunct(Object adj) { synchronized (adjuncts) { - Class clazz = adj.getClass(); + Class<?> clazz = adj.getClass(); return adjuncts.put(clazz, adj); } } @@ -1148,6 +1056,92 @@ public class Bucket { // trigger a rebalance (only happens if we are the lead server) rebalance(); } + + /** + * This method is called to start a 'rebalance' operation in a background + * thread, but it only does this on the lead server. Being balanced means + * the following: + * 1) Each server owns approximately the same number of buckets + * 2) If any server were to fail, and the designated primaries take over + * for all of that server's buckets, all remaining servers would still + * own approximately the same number of buckets. + * 3) If any two servers were to fail, and the designated primaries were + * to take over for the failed server's buckets (secondaries would take + * for buckets where the owner and primary are OOS), all remaining + * servers would still own approximately the same number of buckets. + * 4) Each server should have approximately the same number of + * (primary-backup + secondary-backup) buckets that it is responsible for. + * 5) The primary backup for each bucket must be on the same site as the + * owner, and the secondary backup must be on a different site. + */ + private void rebalance() { + if (Leader.getLeader() == Server.getThisServer()) { + Rebalance rb = new Rebalance(); + synchronized (rebalanceLock) { + // the most recent 'Rebalance' instance is the only valid one + rebalance = rb; + } + + new Thread("BUCKET REBALANCER") { + @Override + public void run() { + /* + * copy bucket and host data, + * generating a temporary internal table. + */ + rb.copyData(); + + /* + * allocate owners for all buckets without an owner, + * and rebalance bucket owners, if necessary -- + * this takes card of item #1, above. + */ + rb.allocateBuckets(); + + /* + * make sure that primary backups always have the same site + * as the owner, and secondary backups always have a different + * site -- this takes care of #5, above. + */ + rb.checkSiteValues(); + + /* + * adjust primary backup lists to take care of item #2, above + * (taking #5 into account). + */ + rb.rebalancePrimaryBackups(); + + /* + * allocate secondary backups, and take care of items + * #3 and #4, above (taking #5 into account). + */ + rb.rebalanceSecondaryBackups(); + + try { + synchronized (rebalanceLock) { + /* + * if another 'Rebalance' instance has started in the + * mean time, don't do the update. + */ + if (rebalance == rb) { + /* + * build a message containing all of the updated bucket + * information, process it internally in this host + * (lead server), and send it out to others in the + * "notify list". + */ + rb.generateBucketMessage(); + rebalance = null; + } + } + } catch (IOException e) { + logger.error("Exception in Rebalance.generateBucketMessage", + e); + } + } + }.start(); + } + } } /* ============================================================ */ @@ -1477,11 +1471,15 @@ public class Bucket { return 0; }; - FutureTask<Integer> ft = new FutureTask(callable); + FutureTask<Integer> ft = new FutureTask<>(callable); MainLoop.queueWork(ft); try { ft.get(60, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + } catch (InterruptedException e) { + logger.error("Interrupted", e); + Thread.currentThread().interrupt(); + return; + } catch (ExecutionException | TimeoutException e) { logger.error("Exception in Rebalance.copyData", e); return; } @@ -1534,7 +1532,7 @@ public class Bucket { * 'needBuckets' TreeSet: those with the fewest buckets allocated are * at the head of the list. */ - Comparator<TestServer> bucketCount = new Comparator<TestServer>() { + Comparator<TestServer> bucketCount = new Comparator<>() { @Override public int compare(TestServer s1, TestServer s2) { int rval = s1.buckets.size() - s2.buckets.size(); @@ -1662,8 +1660,7 @@ public class Bucket { // populate a 'TreeSet' of 'AdjustedTestServer' instances based // the failure of 'failedServer' - TreeSet<AdjustedTestServer> adjustedTestServers = - new TreeSet<AdjustedTestServer>(); + TreeSet<AdjustedTestServer> adjustedTestServers = new TreeSet<>(); for (TestServer server : testServers.values()) { if (server == failedServer || !Objects.equals(siteSocketAddress, @@ -1943,7 +1940,7 @@ public class Bucket { int size = buckets.size(); if (size != 0) { // generate a linked list of the bucket data to display - LinkedList<String> data = new LinkedList<String>(); + LinkedList<String> data = new LinkedList<>(); StringBuilder sb = new StringBuilder(); int count = 8; @@ -1956,7 +1953,8 @@ public class Bucket { // add the bucket number sb.append(String.format("%4s", bucket.index)); - if ((count -= 1) <= 0) { + count -= 1; + if (count <= 0) { // filled up a row -- // add it to the list, and start a new line data.add(sb.toString()); @@ -2109,7 +2107,7 @@ public class Bucket { // when 'System.currentTimeMillis()' reaches this value, we time out long endTime; - // If not 'null', we are queueing messages for this bucket; + // If not 'null', we are queueing messages for this bucket // otherwise, we are sending them through. Queue<Message> messages = new ConcurrentLinkedQueue<>(); @@ -2287,66 +2285,70 @@ public class Bucket { } catch (Exception e) { logger.error("Exception in {}", this, e); } finally { - /* - * cleanly leave state -- we want to make sure that messages - * are processed in order, so the queue needs to remain until - * it is empty - */ - logger.info("{}: entering cleanup state", this); - for ( ; ; ) { - Message message = messages.poll(); - if (message == null) { - // no messages left, but this could change - synchronized (Bucket.this) { - message = messages.poll(); - if (message == null) { - // no messages left - if (state == this) { - if (owner == Server.getThisServer()) { - // we can now exit the state - state = null; - stateChanged(); - } else { - /* - * We need a grace period before we can - * remove the 'state' value (this can happen - * if we receive and process the bulk data - * before receiving official confirmation - * that we are owner of the bucket. - */ - messages = null; - } + run_cleanup(); + } + } + + private void run_cleanup() { + /* + * cleanly leave state -- we want to make sure that messages + * are processed in order, so the queue needs to remain until + * it is empty + */ + logger.info("{}: entering cleanup state", this); + for ( ; ; ) { + Message message = messages.poll(); + if (message == null) { + // no messages left, but this could change + synchronized (Bucket.this) { + message = messages.poll(); + if (message == null) { + // no messages left + if (state == this) { + if (owner == Server.getThisServer()) { + // we can now exit the state + state = null; + stateChanged(); + } else { + /* + * We need a grace period before we can + * remove the 'state' value (this can happen + * if we receive and process the bulk data + * before receiving official confirmation + * that we are owner of the bucket. + */ + messages = null; } - break; } + break; } } - // this doesn't work -- it ends up right back in the queue - // if 'messages' is defined - message.process(); } - if (messages == null) { - // this indicates we need to enter a grace period before cleanup, - try { - logger.info("{}: entering grace period before terminating", - this); - Thread.sleep(unconfirmedGracePeriod); - } catch (InterruptedException e) { - // we are exiting in any case - Thread.currentThread().interrupt(); - } finally { - synchronized (Bucket.this) { - // Do we need to confirm that we really are the owner? - // What does it mean if we are not? - if (state == this) { - state = null; - stateChanged(); - } + // this doesn't work -- it ends up right back in the queue + // if 'messages' is defined + message.process(); + } + if (messages == null) { + // this indicates we need to enter a grace period before cleanup, + try { + logger.info("{}: entering grace period before terminating", + this); + Thread.sleep(unconfirmedGracePeriod); + } catch (InterruptedException e) { + // we are exiting in any case + Thread.currentThread().interrupt(); + } finally { + synchronized (Bucket.this) { + // Do we need to confirm that we really are the owner? + // What does it mean if we are not? + if (state == this) { + state = null; + stateChanged(); } } } - logger.info("{}: exiting cleanup state", this); } + logger.info("{}: exiting cleanup state", this); } /** @@ -2357,31 +2359,32 @@ public class Bucket { public String toString() { return "Bucket.NewOwner(" + index + ")"; } - } - /** - * Restore bucket data. - * - * @param obj deserialized bucket data - */ - private void restoreBucketData(Object obj) { - if (obj instanceof List) { - for (Object entry : (List<?>)obj) { - if (entry instanceof Restore) { - // entry-specific 'restore' operation - ((Restore)entry).restore(this.index); - } else { - logger.error("{}: Expected '{}' but got '{}'", - this, Restore.class.getName(), - entry.getClass().getName()); + /** + * Restore bucket data. + * + * @param obj deserialized bucket data + */ + private void restoreBucketData(Object obj) { + if (obj instanceof List) { + for (Object entry : (List<?>)obj) { + if (entry instanceof Restore) { + // entry-specific 'restore' operation + ((Restore)entry).restore(Bucket.this.index); + } else { + logger.error("{}: Expected '{}' but got '{}'", + this, Restore.class.getName(), + entry.getClass().getName()); + } } + } else { + logger.error("{}: expected 'List' but got '{}'", + this, obj.getClass().getName()); } - } else { - logger.error("{}: expected 'List' but got '{}'", - this, obj.getClass().getName()); } } + /* ============================================================ */ /** @@ -2459,9 +2462,9 @@ public class Bucket { @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("bucket", index) - .queryParam("dest", newOwner.getUuid()) - .queryParam("ttl", timeToLive); + .queryParam(QP_BUCKET, index) + .queryParam(QP_DEST, newOwner.getUuid()) + .queryParam(QP_TTL, timeToLive); } @Override diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java index c507e97d..1d695a01 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java @@ -37,7 +37,6 @@ import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVER_PU import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty; import com.google.gson.Gson; -import com.google.gson.JsonObject; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java index 748a38f3..dfe211ce 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java @@ -26,9 +26,6 @@ import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUC import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE; import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.Serializable; @@ -36,7 +33,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; -import java.util.Enumeration; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -53,8 +49,6 @@ import javax.ws.rs.core.Response; import lombok.AllArgsConstructor; -import org.drools.core.definitions.InternalKnowledgePackage; -import org.drools.core.impl.KnowledgeBaseImpl; import org.kie.api.runtime.KieSession; import org.kie.api.runtime.rule.FactHandle; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; @@ -75,7 +69,6 @@ import org.onap.policy.drools.system.PolicyControllerConstants; import org.onap.policy.drools.system.PolicyEngine; import org.onap.policy.drools.system.PolicyEngineConstants; import org.onap.policy.drools.utils.Pair; -import org.onap.policy.drools.utils.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,7 +91,7 @@ public class FeatureServerPool // used for JSON <-> String conversion private static StandardCoder coder = new StandardCoder(); - private static final String configFile = + private static final String CONFIG_FILE = "config/feature-server-pool.properties"; /* @@ -142,6 +135,15 @@ public class FeatureServerPool private static long droolsTimeoutMillis; private static String timeToLiveSecond; + // HTTP query parameters + private static final String QP_KEYWORD = "keyword"; + private static final String QP_SESSION = "session"; + private static final String QP_BUCKET = "bucket"; + private static final String QP_TTL = "ttl"; + private static final String QP_CONTROLLER = "controller"; + private static final String QP_PROTOCOL = "protocol"; + private static final String QP_TOPIC = "topic"; + /******************************/ /* 'OrderedService' interface */ /******************************/ @@ -166,7 +168,7 @@ public class FeatureServerPool @Override public boolean afterStart(PolicyEngine engine) { logger.info("Starting FeatureServerPool"); - Server.startup(configFile); + Server.startup(CONFIG_FILE); TargetLock.startup(); droolsTimeoutMillis = getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT); @@ -251,10 +253,10 @@ public class FeatureServerPool + session.getName(); return webTarget - .queryParam("keyword", keyword) - .queryParam("session", encodedSessionName) - .queryParam("bucket", bucketNumber) - .queryParam("ttl", timeToLiveSecond); + .queryParam(QP_KEYWORD, keyword) + .queryParam(QP_SESSION, encodedSessionName) + .queryParam(QP_BUCKET, bucketNumber) + .queryParam(QP_TTL, timeToLiveSecond); } @Override @@ -320,6 +322,20 @@ public class FeatureServerPool path[path.length - 1] = fieldName; } keyword = sco.getString(path); + if (keyword != null) { + if (conversionFunctionName == null) { + // We found a keyword -- we don't need to try other paths, + // so we should break out of the loop + break; + } + + // we have post-processing to do + keyword = Keyword.convertKeyword(keyword, conversionFunctionName); + if (keyword != null) { + // conversion was successful + break; + } + } } if (keyword == null) { @@ -451,13 +467,16 @@ public class FeatureServerPool } } } - } else if ((ttl -= 1) > 0) { - /* - * This host is not the intended destination -- this could happen - * if it was sent from another site. Forward the message in the - * same thread. - */ - forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data); + } else { + ttl -= 1; + if (ttl > 0) { + /* + * This host is not the intended destination -- this could happen + * if it was sent from another site. Forward the message in the + * same thread. + */ + forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data); + } } } @@ -496,10 +515,10 @@ public class FeatureServerPool Entity.entity(new String(data, StandardCharsets.UTF_8), MediaType.APPLICATION_OCTET_STREAM_TYPE); webTarget - .queryParam("keyword", keyword) - .queryParam("session", sessionName) - .queryParam("bucket", bucket) - .queryParam("ttl", ttl) + .queryParam(QP_KEYWORD, keyword) + .queryParam(QP_SESSION, sessionName) + .queryParam(QP_BUCKET, bucket) + .queryParam(QP_TTL, ttl) .request().post(entity); } } @@ -683,17 +702,22 @@ public class FeatureServerPool @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("bucket", bucketNumber) - .queryParam("keyword", keyword) - .queryParam("controller", controller.getName()) - .queryParam("protocol", protocol.toString()) - .queryParam("topic", topic); + .queryParam(QP_BUCKET, bucketNumber) + .queryParam(QP_KEYWORD, keyword) + .queryParam(QP_CONTROLLER, controller.getName()) + .queryParam(QP_PROTOCOL, protocol.toString()) + .queryParam(QP_TOPIC, topic); } @Override public void response(Response response) { - // TODO: eventually, we will want to do something different - // based upon success/failure + // log a message indicating success/failure + int status = response.getStatus(); + if (status >= 200 && status <= 299) { + logger.info("/bucket/topic response code = {}", status); + } else { + logger.error("/bucket/topic response code = {}", status); + } } }); } @@ -749,30 +773,28 @@ public class FeatureServerPool logger.info("{}: about to fetch data for session {}", this, session.getFullName()); - kieSession.insert(new DroolsRunnable() { - @Override - public void run() { - List<Object> droolsObjects = new ArrayList<>(); - for (FactHandle fh : kieSession.getFactHandles()) { - Object obj = kieSession.getObject(fh); - String keyword = Keyword.lookupKeyword(obj); - if (keyword != null - && Bucket.bucketNumber(keyword) == bucketNumber) { - // bucket matches -- include this object - droolsObjects.add(obj); - /* - * delete this factHandle from Drools memory - * this classes are used in bucket migration, - * so the delete is intentional. - */ - kieSession.delete(fh); - } + DroolsRunnable backupAndRemove = () -> { + List<Object> droolsObjects = new ArrayList<>(); + for (FactHandle fh : kieSession.getFactHandles()) { + Object obj = kieSession.getObject(fh); + String keyword = Keyword.lookupKeyword(obj); + if (keyword != null + && Bucket.bucketNumber(keyword) == bucketNumber) { + // bucket matches -- include this object + droolsObjects.add(obj); + /* + * delete this factHandle from Drools memory + * this classes are used in bucket migration, + * so the delete is intentional. + */ + kieSession.delete(fh); } - - // send notification that object list is complete - droolsObjectsWrapper.complete(droolsObjects); } - }); + + // send notification that object list is complete + droolsObjectsWrapper.complete(droolsObjects); + }; + kieSession.insert(backupAndRemove); // add pending operation to the list pendingData.add(new Pair<>(droolsObjectsWrapper, session)); @@ -858,6 +880,7 @@ public class FeatureServerPool } } catch (InterruptedException e) { logger.error("Exception in {}", this, e); + Thread.currentThread().interrupt(); } } } @@ -957,24 +980,22 @@ public class FeatureServerPool final KieSession kieSession = session.getKieSession(); // run the following within the Drools session thread - kieSession.insert(new DroolsRunnable() { - @Override - public void run() { - try { - /* - * Insert all of the objects -- note that this is running - * in the session thread, so no other rules can fire - * until all of the objects are inserted. - */ - for (Object obj : droolsObjects) { - kieSession.insert(obj); - } - } finally { - // send notification that the inserts have completed - sessionLatch.countDown(); + DroolsRunnable doRestore = () -> { + try { + /* + * Insert all of the objects -- note that this is running + * in the session thread, so no other rules can fire + * until all of the objects are inserted. + */ + for (Object droolsObj : droolsObjects) { + kieSession.insert(droolsObj); } + } finally { + // send notification that the inserts have completed + sessionLatch.countDown(); } - }); + }; + kieSession.insert(doRestore); return sessionLatch; } else { logger.error("{}: Invalid session data for session={}, type={}", diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java index 6c88ebd0..e0b97fda 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java @@ -47,16 +47,11 @@ public class Keyword { // this table can be used to map an object class into the method // to invoke to do the lookup - private static ConcurrentHashMap<Class, Lookup> classToLookup = + private static ConcurrentHashMap<Class<?>, Lookup> classToLookup = new ConcurrentHashMap<>(); // this is a pre-defined 'Lookup' instance that always returns 'null' - private static Lookup nullLookup = new Lookup() { - @Override - public String getKeyword(Object obj) { - return null; - } - }; + private static Lookup nullLookup = (Object obj) -> (String) null; /** * This method takes the object's class, looks it up in the 'classToLookup' @@ -78,7 +73,7 @@ public class Keyword { // try to locate a matching entry using 'inheritance' rules Class<?> thisClass = obj.getClass(); Class<?> matchingClass = null; - for (Map.Entry<Class, Lookup> entry : classToLookup.entrySet()) { + for (Map.Entry<Class<?>, Lookup> entry : classToLookup.entrySet()) { if (entry.getKey().isAssignableFrom(thisClass) && (matchingClass == null || matchingClass.isAssignableFrom(entry.getKey()))) { @@ -173,7 +168,14 @@ public class Keyword { } } - return lookupClassByName(classNameToSequence, clazz); + Class<?> keyClass = buildReflectiveLookup_findKeyClass(clazz); + + if (keyClass == null) { + // no matching class name found + return null; + } + + return buildReflectiveLookup_build(clazz, keyClass); } /** @@ -182,8 +184,7 @@ public class Keyword { * interfaces. If no match is found, repeat with the superclass, * and all the way up the superclass chain. */ - private static Lookup lookupClassByName(Map<String, String> classNameToSequence, - Class<?> clazz) { + private static Class<?> buildReflectiveLookup_findKeyClass(Class<?> clazz) { Class<?> keyClass = null; for (Class<?> cl = clazz ; cl != null ; cl = cl.getSuperclass()) { if (classNameToSequence.containsKey(cl.getName())) { @@ -210,11 +211,10 @@ public class Keyword { break; } } + return keyClass; + } - if (keyClass == null) { - // no matching class name found - return null; - } + private static Lookup buildReflectiveLookup_build(Class<?> clazz, Class<?> keyClass) { // we found a matching key in the table -- now, process the values Class<?> currentClass = keyClass; @@ -443,13 +443,10 @@ public class Keyword { static final int UUID_LENGTH = 36; static { - conversionFunction.put("uuid", new Function<String, String>() { - @Override - public String apply(String value) { - // truncate strings to 36 characters - return value != null && value.length() > UUID_LENGTH - ? value.substring(0, UUID_LENGTH) : value; - } + conversionFunction.put("uuid", value -> { + // truncate strings to 36 characters + return value != null && value.length() > UUID_LENGTH + ? value.substring(0, UUID_LENGTH) : value; }); } diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java index 9d864bd7..06b02527 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java @@ -83,6 +83,13 @@ class Leader { private static int stableVotingCycles; /** + * Hide implicit public constructor. + */ + private Leader() { + // everything here is static -- no instances of this class are created + } + + /** * Invoked at startup, or after some events -- immediately start a new vote. */ static void startup() { @@ -125,23 +132,18 @@ class Leader { // decode base64 data final byte[] packet = Base64.getDecoder().decode(data); - MainLoop.queueWork(new Runnable() { - /** - * This method is running within the 'MainLoop' thread. - */ - @Override - public void run() { - // create the 'VoteCycle' state machine, if needed - if (voteCycle == null) { - voteCycle = new VoteCycle(); - MainLoop.addBackgroundWork(voteCycle); - } - try { - // pass data to 'VoteCycle' state machine - voteCycle.packetReceived(packet); - } catch (IOException e) { - logger.error("Exception in 'Leader.voteData", e); - } + MainLoop.queueWork(() -> { + // This runs within the 'MainLoop' thread -- + // create the 'VoteCycle' state machine, if needed + if (voteCycle == null) { + voteCycle = new VoteCycle(); + MainLoop.addBackgroundWork(voteCycle); + } + try { + // pass data to 'VoteCycle' state machine + voteCycle.packetReceived(packet); + } catch (IOException e) { + logger.error("Exception in 'Leader.voteData", e); } }); } @@ -250,94 +252,107 @@ class Leader { @Override public void run() { switch (state) { - case STARTUP: { - // 5-second grace period -- wait for things to stablize before - // starting the vote - if ((cycleCount -= 1) <= 0) { - logger.info("VoteCycle: {} seconds have passed", - stableIdleCycles); - //MainLoop.removeBackgroundWork(this); - updateMyVote(); - sendOutUpdates(); - state = State.VOTING; - cycleCount = stableVotingCycles; - } + case STARTUP: + startupState(); break; - } - case VOTING: { - // need to be in the VOTING state without any vote changes - // for 5 seconds -- once this happens, the leader is chosen - if (sendOutUpdates()) { - // changes have occurred -- set the grace period to 5 seconds - cycleCount = stableVotingCycles; - } else if ((cycleCount -= 1) <= 0) { - // 5 second grace period has passed -- the leader is one with - // the most votes, which is the first entry in 'voteData' - Server oldLeader = leader; - leader = Server.getServer(voteData.first().uuid); - if (leader != oldLeader) { - // the leader has changed -- send out notifications - for (Events listener : Events.getListeners()) { - listener.newLeader(leader); - } - } else { - // the election is over, and the leader has been confirmed - for (Events listener : Events.getListeners()) { - listener.leaderConfirmed(leader); - } - } - if (leader == Server.getThisServer()) { - // this is the lead server -- - // make sure the 'Discovery' threads are running - Discovery.startDiscovery(); - } else { - // this is not the lead server -- stop 'Discovery' threads - Discovery.stopDiscovery(); - } - - // we are done with voting -- clean up, and report results - MainLoop.removeBackgroundWork(this); - voteCycle = null; - - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - PrintStream out = new PrintStream(bos); - - out.println("Voting results:"); - - // x(36) xxxxx x(36) - // UUID Votes Voter - String format = "%-36s %5s %-36s\n"; - out.format(format, "UUID", "Votes", "Voter(s)"); - out.format(format, "----", "-----", "--------"); - - for (VoteData vote : voteData) { - if (vote.voters.isEmpty()) { - out.format(format, vote.uuid, 0, ""); - } else { - boolean headerNeeded = true; - for (VoterData voter : vote.voters) { - if (headerNeeded) { - out.format(format, vote.uuid, - vote.voters.size(), voter.uuid); - headerNeeded = false; - } else { - out.format(format, "", "", voter.uuid); - } - } - } - } - - logger.info(bos.toString()); - } + case VOTING: + votingState(); break; - } + default: logger.error("Unknown state: {}", state); break; } } + private void startupState() { + // 5-second grace period -- wait for things to stablize before + // starting the vote + cycleCount -= 1; + if (cycleCount <= 0) { + logger.info("VoteCycle: {} seconds have passed", + stableIdleCycles); + updateMyVote(); + sendOutUpdates(); + state = State.VOTING; + cycleCount = stableVotingCycles; + } + } + + private void votingState() { + // need to be in the VOTING state without any vote changes + // for 5 seconds -- once this happens, the leader is chosen + if (sendOutUpdates()) { + // changes have occurred -- set the grace period to 5 seconds + cycleCount = stableVotingCycles; + return; + } + + cycleCount -= 1; + if (cycleCount > 0) { + return; + } + + // 5 second grace period has passed -- the leader is one with + // the most votes, which is the first entry in 'voteData' + Server oldLeader = leader; + leader = Server.getServer(voteData.first().uuid); + if (leader != oldLeader) { + // the leader has changed -- send out notifications + for (Events listener : Events.getListeners()) { + listener.newLeader(leader); + } + } else { + // the election is over, and the leader has been confirmed + for (Events listener : Events.getListeners()) { + listener.leaderConfirmed(leader); + } + } + if (leader == Server.getThisServer()) { + // this is the lead server -- + // make sure the 'Discovery' threads are running + Discovery.startDiscovery(); + } else { + // this is not the lead server -- stop 'Discovery' threads + Discovery.stopDiscovery(); + } + + // we are done with voting -- clean up, and report results + MainLoop.removeBackgroundWork(this); + voteCycle = null; + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bos); + + out.println("Voting results:"); + + // x(36) xxxxx x(36) + // UUID Votes Voter + String format = "%-36s %5s %-36s\n"; + out.format(format, "UUID", "Votes", "Voter(s)"); + out.format(format, "----", "-----", "--------"); + + for (VoteData vote : voteData) { + if (vote.voters.isEmpty()) { + out.format(format, vote.uuid, 0, ""); + continue; + } + boolean headerNeeded = true; + for (VoterData voter : vote.voters) { + if (headerNeeded) { + out.format(format, vote.uuid, + vote.voters.size(), voter.uuid); + headerNeeded = false; + } else { + out.format(format, "", "", voter.uuid); + } + } + } + + logger.info(bos.toString()); + } + /** * Process an incoming /vote REST message. * @@ -375,7 +390,7 @@ class Leader { private void processVote(UUID voter, UUID vote, long timestamp) { // fetch old data for this voter VoterData voterData = uuidToVoterData.computeIfAbsent(voter, - (key) -> new VoterData(voter, timestamp)); + key -> new VoterData(voter, timestamp)); if (timestamp >= voterData.timestamp) { // this is a new vote for this voter -- update the timestamp voterData.timestamp = timestamp; @@ -389,7 +404,7 @@ class Leader { VoteData newVoteData = null; if (vote != null) { - newVoteData = uuidToVoteData.computeIfAbsent(vote, (key) -> new VoteData(vote)); + newVoteData = uuidToVoteData.computeIfAbsent(vote, key -> new VoteData(vote)); } if (oldVoteData != newVoteData) { diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java index 1ed7ecb2..1c6281d9 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java @@ -159,8 +159,8 @@ class MainLoop extends Thread { /** * Poll for and process incoming messages for up to 1 second. */ - static void handleIncomingWork() throws InterruptedException { - long currentTime = System.currentTimeMillis();; + static void handleIncomingWork() { + long currentTime = System.currentTimeMillis(); long wakeUpTime = currentTime + cycleTime; long timeDiff; @@ -176,7 +176,8 @@ class MainLoop extends Thread { work.run(); } catch (InterruptedException e) { logger.error("Interrupted in MainLoop"); - throw(e); + Thread.currentThread().interrupt(); + return; } catch (Exception e) { logger.error("Exception in MainLoop incoming work", e); } diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java index 1c4cc7ba..8ece943e 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java @@ -41,12 +41,6 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.onap.policy.drools.serverpool.Bucket; -import org.onap.policy.drools.serverpool.FeatureServerPool; -import org.onap.policy.drools.serverpool.Leader; -import org.onap.policy.drools.serverpool.Server; -import org.onap.policy.drools.serverpool.TargetLock; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java index 52e3d2dc..8ee0f2d2 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java @@ -62,7 +62,6 @@ import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; -import java.util.Arrays; import java.util.Base64; import java.util.Collection; import java.util.Date; @@ -81,15 +80,12 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.servlet.ServletException; -import javax.ws.rs.ProcessingException; import javax.ws.rs.client.Client; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.eclipse.jetty.server.ServerConnector; import org.glassfish.jersey.client.ClientProperties; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.http.client.HttpClient; @@ -97,7 +93,6 @@ import org.onap.policy.common.endpoints.http.client.HttpClientConfigException; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; import org.onap.policy.common.endpoints.http.server.HttpServletServer; import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; -import org.onap.policy.drools.system.PolicyEngineConstants; import org.onap.policy.drools.utils.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,10 +121,6 @@ public class Server implements Comparable<Server> { // the current REST server private static HttpServletServer restServer; - // incoming packets from HTTP - private static LinkedTransferQueue<byte[]> incomingPackets = - new LinkedTransferQueue<>(); - /*==================================================*/ /* Some properties extracted at initialization time */ /*==================================================*/ @@ -212,6 +203,9 @@ public class Server implements Comparable<Server> { static final int SOCKET_ADDRESS_TAG = 1; static final int SITE_SOCKET_ADDRESS_TAG = 2; + // 'pingHosts' error + static final String PINGHOSTS_ERROR = "Server.pingHosts error"; + /*==============================*/ /* Comparable<Server> interface */ /*==============================*/ @@ -311,6 +305,7 @@ public class Server implements Comparable<Server> { InetAddress address = InetAddress.getByName(ipAddressString); InetSocketAddress socketAddress = new InetSocketAddress(address, port); + possibleError = "HTTP server initialization error"; restServer = HttpServletServerFactoryInstance.getServerFactory().build( "SERVER-POOL", // name useHttps, // https @@ -332,7 +327,9 @@ public class Server implements Comparable<Server> { } // we may not know the port until after the server is started + possibleError = "HTTP server start error"; restServer.start(); + possibleError = null; // determine the address to use if (DEFAULT_SERVER_IP_ADDRESS.contentEquals(address.getHostAddress())) { @@ -346,13 +343,10 @@ public class Server implements Comparable<Server> { // start background thread MainLoop.startThread(); - MainLoop.queueWork(new Runnable() { - @Override - public void run() { - // run this in the 'MainLoop' thread - Leader.startup(); - Bucket.startup(); - } + MainLoop.queueWork(() -> { + // run this in the 'MainLoop' thread + Leader.startup(); + Bucket.startup(); }); logger.info("Listening on port {}", port); @@ -491,14 +485,12 @@ public class Server implements Comparable<Server> { int tag; while ((tag = is.readUnsignedByte()) != END_OF_PARAMETERS_TAG) { switch (tag) { - case SOCKET_ADDRESS_TAG: { + case SOCKET_ADDRESS_TAG: socketAddress = readSocketAddress(is); break; - } - case SITE_SOCKET_ADDRESS_TAG: { + case SITE_SOCKET_ADDRESS_TAG: siteSocketAddress = readSocketAddress(is); break; - } default: // ignore tag logger.error("Illegal tag: {}", tag); @@ -513,8 +505,7 @@ public class Server implements Comparable<Server> { * @param is the 'DataInputStream' * @return the 'InetSocketAddress' */ - private static InetSocketAddress readSocketAddress(DataInputStream is) - throws IOException, UnknownHostException { + private static InetSocketAddress readSocketAddress(DataInputStream is) throws IOException { byte[] ipAddress = new byte[4]; is.read(ipAddress, 0, 4); @@ -926,47 +917,45 @@ public class Server implements Comparable<Server> { return; } - getThreadPool().execute(new Runnable() { + getThreadPool().execute(() -> { /** * This method is running within the 'MainLoop' thread. */ - @Override - public void run() { - try { - WebTarget webTarget = target.path(path); - if (responseCallback != null) { - // give callback a chance to modify 'WebTarget' - webTarget = responseCallback.webTarget(webTarget); - - // send the response to the callback - Response response; - if (entity == null) { - response = webTarget.request().get(); - } else { - response = webTarget.request().post(entity); - } - responseCallback.response(response); + try { + WebTarget webTarget = target.path(path); + if (responseCallback != null) { + // give callback a chance to modify 'WebTarget' + webTarget = responseCallback.webTarget(webTarget); + + // send the response to the callback + Response response; + if (entity == null) { + response = webTarget.request().get(); } else { - // just do the invoke, and ignore the response - if (entity == null) { - webTarget.request().get(); - } else { - webTarget.request().post(entity); - } + response = webTarget.request().post(entity); } - } catch (Exception e) { - logger.error("Failed to send to {} ({}, {})", - uuid, destSocketAddress, destName); + responseCallback.response(response); + } else { + // just do the invoke, and ignore the response + if (entity == null) { + webTarget.request().get(); + } else { + webTarget.request().post(entity); + } + } + } catch (Exception e) { + logger.error("Failed to send to {} ({}, {})", + uuid, destSocketAddress, destName); + if (responseCallback != null) { responseCallback.exceptionResponse(e); - MainLoop.queueWork(new Runnable() { - @Override - public void run() { - // the DNS cache may have been out-of-date when this server - // was first contacted -- fix the problem, if needed - checkServer(); - } - }); } + MainLoop.queueWork(() -> { + // this runs in the 'MainLoop' thread + + // the DNS cache may have been out-of-date when this server + // was first contacted -- fix the problem, if needed + checkServer(); + }); } }); } @@ -1037,18 +1026,20 @@ public class Server implements Comparable<Server> { * in 'notifyList' (may need to build or rebuild 'notifyList'). */ static void sendOutData() throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - // include 'thisServer' in the data -- first, advance the count - if ((thisServer.count += 1) == 0) { + thisServer.count += 1; + if (thisServer.count == 0) { /* - * counter wrapped (0 is a special case); + * counter wrapped (0 is a special case) -- * actually, we could probably leave this out, because it would take * more than a century to wrap if the increment is 1 second */ thisServer.count = 1; } + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + thisServer.lastUpdateTime = System.currentTimeMillis(); thisServer.writeServerData(dos); @@ -1129,11 +1120,11 @@ public class Server implements Comparable<Server> { } } catch (NumberFormatException e) { out.println(host + ": Invalid port value"); - logger.error("Server.pingHosts error", e); + logger.error(PINGHOSTS_ERROR, e); error = true; } catch (UnknownHostException e) { out.println(host + ": Unknown host"); - logger.error("Server.pingHosts error", e); + logger.error(PINGHOSTS_ERROR, e); error = true; } } @@ -1152,58 +1143,58 @@ public class Server implements Comparable<Server> { */ static void pingHosts(final PrintStream out, final Collection<InetSocketAddress> hosts) { - FutureTask<Integer> ft = new FutureTask<>(new Callable<Integer>() { - @Override - public Integer call() { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - - // add information for this server only - try { - thisServer.writeServerData(dos); - - // create an 'Entity' that can be sent out to all hosts - Entity<String> entity = Entity.entity( - new String(Base64.getEncoder().encode(bos.toByteArray()), - StandardCharsets.UTF_8), - MediaType.APPLICATION_OCTET_STREAM_TYPE); - - // loop through hosts - for (InetSocketAddress host : hosts) { - HttpClient client = null; - - try { - client = buildClient(host.toString(), host, + FutureTask<Integer> ft = new FutureTask<>(() -> { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + // add information for this server only + try { + thisServer.writeServerData(dos); + + // create an 'Entity' that can be sent out to all hosts + Entity<String> entity = Entity.entity( + new String(Base64.getEncoder().encode(bos.toByteArray()), + StandardCharsets.UTF_8), + MediaType.APPLICATION_OCTET_STREAM_TYPE); + + // loop through hosts + for (InetSocketAddress host : hosts) { + HttpClient httpClient = null; + + try { + httpClient = buildClient(host.toString(), host, socketAddressToName(host)); - getTarget(client).path("admin").request().post(entity); - client.shutdown(); - client = null; - } catch (KeyManagementException | NoSuchAlgorithmException e) { - out.println(host + ": Unable to create client connection"); - logger.error("Server.pingHosts error", e); - } catch (NoSuchFieldException | IllegalAccessException e) { - out.println(host + ": Unable to get link to target"); - logger.error("Server.pingHosts error", e); - } catch (Exception e) { - out.println(host + ": " + e); - logger.error("Server.pingHosts error", e); - } - if (client != null) { - client.shutdown(); - } + getTarget(httpClient).path("admin").request().post(entity); + httpClient.shutdown(); + httpClient = null; + } catch (KeyManagementException | NoSuchAlgorithmException e) { + out.println(host + ": Unable to create client connection"); + logger.error(PINGHOSTS_ERROR, e); + } catch (NoSuchFieldException | IllegalAccessException e) { + out.println(host + ": Unable to get link to target"); + logger.error(PINGHOSTS_ERROR, e); + } catch (Exception e) { + out.println(host + ": " + e); + logger.error(PINGHOSTS_ERROR, e); + } + if (httpClient != null) { + httpClient.shutdown(); } - } catch (IOException e) { - out.println("Unable to generate 'ping' data: " + e); - logger.error("Server.pingHosts error", e); } - return 0; + } catch (IOException e) { + out.println("Unable to generate 'ping' data: " + e); + logger.error(PINGHOSTS_ERROR, e); } + return 0; }); MainLoop.queueWork(ft); try { ft.get(60, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + } catch (InterruptedException e) { + logger.error("Server.pingHosts: interrupted waiting for queued work", e); + Thread.currentThread().interrupt(); + } catch (ExecutionException | TimeoutException e) { logger.error("Server.pingHosts: error waiting for queued work", e); } } @@ -1215,16 +1206,17 @@ public class Server implements Comparable<Server> { * @param out the 'PrintStream' to dump the table to */ public static void dumpHosts(final PrintStream out) { - FutureTask<Integer> ft = new FutureTask<Integer>(new Callable<Integer>() { - public Integer call() { - dumpHostsInternal(out); - return 0; - } + FutureTask<Integer> ft = new FutureTask<>(() -> { + dumpHostsInternal(out); + return 0; }); MainLoop.queueWork(ft); try { ft.get(60, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + } catch (InterruptedException e) { + logger.error("Server.dumpHosts: interrupted waiting for queued work", e); + Thread.currentThread().interrupt(); + } catch (ExecutionException | TimeoutException e) { logger.error("Server.dumpHosts: error waiting for queued work", e); } } @@ -1278,12 +1270,6 @@ public class Server implements Comparable<Server> { } else if (localNotifyList.contains(server)) { thisOne = "n"; } - /* - else if (newHosts.contains(server)) - { - thisOne = "N"; - } - */ if (siteData) { String siteIp = ""; diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java index fb6a791e..61188e6b 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java @@ -235,6 +235,13 @@ public class ServerPoolProperties { private static Properties properties = new Properties(); /** + * Hide implicit public constructor. + */ + private ServerPoolProperties() { + // everything here is static -- no instances of this class are created + } + + /** * Store the application properties values. * * @param properties the properties to save diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java index 7e4b795f..65804082 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java @@ -101,8 +101,8 @@ public class TargetLock implements Lock, Serializable { private static ReferenceQueue<TargetLock> abandoned = new ReferenceQueue<>(); // some status codes - static final int ACCEPTED = 202; //Response.Status.ACCEPTED.getStatusCode(); - static final int NO_CONTENT = 204; //Response.Status.NO_CONTENT.getStatusCode(); + static final int ACCEPTED = 202; //Response.Status.ACCEPTED.getStatusCode() + static final int NO_CONTENT = 204; //Response.Status.NO_CONTENT.getStatusCode() static final int LOCKED = 423; // Values extracted from properties @@ -131,13 +131,21 @@ public class TargetLock implements Lock, Serializable { // this is used to notify the application when a lock is available, // or if it is not available - private LockCallback owner; + private volatile LockCallback owner; // This is what is actually called by the infrastructure to do the owner // notification. The owner may be running in a Drools session, in which case // the actual notification should be done within that thread -- the 'context' // object ensures that it happens this way. - private LockCallback context; + private volatile LockCallback context; + + // HTTP query parameters + private static final String QP_KEY = "key"; + private static final String QP_OWNER = "owner"; + private static final String QP_UUID = "uuid"; + private static final String QP_WAIT = "wait"; + private static final String QP_SERVER = "server"; + private static final String QP_TTL = "ttl"; /** * This method triggers registration of 'eventHandler', and also extracts @@ -221,7 +229,7 @@ public class TargetLock implements Lock, Serializable { if (session != null) { // deliver through a 'PolicySessionContext' class Object lcontext = session.getAdjunct(PolicySessionContext.class); - if (lcontext == null || !(lcontext instanceof LockCallback)) { + if (!(lcontext instanceof LockCallback)) { context = new PolicySessionContext(session); session.setAdjunct(PolicySessionContext.class, context); } else { @@ -301,11 +309,11 @@ public class TargetLock implements Lock, Serializable { @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("key", key) - .queryParam("owner", ownerKey) - .queryParam("uuid", identity.uuid.toString()) - .queryParam("wait", waitForLock) - .queryParam("ttl", timeToLive); + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, ownerKey) + .queryParam(QP_UUID, identity.uuid.toString()) + .queryParam(QP_WAIT, waitForLock) + .queryParam(QP_TTL, timeToLive); } /** @@ -323,14 +331,13 @@ public class TargetLock implements Lock, Serializable { * 423 (Locked) - lock in use, and 'waitForLock' is 'false' */ switch (response.getStatus()) { - case NO_CONTENT: { + case NO_CONTENT: // lock successful setState(State.ACTIVE); context.lockAvailable(TargetLock.this); break; - } - case LOCKED: { + case LOCKED: // failed -- lock in use, and 'waitForLock == false' setState(State.FREE); synchronized (localLocks) { @@ -340,13 +347,12 @@ public class TargetLock implements Lock, Serializable { wr.clear(); context.lockUnavailable(TargetLock.this); break; - } case ACCEPTED: break; default: - logger.error("Unknown status: ", response.getStatus()); + logger.error("Unknown status: {}", response.getStatus()); break; } } @@ -434,6 +440,7 @@ public class TargetLock implements Lock, Serializable { */ @Override public void extend(int holdSec, LockCallback callback) { + // not implemented yet } /********************/ @@ -473,7 +480,8 @@ public class TargetLock implements Lock, Serializable { if (!Bucket.isKeyOnThisServer(key)) { // this is the wrong server -- forward to the correct one // (we can use this thread) - if ((ttl -= 1) > 0) { + ttl -= 1; + if (ttl > 0) { Server server = Bucket.bucketToServer(Bucket.bucketNumber(key)); if (server != null) { WebTarget webTarget = server.getWebTarget("lock/lock"); @@ -483,11 +491,11 @@ public class TargetLock implements Lock, Serializable { server.getUuid(), key, ownerKey, uuid, waitForLock, ttl); return webTarget - .queryParam("key", key) - .queryParam("owner", ownerKey) - .queryParam("uuid", uuid.toString()) - .queryParam("wait", waitForLock) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, ownerKey) + .queryParam(QP_UUID, uuid.toString()) + .queryParam(QP_WAIT, waitForLock) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().get(); } } @@ -527,7 +535,8 @@ public class TargetLock implements Lock, Serializable { if (!Bucket.isKeyOnThisServer(key)) { // this is the wrong server -- forward to the correct one // (we can use this thread) - if ((ttl -= 1) > 0) { + ttl -= 1; + if (ttl > 0) { Server server = Bucket.bucketToServer(Bucket.bucketNumber(key)); if (server != null) { WebTarget webTarget = server.getWebTarget("lock/free"); @@ -536,10 +545,10 @@ public class TargetLock implements Lock, Serializable { + "(key={},owner={},uuid={},ttl={})", server.getUuid(), key, ownerKey, uuid, ttl); return webTarget - .queryParam("key", key) - .queryParam("owner", ownerKey) - .queryParam("uuid", uuid.toString()) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, ownerKey) + .queryParam(QP_UUID, uuid.toString()) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().get(); } } @@ -575,7 +584,8 @@ public class TargetLock implements Lock, Serializable { if (!Bucket.isKeyOnThisServer(ownerKey)) { // this is the wrong server -- forward to the correct one // (we can use this thread) - if ((ttl -= 1) > 0) { + ttl -= 1; + if (ttl > 0) { Server server = Bucket.bucketToServer(Bucket.bucketNumber(key)); if (server != null) { WebTarget webTarget = server.getWebTarget("lock/locked"); @@ -584,10 +594,10 @@ public class TargetLock implements Lock, Serializable { + "(key={},owner={},uuid={},ttl={})", server.getUuid(), key, ownerKey, uuid, ttl); return webTarget - .queryParam("key", key) - .queryParam("owner", ownerKey) - .queryParam("uuid", uuid.toString()) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, ownerKey) + .queryParam(QP_UUID, uuid.toString()) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().get(); } } @@ -744,6 +754,7 @@ public class TargetLock implements Lock, Serializable { */ @Override public void shutdown() { + // nothing needs to be done } /** @@ -887,10 +898,10 @@ public class TargetLock implements Lock, Serializable { @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("key", key) - .queryParam("owner", ownerKey) - .queryParam("uuid", uuid.toString()) - .queryParam("ttl", timeToLive); + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, ownerKey) + .queryParam(QP_UUID, uuid.toString()) + .queryParam(QP_TTL, timeToLive); } @Override @@ -898,21 +909,19 @@ public class TargetLock implements Lock, Serializable { logger.info("Free response={} (code={})", response, response.getStatus()); switch (response.getStatus()) { - case NO_CONTENT: { + case NO_CONTENT: // free successful -- don't need to do anything break; - } - case LOCKED: { + case LOCKED: // free failed logger.error("TargetLock free failed, " + "key={}, owner={}, uuid={}", key, ownerKey, uuid); break; - } default: - logger.error("Unknown status: ", response.getStatus()); + logger.error("Unknown status: {}", response.getStatus()); break; } } @@ -986,12 +995,10 @@ public class TargetLock implements Lock, Serializable { public void lockAvailable(final Lock lock) { // Run 'owner.lockAvailable' within the Drools session if (policySession != null) { - policySession.getKieSession().insert(new DroolsRunnable() { - @Override - public void run() { - ((TargetLock)lock).owner.lockAvailable(lock); - } - }); + DroolsRunnable callback = () -> { + ((TargetLock)lock).owner.lockAvailable(lock); + }; + policySession.getKieSession().insert(callback); } } @@ -1002,12 +1009,10 @@ public class TargetLock implements Lock, Serializable { public void lockUnavailable(Lock lock) { // Run 'owner.unlockAvailable' within the Drools session if (policySession != null) { - policySession.getKieSession().insert(new DroolsRunnable() { - @Override - public void run() { - ((TargetLock)lock).owner.lockUnavailable(lock); - } - }); + DroolsRunnable callback = () -> { + ((TargetLock)lock).owner.lockUnavailable(lock); + }; + policySession.getKieSession().insert(callback); } } @@ -1218,16 +1223,16 @@ public class TargetLock implements Lock, Serializable { */ private static class LockEntry implements Serializable { // string key identifying the lock - String key; + private String key; // string key identifying the owner - String currentOwnerKey; + private String currentOwnerKey; // UUID identifying the original 'TargetLock - UUID currentOwnerUuid; + private UUID currentOwnerUuid; // list of pending lock requests for this key - Queue<Waiting> waitingList = new LinkedList<>(); + private Queue<Waiting> waitingList = new LinkedList<>(); /** * Constructor - initialize the 'LockEntry'. @@ -1273,27 +1278,19 @@ public class TargetLock implements Lock, Serializable { @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("key", key) - .queryParam("owner", currentOwnerKey) - .queryParam("uuid", currentOwnerUuid.toString()) - .queryParam("ttl", timeToLive); + .queryParam(QP_KEY, key) + .queryParam(QP_OWNER, currentOwnerKey) + .queryParam(QP_UUID, currentOwnerUuid.toString()) + .queryParam(QP_TTL, timeToLive); } @Override public void response(Response response) { logger.info("Locked response={} (code={})", response, response.getStatus()); - switch (response.getStatus()) { - case NO_CONTENT: { - // successful -- we are done - break; - } - - default: { - // notification failed -- free this one - globalLocks.unlock(key, currentOwnerUuid); - break; - } + if (response.getStatus() != NO_CONTENT) { + // notification failed -- free this one + globalLocks.unlock(key, currentOwnerUuid); } } }); @@ -1409,7 +1406,6 @@ public class TargetLock implements Lock, Serializable { while (abandonedHandler != null) { try { Reference<? extends TargetLock> wr = abandoned.remove(); - TargetLock notify = null; // At this point, we know that 'ref' is a // 'WeakReference<TargetLock>' instance that has been abandoned, @@ -1515,7 +1511,8 @@ public class TargetLock implements Lock, Serializable { */ static byte[] dumpLocksData(UUID serverUuid, int ttl) throws IOException { if (!Server.getThisServer().getUuid().equals(serverUuid)) { - if ((ttl -= 1) > 0) { + ttl -= 1; + if (ttl > 0) { Server server = Server.getServer(serverUuid); if (server != null) { WebTarget webTarget = @@ -1524,8 +1521,8 @@ public class TargetLock implements Lock, Serializable { logger.info("Forwarding 'lock/dumpLocksData' to uuid {}", serverUuid); return webTarget - .queryParam("server", serverUuid.toString()) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_SERVER, serverUuid.toString()) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().get(byte[].class); } } @@ -1571,8 +1568,8 @@ public class TargetLock implements Lock, Serializable { @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("server", server.getUuid().toString()) - .queryParam("ttl", timeToLive); + .queryParam(QP_SERVER, server.getUuid().toString()) + .queryParam(QP_TTL, timeToLive); } @Override @@ -1621,128 +1618,144 @@ public class TargetLock implements Lock, Serializable { // process the client-end data for (ClientData clientData : hostData.clientDataList) { - // 'true' if the bucket associated with this 'ClientData' - // doesn't belong to the remote server, as far as we can tell - boolean serverMismatch = - Bucket.bucketToServer(clientData.bucketNumber) != server; - - // each 'ClientDataRecord' instance corresponds to an - // active 'Identity' (TargetLock) instance - for (ClientDataRecord cdr : clientData.clientDataRecords) { - // update maximum 'key' and 'ownerKey' lengths - updateKeyLength(cdr.identity.key); - updateOwnerKeyLength(cdr.identity.ownerKey); - - // fetch UUID - UUID uuid = cdr.identity.uuid; - - // fetch/generate 'MergeData' instance for this UUID - MergedData md = mergedDataMap.get(uuid); - if (md == null) { - md = new MergedData(uuid); - mergedDataMap.put(uuid, md); - } - - // update 'MergedData.clientDataRecord' - if (md.clientDataRecord == null) { - md.clientDataRecord = cdr; - } else { - md.comment("Duplicate client entry for UUID"); - } - - if (serverMismatch) { - // need to generate an additional error - md.comment(server.toString() - + "(client) does not own bucket " - + clientData.bucketNumber); - } - } + populateLockData_clientData(clientData, server); } // process the server-end data for (ServerData serverData : hostData.serverDataList) { - // 'true' if the bucket associated with this 'ServerData' - // doesn't belong to the remote server, as far as we can tell - boolean serverMismatch = - Bucket.bucketToServer(serverData.bucketNumber) != server; - - // each 'LockEntry' instance corresponds to the current holder - // of a lock, and all requestors waiting for it to be freed - for (LockEntry le : serverData.globalLocks.keyToEntry.values()) { - // update maximum 'key' and 'ownerKey' lengths - updateKeyLength(le.key); - updateOwnerKeyLength(le.currentOwnerKey); - - // fetch uuid - UUID uuid = le.currentOwnerUuid; - - // fetch/generate 'MergeData' instance for this UUID - MergedData md = mergedDataMap.get(uuid); - if (md == null) { - md = new MergedData(uuid); - mergedDataMap.put(uuid, md); - } + populateLockData_serverData(serverData, server); + } + } else { + logger.error("TargetLock.DumpLocks.populateLockData: " + + "received data has class {}", + decodedData.getClass().getName()); + } + } - // update 'lockEntries' table entry - if (lockEntries.get(le.key) != null) { - md.comment("Duplicate server entry for key " + le.key); - } else { - lockEntries.put(le.key, le); - } + private void populateLockData_clientData(ClientData clientData, Server server) { + // 'true' if the bucket associated with this 'ClientData' + // doesn't belong to the remote server, as far as we can tell + boolean serverMismatch = + Bucket.bucketToServer(clientData.bucketNumber) != server; - // update 'MergedData.serverLockEntry' - // (leave 'MergedData.serverWaiting' as 'null', because - // this field is only used for waiting entries) - if (md.serverLockEntry == null) { - md.serverLockEntry = le; - } else { - md.comment("Duplicate server entry for UUID"); - } + // each 'ClientDataRecord' instance corresponds to an + // active 'Identity' (TargetLock) instance + for (ClientDataRecord cdr : clientData.clientDataRecords) { + // update maximum 'key' and 'ownerKey' lengths + updateKeyLength(cdr.identity.key); + updateOwnerKeyLength(cdr.identity.ownerKey); - if (serverMismatch) { - // need to generate an additional error - md.comment(server.toString() - + "(server) does not own bucket " - + serverData.bucketNumber); - } + // fetch UUID + UUID uuid = cdr.identity.uuid; - // we need 'MergeData' entries for all waiting requests - for (Waiting waiting : le.waitingList) { - // update maximum 'ownerKey' length - updateOwnerKeyLength(waiting.ownerKey); + // fetch/generate 'MergeData' instance for this UUID + MergedData md = mergedDataMap.get(uuid); + if (md == null) { + md = new MergedData(uuid); + mergedDataMap.put(uuid, md); + } - // fetch uuid - uuid = waiting.ownerUuid; + // update 'MergedData.clientDataRecord' + if (md.clientDataRecord == null) { + md.clientDataRecord = cdr; + } else { + md.comment("Duplicate client entry for UUID"); + } - // fetch/generate 'MergeData' instance for this UUID - md = mergedDataMap.get(uuid); - if (md == null) { - md = new MergedData(uuid); - mergedDataMap.put(uuid, md); - } + if (serverMismatch) { + // need to generate an additional error + md.comment(server.toString() + + "(client) does not own bucket " + + clientData.bucketNumber); + } + } + } - // update 'MergedData.serverLockEntry' and - // 'MergedData.serverWaiting' - if (md.serverLockEntry == null) { - md.serverLockEntry = le; - md.serverWaiting = waiting; - } else { - md.comment("Duplicate server entry for UUID"); - } + private void populateLockData_serverData(ServerData serverData, Server server) { + // 'true' if the bucket associated with this 'ServerData' + // doesn't belong to the remote server, as far as we can tell + boolean serverMismatch = + Bucket.bucketToServer(serverData.bucketNumber) != server; - if (serverMismatch) { - // need to generate an additional error - md.comment(server.toString() - + "(server) does not own bucket " - + serverData.bucketNumber); - } - } - } + // each 'LockEntry' instance corresponds to the current holder + // of a lock, and all requestors waiting for it to be freed + for (LockEntry le : serverData.globalLocks.keyToEntry.values()) { + // update maximum 'key' and 'ownerKey' lengths + updateKeyLength(le.key); + updateOwnerKeyLength(le.currentOwnerKey); + + // fetch uuid + UUID uuid = le.currentOwnerUuid; + + // fetch/generate 'MergeData' instance for this UUID + MergedData md = mergedDataMap.get(uuid); + if (md == null) { + md = new MergedData(uuid); + mergedDataMap.put(uuid, md); + } + + // update 'lockEntries' table entry + if (lockEntries.get(le.key) != null) { + md.comment("Duplicate server entry for key " + le.key); + } else { + lockEntries.put(le.key, le); + } + + // update 'MergedData.serverLockEntry' + // (leave 'MergedData.serverWaiting' as 'null', because + // this field is only used for waiting entries) + if (md.serverLockEntry == null) { + md.serverLockEntry = le; + } else { + md.comment("Duplicate server entry for UUID"); + } + + if (serverMismatch) { + // need to generate an additional error + md.comment(server.toString() + + "(server) does not own bucket " + + serverData.bucketNumber); + } + + // we need 'MergeData' entries for all waiting requests + for (Waiting waiting : le.waitingList) { + populateLockData_serverData_waiting( + serverData, server, serverMismatch, le, waiting); } + } + } + + private void populateLockData_serverData_waiting( + ServerData serverData, Server server, boolean serverMismatch, + LockEntry le, Waiting waiting) { + + // update maximum 'ownerKey' length + updateOwnerKeyLength(waiting.ownerKey); + + // fetch uuid + UUID uuid = waiting.ownerUuid; + + // fetch/generate 'MergeData' instance for this UUID + MergedData md = mergedDataMap.get(uuid); + if (md == null) { + md = new MergedData(uuid); + mergedDataMap.put(uuid, md); + } + + // update 'MergedData.serverLockEntry' and + // 'MergedData.serverWaiting' + if (md.serverLockEntry == null) { + md.serverLockEntry = le; + md.serverWaiting = waiting; } else { - logger.error("TargetLock.DumpLocks.populateLockData: " - + "received data has class " - + decodedData.getClass().getName()); + md.comment("Duplicate server entry for UUID"); + } + + if (serverMismatch) { + // need to generate an additional error + md.comment(server.toString() + + "(server) does not own bucket " + + serverData.bucketNumber); } } @@ -1801,6 +1814,11 @@ public class TargetLock implements Lock, Serializable { out.printf(format, "---", "---------", "----", "-----", "--------"); } + dump_serverTable(out); + dump_clientOnlyEntries(out); + } + + private void dump_serverTable(PrintStream out) { // iterate over the server table for (LockEntry le : lockEntries.values()) { // fetch merged data @@ -1841,7 +1859,9 @@ public class TargetLock implements Lock, Serializable { dumpMoreComments(out, md); } } + } + private void dump_clientOnlyEntries(PrintStream out) { // client records that don't have matching server entries for (MergedData md : clientOnlyEntries.values()) { ClientDataRecord cdr = md.clientDataRecord; @@ -2017,13 +2037,13 @@ public class TargetLock implements Lock, Serializable { */ static class HostData implements Serializable { // the UUID of the host sending the data - UUID hostUuid; + private UUID hostUuid; // all of the information derived from the 'LocalLocks' data - List<ClientData> clientDataList; + private List<ClientData> clientDataList; // all of the information derived from the 'GlobalLocks' data - List<ServerData> serverDataList; + private List<ServerData> serverDataList; /** * Constructor - this goes through all of the lock tables, @@ -2086,10 +2106,10 @@ public class TargetLock implements Lock, Serializable { */ static class ClientData implements Serializable { // number of the bucket - int bucketNumber; + private int bucketNumber; // all of the client locks within this bucket - List<ClientDataRecord> clientDataRecords; + private List<ClientDataRecord> clientDataRecords; /** * Constructor - initially, there are no 'clientDataRecords'. @@ -2108,11 +2128,11 @@ public class TargetLock implements Lock, Serializable { */ static class ClientDataRecord implements Serializable { // contains key, ownerKey, uuid - Identity identity; + private Identity identity; // state field of 'TargetLock' // (may be 'null' if there is no 'TargetLock') - State state; + private State state; /** * Constructor - initialize the fields. @@ -2132,10 +2152,10 @@ public class TargetLock implements Lock, Serializable { */ static class ServerData implements Serializable { // number of the bucket - int bucketNumber; + private int bucketNumber; // server-side data associated with a single bucket - GlobalLocks globalLocks; + private GlobalLocks globalLocks; /** * Constructor - initialize the fields. @@ -2158,15 +2178,15 @@ public class TargetLock implements Lock, Serializable { */ static class AuditData implements Serializable { // sending UUID - UUID hostUuid; + private UUID hostUuid; // client records that currently exist, or records to be cleared // (depending upon message) -- client/server is from the senders side - List<Identity> clientData; + private List<Identity> clientData; // server records that currently exist, or records to be cleared // (depending upon message) -- client/server is from the senders side - List<Identity> serverData; + private List<Identity> serverData; /** * Constructor - set 'hostUuid' to the current host, and start with @@ -2174,8 +2194,8 @@ public class TargetLock implements Lock, Serializable { */ AuditData() { hostUuid = Server.getThisServer().getUuid(); - clientData = new ArrayList<Identity>(); - serverData = new ArrayList<Identity>(); + clientData = new ArrayList<>(); + serverData = new ArrayList<>(); } /** @@ -2191,8 +2211,17 @@ public class TargetLock implements Lock, Serializable { AuditData response = new AuditData(); // compare remote servers client data with our server data + generateResponse_clientEnd(response, includeWarnings); + + // test server data + generateResponse_serverEnd(response, includeWarnings); + + return response; + } + + private void generateResponse_clientEnd(AuditData response, boolean includeWarnings) { for (Identity identity : clientData) { - // we are the server in this case + // remote end is the client, and we are the server Bucket bucket = Bucket.getBucket(identity.key); GlobalLocks globalLocks = bucket.getAdjunctDontCreate(GlobalLocks.class); @@ -2240,10 +2269,11 @@ public class TargetLock implements Lock, Serializable { // it was 'clientData' to the sender, but 'serverData' to us response.serverData.add(identity); } + } - // test server data + private void generateResponse_serverEnd(AuditData response, boolean includeWarnings) { for (Identity identity : serverData) { - // we are the client in this case + // remote end is the server, and we are the client Bucket bucket = Bucket.getBucket(identity.ownerKey); LocalLocks localLocks = bucket.getAdjunctDontCreate(LocalLocks.class); @@ -2275,8 +2305,6 @@ public class TargetLock implements Lock, Serializable { } response.clientData.add(identity); } - - return response; } /** @@ -2400,19 +2428,14 @@ public class TargetLock implements Lock, Serializable { * Run a single audit cycle. */ static void runAudit() { - try { - logger.info("Starting TargetLock audit"); - Audit audit = new Audit(); + logger.info("Starting TargetLock audit"); + Audit audit = new Audit(); - // populate 'auditMap' table - audit.build(); + // populate 'auditMap' table + audit.build(); - // send to all of the servers in 'auditMap' (may include this server) - audit.send(); - } catch (InterruptedException e) { - logger.error("TargetLock audit interrupted", e); - Thread.currentThread().interrupt(); - } + // send to all of the servers in 'auditMap' (may include this server) + audit.send(); } /** @@ -2441,63 +2464,59 @@ public class TargetLock implements Lock, Serializable { // this needs to run in the 'MainLoop' thread, because it is dependent // upon the list of servers, and our position in this list - MainLoop.queueWork(new Runnable() { - /** - * {@inheritDoc} - */ - @Override - public void run() { - // current list of servers - Collection<Server> servers = Server.getServers(); + MainLoop.queueWork(() -> { + // this runs in the 'MainLoop' thread - // count of the number of servers - int count = servers.size(); + // current list of servers + Collection<Server> servers = Server.getServers(); - // will contain our position in this list - int index = 0; + // count of the number of servers + int count = servers.size(); - // current server - Server thisServer = Server.getThisServer(); + // will contain our position in this list + int index = 0; - for (Server server : servers) { - if (server == thisServer) { - break; - } - index += 1; + // current server + Server thisServer = Server.getThisServer(); + + for (Server server : servers) { + if (server == thisServer) { + break; } + index += 1; + } - // if index == count, we didn't find this server - // (which shouldn't happen) - - if (index < count) { - // The servers are ordered by UUID, and 'index' is this - // server's position within the list. Suppose the period is - // 60000 (60 seconds), and there are 5 servers -- the first one - // will run the audit at 0 seconds after the minute, the next - // at 12 seconds after the minute, then 24, 36, 48. - long offset = (period * index) / count; - - // the earliest time we want the audit to run - long time = System.currentTimeMillis() + gracePeriod; - long startTime = time - (time % period) + offset; - if (startTime <= time) { - startTime += period; + // if index == count, we didn't find this server + // (which shouldn't happen) + + if (index < count) { + // The servers are ordered by UUID, and 'index' is this + // server's position within the list. Suppose the period is + // 60000 (60 seconds), and there are 5 servers -- the first one + // will run the audit at 0 seconds after the minute, the next + // at 12 seconds after the minute, then 24, 36, 48. + long offset = (period * index) / count; + + // the earliest time we want the audit to run + long time = System.currentTimeMillis() + gracePeriod; + long startTime = time - (time % period) + offset; + if (startTime <= time) { + startTime += period; + } + synchronized (Audit.class) { + if (timerTask != null) { + timerTask.cancel(); } - synchronized (Audit.class) { - if (timerTask != null) { - timerTask.cancel(); + timerTask = new TimerTask() { + @Override + public void run() { + runAudit(); } - timerTask = new TimerTask() { - @Override - public void run() { - runAudit(); - } - }; + }; - // now, schedule the timer - Util.timer.scheduleAtFixedRate( - timerTask, new Date(startTime), period); - } + // now, schedule the timer + Util.timer.scheduleAtFixedRate( + timerTask, new Date(startTime), period); } } }); @@ -2514,7 +2533,8 @@ public class TargetLock implements Lock, Serializable { */ static byte[] incomingAudit(UUID serverUuid, int ttl, byte[] encodedData) { if (!Server.getThisServer().getUuid().equals(serverUuid)) { - if ((ttl -= 1) > 0) { + ttl -= 1; + if (ttl > 0) { Server server = Server.getServer(serverUuid); if (server != null) { WebTarget webTarget = server.getWebTarget("lock/audit"); @@ -2525,8 +2545,8 @@ public class TargetLock implements Lock, Serializable { Entity.entity(new String(encodedData), MediaType.APPLICATION_OCTET_STREAM_TYPE); return webTarget - .queryParam("server", serverUuid.toString()) - .queryParam("ttl", String.valueOf(ttl)) + .queryParam(QP_SERVER, serverUuid.toString()) + .queryParam(QP_TTL, String.valueOf(ttl)) .request().post(entity, byte[].class); } } @@ -2556,53 +2576,63 @@ public class TargetLock implements Lock, Serializable { Bucket bucket = Bucket.getBucket(i); // client data - LocalLocks localLocks = - bucket.getAdjunctDontCreate(LocalLocks.class); - if (localLocks != null) { - synchronized (localLocks) { - // we have client data for this bucket - for (Identity identity : - localLocks.weakReferenceToIdentity.values()) { - // find or create the 'AuditData' instance associated - // with the server owning the 'key' - AuditData auditData = getAuditData(identity.key); - if (auditData != null) { - auditData.clientData.add(identity); - } + build_clientData(bucket); + + // server data + build_serverData(bucket); + } + } + + private void build_clientData(Bucket bucket) { + // client data + LocalLocks localLocks = + bucket.getAdjunctDontCreate(LocalLocks.class); + if (localLocks != null) { + synchronized (localLocks) { + // we have client data for this bucket + for (Identity identity : + localLocks.weakReferenceToIdentity.values()) { + // find or create the 'AuditData' instance associated + // with the server owning the 'key' + AuditData auditData = getAuditData(identity.key); + if (auditData != null) { + auditData.clientData.add(identity); } } } + } + } - // server data - GlobalLocks globalLocks = - bucket.getAdjunctDontCreate(GlobalLocks.class); - if (globalLocks != null) { - // we have server data for this bucket - Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry; - synchronized (keyToEntry) { - for (LockEntry le : keyToEntry.values()) { + private void build_serverData(Bucket bucket) { + // server data + GlobalLocks globalLocks = + bucket.getAdjunctDontCreate(GlobalLocks.class); + if (globalLocks != null) { + // we have server data for this bucket + Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry; + synchronized (keyToEntry) { + for (LockEntry le : keyToEntry.values()) { + // find or create the 'AuditData' instance associated + // with the current 'ownerKey' + AuditData auditData = getAuditData(le.currentOwnerKey); + if (auditData != null) { + // create an 'Identity' entry, and add it to + // the list associated with the remote server + auditData.serverData.add( + new Identity(le.key, le.currentOwnerKey, + le.currentOwnerUuid)); + } + + for (Waiting waiting : le.waitingList) { // find or create the 'AuditData' instance associated - // with the current 'ownerKey' - AuditData auditData = getAuditData(le.currentOwnerKey); + // with the waiting entry 'ownerKey' + auditData = getAuditData(waiting.ownerKey); if (auditData != null) { // create an 'Identity' entry, and add it to // the list associated with the remote server auditData.serverData.add( - new Identity(le.key, le.currentOwnerKey, - le.currentOwnerUuid)); - } - - for (Waiting waiting : le.waitingList) { - // find or create the 'AuditData' instance associated - // with the waiting entry 'ownerKey' - auditData = getAuditData(waiting.ownerKey); - if (auditData != null) { - // create an 'Identity' entry, and add it to - // the list associated with the remote server - auditData.serverData.add( - new Identity(le.key, waiting.ownerKey, - waiting.ownerUuid)); - } + new Identity(le.key, waiting.ownerKey, + waiting.ownerUuid)); } } } @@ -2618,12 +2648,8 @@ public class TargetLock implements Lock, Serializable { // map 'key -> bucket number', and then 'bucket number' -> 'server' Server server = Bucket.bucketToServer(Bucket.bucketNumber(key)); if (server != null) { - AuditData auditData = auditMap.get(server); - if (auditData == null) { - // doesn't exist yet -- create it - auditData = new AuditData(); - auditMap.put(server, auditData); - } + AuditData auditData = + auditMap.computeIfAbsent(server, sk -> new AuditData()); return auditData; } @@ -2635,7 +2661,7 @@ public class TargetLock implements Lock, Serializable { * Using the collected 'auditMap', send out the messages to all of the * servers. */ - void send() throws InterruptedException { + void send() { if (auditMap.isEmpty()) { logger.info("TargetLock audit: no locks on this server"); } else { @@ -2644,178 +2670,176 @@ public class TargetLock implements Lock, Serializable { } for (final Server server : auditMap.keySet()) { - // fetch audit data - AuditData auditData = auditMap.get(server); + send_server(server); + } + } - if (server == Server.getThisServer()) { - // process this locally - final AuditData respData = auditData.generateResponse(true); - if (respData.clientData.isEmpty() - && respData.serverData.isEmpty()) { - // no mismatches - logger.info("TargetLock.Audit.send: " - + "no errors from self ({})", server); - continue; + private void send_server(final Server server) { + // fetch audit data + AuditData auditData = auditMap.get(server); + + if (server == Server.getThisServer()) { + // process this locally + final AuditData respData = auditData.generateResponse(true); + if (respData.clientData.isEmpty() + && respData.serverData.isEmpty()) { + // no mismatches + logger.info("TargetLock.Audit.send: " + + "no errors from self ({})", server); + return; + } + + // do the rest in a separate thread + server.getThreadPool().execute(() -> { + // wait a few seconds, and see if we still know of these + // errors + if (AuditPostResponse.responseSupport( + respData, "self (" + server + ")", + "TargetLock.Audit.send")) { + // a return falue of 'true' either indicates the + // mismatches were resolved after a retry, or we + // received an interrupt, and need to abort + return; } - // do the rest in a separate thread - server.getThreadPool().execute(new Runnable() { - @Override - public void run() { - // wait a few seconds, and see if we still know of these - // errors - logger.info("TargetLock.Audit.send: " - + "mismatches from self ({})", server); - try { - Thread.sleep(auditRetryDelay); - } catch (InterruptedException e) { - logger.error("TargetLock.Audit.send: Interrupted " - + "handling audit response from self ({})", - server); - // just abort - Thread.currentThread().interrupt(); - return; - } + // any mismatches left in 'respData' are still issues + respData.processResponse(server); + }); + return; + } - // This will check against our own data -- any mismatches - // mean that things have changed since we sent out the - // first message. We will remove any mismatches from - // 'respData', and see if there are any left. - AuditData mismatches = respData.generateResponse(false); - - respData.serverData.removeAll(mismatches.clientData); - respData.clientData.removeAll(mismatches.serverData); - - if (respData.clientData.isEmpty() - && respData.serverData.isEmpty()) { - // no mismatches -- - // there must have been transient issues on our side - logger.info("TargetLock.Audit.send: " - + "no mismatches from self " - + "({}) after retry", server); - return; - } + // serialize + byte[] encodedData = auditData.encode(); + if (encodedData == null) { + // error has already been displayed + return; + } - // any mismatches left in 'respData' are still issues - respData.processResponse(server); - } - }); - continue; - } + // generate entity + Entity<String> entity = + Entity.entity(new String(encodedData), + MediaType.APPLICATION_OCTET_STREAM_TYPE); - // serialize - byte[] encodedData = auditData.encode(); - if (encodedData == null) { - // error has already been displayed - continue; - } + server.post("lock/audit", entity, new AuditPostResponse(server)); + } + } - // generate entity - Entity<String> entity = - Entity.entity(new String(encodedData), - MediaType.APPLICATION_OCTET_STREAM_TYPE); + static class AuditPostResponse implements Server.PostResponse { + private Server server; - server.post("lock/audit", entity, new Server.PostResponse() { - @Override - public WebTarget webTarget(WebTarget webTarget) { - // include the 'uuid' keyword - return webTarget - .queryParam("server", server.getUuid().toString()) - .queryParam("ttl", timeToLive); - } + AuditPostResponse(Server server) { + this.server = server; + } - @Override - public void response(Response response) { - // process the response here - AuditData respData = - AuditData.decode(response.readEntity(byte[].class)); - if (respData == null) { - logger.error("TargetLock.Audit.send: " - + "couldn't process response from {}", - server); - return; - } + @Override + public WebTarget webTarget(WebTarget webTarget) { + // include the 'uuid' keyword + return webTarget + .queryParam(QP_SERVER, server.getUuid().toString()) + .queryParam(QP_TTL, timeToLive); + } - // if we reach this point, we got a response - if (respData.clientData.isEmpty() - && respData.serverData.isEmpty()) { - // no mismatches - logger.info("TargetLock.Audit.send: " - + "no errors from {}", server); - return; - } + @Override + public void response(Response response) { + // process the response here + AuditData respData = + AuditData.decode(response.readEntity(byte[].class)); + if (respData == null) { + logger.error("TargetLock.Audit.send: " + + "couldn't process response from {}", + server); + return; + } - // wait a few seconds, and see if we still know of these - // errors - logger.info("TargetLock.Audit.send: mismatches from {}", - server); - try { - Thread.sleep(auditRetryDelay); - } catch (InterruptedException e) { - logger.error("TargetLock.Audit.send: Interrupted " - + "handling audit response from {}", - server); - // just abort - Thread.currentThread().interrupt(); - return; - } + // if we reach this point, we got a response + if (respData.clientData.isEmpty() + && respData.serverData.isEmpty()) { + // no mismatches + logger.info("TargetLock.Audit.send: " + + "no errors from {}", server); + return; + } - // This will check against our own data -- any mismatches - // mean that things have changed since we sent out the - // first message. We will remove any mismatches from - // 'respData', and see if there are any left. - AuditData mismatches = respData.generateResponse(false); - - respData.serverData.removeAll(mismatches.clientData); - respData.clientData.removeAll(mismatches.serverData); - - if (respData.clientData.isEmpty() - && respData.serverData.isEmpty()) { - // no mismatches -- - // there must have been transient issues on our side - logger.info("TargetLock.Audit.send: no mismatches from " - + "{} after retry", server); - return; - } + // wait a few seconds, and see if we still know of these + // errors + if (responseSupport(respData, server, "AuditPostResponse.response")) { + // a return falue of 'true' either indicates the mismatches + // were resolved after a retry, or we received an interrupt, + // and need to abort + return; + } - // any mismatches left in 'respData' are still there -- - // hopefully, they are transient issues on the other side - AuditData auditData = new AuditData(); - auditData.clientData = respData.serverData; - auditData.serverData = respData.clientData; - - // serialize - byte[] encodedData = auditData.encode(); - if (encodedData == null) { - // error has already been displayed - return; - } + // any mismatches left in 'respData' are still there -- + // hopefully, they are transient issues on the other side + AuditData auditData = new AuditData(); + auditData.clientData = respData.serverData; + auditData.serverData = respData.clientData; - // generate entity - Entity<String> entity = - Entity.entity(new String(encodedData), - MediaType.APPLICATION_OCTET_STREAM_TYPE); - - // send new list to other end - response = server - .getWebTarget("lock/audit") - .queryParam("server", server.getUuid().toString()) - .queryParam("ttl", timeToLive) - .request().post(entity); - - respData = AuditData.decode(response.readEntity(byte[].class)); - if (respData == null) { - logger.error("TargetLock.auditDataBuilder.send: " - + "couldn't process response from {}", - server); - return; - } + // serialize + byte[] encodedData = auditData.encode(); + if (encodedData == null) { + // error has already been displayed + return; + } - // if there are mismatches left, they are presumably real - respData.processResponse(server); - } - }); + // generate entity + Entity<String> entity = + Entity.entity(new String(encodedData), + MediaType.APPLICATION_OCTET_STREAM_TYPE); + + // send new list to other end + response = server + .getWebTarget("lock/audit") + .queryParam(QP_SERVER, server.getUuid().toString()) + .queryParam(QP_TTL, timeToLive) + .request().post(entity); + + respData = AuditData.decode(response.readEntity(byte[].class)); + if (respData == null) { + logger.error("TargetLock.auditDataBuilder.send: " + + "couldn't process response from {}", + server); + return; + } + + // if there are mismatches left, they are presumably real + respData.processResponse(server); + } + + // Handle mismatches indicated by an audit response -- a return value of + // 'true' indicates that there were no mismatches after a retry, or + // we received an interrupt. In either case, the caller returns. + private static boolean responseSupport(AuditData respData, Object serverString, String caller) { + logger.info("{}: mismatches from {}", caller, serverString); + try { + Thread.sleep(auditRetryDelay); + } catch (InterruptedException e) { + logger.error("{}: Interrupted handling audit response from {}", + caller, serverString); + // just abort + Thread.currentThread().interrupt(); + return true; + } + + // This will check against our own data -- any mismatches + // mean that things have changed since we sent out the + // first message. We will remove any mismatches from + // 'respData', and see if there are any left. + AuditData mismatches = respData.generateResponse(false); + + respData.serverData.removeAll(mismatches.clientData); + respData.clientData.removeAll(mismatches.serverData); + + if (respData.clientData.isEmpty() + && respData.serverData.isEmpty()) { + // no mismatches -- + // there must have been transient issues on our side + logger.info("{}: no mismatches from {} after retry", + caller, serverString); + return true; } + + return false; } } } diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java index 2ad0a401..66a9eac3 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java @@ -43,6 +43,13 @@ public class Util { public static final Timer timer = new Timer("Server Pool Timer", true); /** + * Hide implicit public constructor. + */ + private Util() { + // everything here is static -- no instances of this class are created + } + + /** * Internally, UUID objects use two 'long' variables, and the default * comparison is signed, which means the order for the first and 16th digit * is: '89abcdef01234567', while the order for the rest is @@ -50,21 +57,18 @@ public class Util { * The following comparator uses the ordering '0123456789abcdef' for all * digits. */ - public static final Comparator<UUID> uuidComparator = - new Comparator<UUID>() { - public int compare(UUID u1, UUID u2) { - // compare most significant portion - int rval = Long.compareUnsigned(u1.getMostSignificantBits(), - u2.getMostSignificantBits()); - if (rval == 0) { - // most significant portion matches -- - // compare least significant portion - rval = Long.compareUnsigned(u1.getLeastSignificantBits(), - u2.getLeastSignificantBits()); - } - return rval; - } - }; + public static final Comparator<UUID> uuidComparator = (UUID u1, UUID u2) -> { + // compare most significant portion + int rval = Long.compareUnsigned(u1.getMostSignificantBits(), + u2.getMostSignificantBits()); + if (rval == 0) { + // most significant portion matches -- + // compare least significant portion + rval = Long.compareUnsigned(u1.getLeastSignificantBits(), + u2.getLeastSignificantBits()); + } + return rval; + }; /* ============================================================ */ @@ -104,7 +108,6 @@ public class Util { try { return IOUtils.toString(input, StandardCharsets.UTF_8); } catch (IOException e) { - // TODO Auto-generated catch block logger.error("Util.inputStreamToString error", e); return ""; } diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java index 295194d2..60e740c5 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java @@ -20,7 +20,6 @@ package org.onap.policy.drools.serverpool.persistence; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; @@ -73,6 +72,12 @@ import org.slf4j.LoggerFactory; public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { private static Logger logger = LoggerFactory.getLogger(Persistence.class); + // HTTP query parameters + private static final String QP_BUCKET = "bucket"; + private static final String QP_SESSION = "session"; + private static final String QP_COUNT = "count"; + private static final String QP_DEST = "dest"; + /***************************************/ /* 'PolicySessionFeatureApi' interface */ /***************************************/ @@ -209,7 +214,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { * @param bucket the bucket containing the 'GlobalLocks' adjunct * @param globalLocks the 'GlobalLocks' adjunct */ - private static void sendLockDataToBackups(Bucket bucket, GlobalLocks globalLocks) { + private static void sendLockDataToBackups(final Bucket bucket, final GlobalLocks globalLocks) { final int bucketNumber = bucket.getIndex(); SenderBucketData sbd = bucket.getAdjunct(SenderBucketData.class); int lockCount = 0; @@ -245,18 +250,15 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { for (final Server server : servers) { if (server != null) { // send out REST command - server.getThreadPool().execute(new Runnable() { - @Override - public void run() { - WebTarget webTarget = - server.getWebTarget("persistence/lock"); - if (webTarget != null) { - webTarget - .queryParam("bucket", bucketNumber) - .queryParam("count", count) - .queryParam("dest", server.getUuid()) - .request().post(entity); - } + server.getThreadPool().execute(() -> { + WebTarget webTarget = + server.getWebTarget("persistence/lock"); + if (webTarget != null) { + webTarget + .queryParam(QP_BUCKET, bucketNumber) + .queryParam(QP_COUNT, count) + .queryParam(QP_DEST, server.getUuid()) + .request().post(entity); } }); } @@ -339,21 +341,18 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { for (final Server server : servers) { if (server != null) { // send out REST command - server.getThreadPool().execute(new Runnable() { - @Override - public void run() { - WebTarget webTarget = - server.getWebTarget("persistence/session"); - if (webTarget != null) { - webTarget - .queryParam("bucket", - bucket.getIndex()) - .queryParam("session", - encodedSessionName) - .queryParam("count", count) - .queryParam("dest", server.getUuid()) - .request().post(entity); - } + server.getThreadPool().execute(() -> { + WebTarget webTarget = + server.getWebTarget("persistence/session"); + if (webTarget != null) { + webTarget + .queryParam(QP_BUCKET, + bucket.getIndex()) + .queryParam(QP_SESSION, + encodedSessionName) + .queryParam(QP_COUNT, count) + .queryParam(QP_DEST, server.getUuid()) + .request().post(entity); } }); } @@ -552,14 +551,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { * @return the associated 'SenderSessionBucketData' instance */ synchronized SenderSessionBucketData getSessionData(PolicySession session) { - // try to fetch the associated instance - SenderSessionBucketData rval = sessionData.get(session); - if (rval == null) { - // it doesn't exist, so create one - rval = new SenderSessionBucketData(); - sessionData.put(session, rval); - } - return rval; + return sessionData.computeIfAbsent(session, key -> new SenderSessionBucketData()); } /** @@ -596,6 +588,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { * bucket owner fails. */ public static class ReceiverBucketData { + static final String RESTORE_BUCKET_ERROR = + "Persistence.ReceiverBucketData.restoreBucket: "; + // maps session name into encoded data Map<String, ReceiverSessionBucketData> sessionData = new HashMap<>(); @@ -672,8 +667,31 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { // one entry for each Drools session being restored -- // indicates when the restore is complete (restore runs within // the Drools session thread) + List<CountDownLatch> sessionLatches = restoreBucket_droolsSessions(); + + // restore lock data + restoreBucket_locks(bucket); + + // wait for all of the sessions to update + try { + for (CountDownLatch sessionLatch : sessionLatches) { + if (!sessionLatch.await(10000L, TimeUnit.MILLISECONDS)) { + logger.error("{}: timed out waiting for session latch", + this); + } + } + } catch (InterruptedException e) { + logger.error("Exception in {}", this, e); + Thread.currentThread().interrupt(); + } + } + + private List<CountDownLatch> restoreBucket_droolsSessions() { List<CountDownLatch> sessionLatches = new LinkedList<>(); - for (String sessionName : sessionData.keySet()) { + for (Map.Entry<String, ReceiverSessionBucketData> entry : sessionData.entrySet()) { + String sessionName = entry.getKey(); + ReceiverSessionBucketData rsbd = entry.getValue(); + // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>" String[] nameSegments = sessionName.split(":"); PolicySession policySession = null; @@ -693,7 +711,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { } if (policySession == null) { - logger.error("Persistence.ReceiverBucketData.restoreBucket: " + logger.error(RESTORE_BUCKET_ERROR + "Can't find PolicySession{}", sessionName); continue; } @@ -701,11 +719,10 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { Object obj = null; try { // deserialization needs to use the correct 'ClassLoader' - ReceiverSessionBucketData rsbd = sessionData.get(sessionName); obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData), policySession.getPolicyContainer().getClassLoader()); } catch (IOException | ClassNotFoundException | IllegalArgumentException e) { - logger.error("Persistence.ReceiverBucketData.restoreBucket: " + logger.error(RESTORE_BUCKET_ERROR + "Failed to read data for session '{}'", sessionName, e); @@ -714,7 +731,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { } if (!(obj instanceof Map)) { - logger.error("Persistence.ReceiverBucketData.restoreBucket: " + logger.error(RESTORE_BUCKET_ERROR + "Session '{}' data has class {}, expected 'Map'", sessionName, obj.getClass().getName()); @@ -733,29 +750,26 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { final KieSession kieSession = policySession.getKieSession(); // run the following within the Drools session thread - kieSession.insert(new DroolsRunnable() { - /** - * {@inheritDoc} - */ - @Override - public void run() { - try { - // insert all of the Drools objects into the session - for (Object obj : droolsObjects.keySet()) { - kieSession.insert(obj); - } - } finally { - // signal completion - sessionLatch.countDown(); + DroolsRunnable insertDroolsObjects = () -> { + try { + // insert all of the Drools objects into the session + for (Object droolsObj : droolsObjects.keySet()) { + kieSession.insert(droolsObj); } + } finally { + // signal completion + sessionLatch.countDown(); } - }); + }; + kieSession.insert(insertDroolsObjects); // add this to the set of 'CountDownLatch's we are waiting for sessionLatches.add(sessionLatch); } + return sessionLatches; + } - // restore lock data + private void restoreBucket_locks(Bucket bucket) { if (lockData != null) { Object obj = null; try { @@ -767,30 +781,17 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { // send out updated date sendLockDataToBackups(bucket, (GlobalLocks)obj); } else { - logger.error("Persistence.ReceiverBucketData.restoreBucket: " + logger.error(RESTORE_BUCKET_ERROR + "Expected 'GlobalLocks', got '{}'", obj.getClass().getName()); } } catch (IOException | ClassNotFoundException | IllegalArgumentException e) { - logger.error("Persistence.ReceiverBucketData.restoreBucket: " + logger.error(RESTORE_BUCKET_ERROR + "Failed to read lock data", e); // skip the lock data } } - - // wait for all of the sessions to update - try { - for (CountDownLatch sessionLatch : sessionLatches) { - if (!sessionLatch.await(10000L, TimeUnit.MILLISECONDS)) { - logger.error("{}: timed out waiting for session latch", - this); - } - } - } catch (InterruptedException e) { - logger.error("Exception in {}", this, e); - Thread.currentThread().interrupt(); - } } } @@ -804,10 +805,10 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { @POST @Path("/persistence/session") @Consumes(MediaType.APPLICATION_OCTET_STREAM) - public void receiveSession(@QueryParam("bucket") int bucket, - @QueryParam("session") String sessionName, - @QueryParam("count") int count, - @QueryParam("dest") UUID dest, + public void receiveSession(@QueryParam(QP_BUCKET) int bucket, + @QueryParam(QP_SESSION) String sessionName, + @QueryParam(QP_COUNT) int count, + @QueryParam(QP_DEST) UUID dest, byte[] data) { logger.debug("/persistence/session: (bucket={},session={},count={}) " + "got {} bytes of data", @@ -829,9 +830,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { Entity.entity(new String(data), MediaType.APPLICATION_OCTET_STREAM_TYPE); webTarget - .queryParam("bucket", bucket) - .queryParam("session", sessionName) - .queryParam("count", count) + .queryParam(QP_BUCKET, bucket) + .queryParam(QP_SESSION, sessionName) + .queryParam(QP_COUNT, count) .request().post(entity); } } @@ -843,9 +844,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { @POST @Path("/persistence/lock") @Consumes(MediaType.APPLICATION_OCTET_STREAM) - public void receiveLockData(@QueryParam("bucket") int bucket, - @QueryParam("count") int count, - @QueryParam("dest") UUID dest, + public void receiveLockData(@QueryParam(QP_BUCKET) int bucket, + @QueryParam(QP_COUNT) int count, + @QueryParam(QP_DEST) UUID dest, byte[] data) { logger.debug("/persistence/lock: (bucket={},count={}) " + "got {} bytes of data", bucket, count, data.length); @@ -865,8 +866,8 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { Entity.entity(new String(data), MediaType.APPLICATION_OCTET_STREAM_TYPE); webTarget - .queryParam("bucket", bucket) - .queryParam("count", count) + .queryParam(QP_BUCKET, bucket) + .queryParam(QP_COUNT, count) .request().post(entity); } } |