diff options
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java')
-rw-r--r-- | feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java | 171 |
1 files changed, 86 insertions, 85 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java index 295194d2..60e740c5 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java @@ -20,7 +20,6 @@ package org.onap.policy.drools.serverpool.persistence; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; @@ -73,6 +72,12 @@ import org.slf4j.LoggerFactory; public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { private static Logger logger = LoggerFactory.getLogger(Persistence.class); + // HTTP query parameters + private static final String QP_BUCKET = "bucket"; + private static final String QP_SESSION = "session"; + private static final String QP_COUNT = "count"; + private static final String QP_DEST = "dest"; + /***************************************/ /* 'PolicySessionFeatureApi' interface */ /***************************************/ @@ -209,7 +214,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { * @param bucket the bucket containing the 'GlobalLocks' adjunct * @param globalLocks the 'GlobalLocks' adjunct */ - private static void sendLockDataToBackups(Bucket bucket, GlobalLocks globalLocks) { + private static void sendLockDataToBackups(final Bucket bucket, final GlobalLocks globalLocks) { final int bucketNumber = bucket.getIndex(); SenderBucketData sbd = bucket.getAdjunct(SenderBucketData.class); int lockCount = 0; @@ -245,18 +250,15 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { for (final Server server : servers) { if (server != null) { // send out REST command - server.getThreadPool().execute(new Runnable() { - @Override - public void run() { - WebTarget webTarget = - server.getWebTarget("persistence/lock"); - if (webTarget != null) { - webTarget - .queryParam("bucket", bucketNumber) - .queryParam("count", count) - .queryParam("dest", server.getUuid()) - .request().post(entity); - } + server.getThreadPool().execute(() -> { + WebTarget webTarget = + server.getWebTarget("persistence/lock"); + if (webTarget != null) { + webTarget + .queryParam(QP_BUCKET, bucketNumber) + .queryParam(QP_COUNT, count) + .queryParam(QP_DEST, server.getUuid()) + .request().post(entity); } }); } @@ -339,21 +341,18 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { for (final Server server : servers) { if (server != null) { // send out REST command - server.getThreadPool().execute(new Runnable() { - @Override - public void run() { - WebTarget webTarget = - server.getWebTarget("persistence/session"); - if (webTarget != null) { - webTarget - .queryParam("bucket", - bucket.getIndex()) - .queryParam("session", - encodedSessionName) - .queryParam("count", count) - .queryParam("dest", server.getUuid()) - .request().post(entity); - } + server.getThreadPool().execute(() -> { + WebTarget webTarget = + server.getWebTarget("persistence/session"); + if (webTarget != null) { + webTarget + .queryParam(QP_BUCKET, + bucket.getIndex()) + .queryParam(QP_SESSION, + encodedSessionName) + .queryParam(QP_COUNT, count) + .queryParam(QP_DEST, server.getUuid()) + .request().post(entity); } }); } @@ -552,14 +551,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { * @return the associated 'SenderSessionBucketData' instance */ synchronized SenderSessionBucketData getSessionData(PolicySession session) { - // try to fetch the associated instance - SenderSessionBucketData rval = sessionData.get(session); - if (rval == null) { - // it doesn't exist, so create one - rval = new SenderSessionBucketData(); - sessionData.put(session, rval); - } - return rval; + return sessionData.computeIfAbsent(session, key -> new SenderSessionBucketData()); } /** @@ -596,6 +588,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { * bucket owner fails. */ public static class ReceiverBucketData { + static final String RESTORE_BUCKET_ERROR = + "Persistence.ReceiverBucketData.restoreBucket: "; + // maps session name into encoded data Map<String, ReceiverSessionBucketData> sessionData = new HashMap<>(); @@ -672,8 +667,31 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { // one entry for each Drools session being restored -- // indicates when the restore is complete (restore runs within // the Drools session thread) + List<CountDownLatch> sessionLatches = restoreBucket_droolsSessions(); + + // restore lock data + restoreBucket_locks(bucket); + + // wait for all of the sessions to update + try { + for (CountDownLatch sessionLatch : sessionLatches) { + if (!sessionLatch.await(10000L, TimeUnit.MILLISECONDS)) { + logger.error("{}: timed out waiting for session latch", + this); + } + } + } catch (InterruptedException e) { + logger.error("Exception in {}", this, e); + Thread.currentThread().interrupt(); + } + } + + private List<CountDownLatch> restoreBucket_droolsSessions() { List<CountDownLatch> sessionLatches = new LinkedList<>(); - for (String sessionName : sessionData.keySet()) { + for (Map.Entry<String, ReceiverSessionBucketData> entry : sessionData.entrySet()) { + String sessionName = entry.getKey(); + ReceiverSessionBucketData rsbd = entry.getValue(); + // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>" String[] nameSegments = sessionName.split(":"); PolicySession policySession = null; @@ -693,7 +711,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { } if (policySession == null) { - logger.error("Persistence.ReceiverBucketData.restoreBucket: " + logger.error(RESTORE_BUCKET_ERROR + "Can't find PolicySession{}", sessionName); continue; } @@ -701,11 +719,10 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { Object obj = null; try { // deserialization needs to use the correct 'ClassLoader' - ReceiverSessionBucketData rsbd = sessionData.get(sessionName); obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData), policySession.getPolicyContainer().getClassLoader()); } catch (IOException | ClassNotFoundException | IllegalArgumentException e) { - logger.error("Persistence.ReceiverBucketData.restoreBucket: " + logger.error(RESTORE_BUCKET_ERROR + "Failed to read data for session '{}'", sessionName, e); @@ -714,7 +731,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { } if (!(obj instanceof Map)) { - logger.error("Persistence.ReceiverBucketData.restoreBucket: " + logger.error(RESTORE_BUCKET_ERROR + "Session '{}' data has class {}, expected 'Map'", sessionName, obj.getClass().getName()); @@ -733,29 +750,26 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { final KieSession kieSession = policySession.getKieSession(); // run the following within the Drools session thread - kieSession.insert(new DroolsRunnable() { - /** - * {@inheritDoc} - */ - @Override - public void run() { - try { - // insert all of the Drools objects into the session - for (Object obj : droolsObjects.keySet()) { - kieSession.insert(obj); - } - } finally { - // signal completion - sessionLatch.countDown(); + DroolsRunnable insertDroolsObjects = () -> { + try { + // insert all of the Drools objects into the session + for (Object droolsObj : droolsObjects.keySet()) { + kieSession.insert(droolsObj); } + } finally { + // signal completion + sessionLatch.countDown(); } - }); + }; + kieSession.insert(insertDroolsObjects); // add this to the set of 'CountDownLatch's we are waiting for sessionLatches.add(sessionLatch); } + return sessionLatches; + } - // restore lock data + private void restoreBucket_locks(Bucket bucket) { if (lockData != null) { Object obj = null; try { @@ -767,30 +781,17 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { // send out updated date sendLockDataToBackups(bucket, (GlobalLocks)obj); } else { - logger.error("Persistence.ReceiverBucketData.restoreBucket: " + logger.error(RESTORE_BUCKET_ERROR + "Expected 'GlobalLocks', got '{}'", obj.getClass().getName()); } } catch (IOException | ClassNotFoundException | IllegalArgumentException e) { - logger.error("Persistence.ReceiverBucketData.restoreBucket: " + logger.error(RESTORE_BUCKET_ERROR + "Failed to read lock data", e); // skip the lock data } } - - // wait for all of the sessions to update - try { - for (CountDownLatch sessionLatch : sessionLatches) { - if (!sessionLatch.await(10000L, TimeUnit.MILLISECONDS)) { - logger.error("{}: timed out waiting for session latch", - this); - } - } - } catch (InterruptedException e) { - logger.error("Exception in {}", this, e); - Thread.currentThread().interrupt(); - } } } @@ -804,10 +805,10 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { @POST @Path("/persistence/session") @Consumes(MediaType.APPLICATION_OCTET_STREAM) - public void receiveSession(@QueryParam("bucket") int bucket, - @QueryParam("session") String sessionName, - @QueryParam("count") int count, - @QueryParam("dest") UUID dest, + public void receiveSession(@QueryParam(QP_BUCKET) int bucket, + @QueryParam(QP_SESSION) String sessionName, + @QueryParam(QP_COUNT) int count, + @QueryParam(QP_DEST) UUID dest, byte[] data) { logger.debug("/persistence/session: (bucket={},session={},count={}) " + "got {} bytes of data", @@ -829,9 +830,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { Entity.entity(new String(data), MediaType.APPLICATION_OCTET_STREAM_TYPE); webTarget - .queryParam("bucket", bucket) - .queryParam("session", sessionName) - .queryParam("count", count) + .queryParam(QP_BUCKET, bucket) + .queryParam(QP_SESSION, sessionName) + .queryParam(QP_COUNT, count) .request().post(entity); } } @@ -843,9 +844,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { @POST @Path("/persistence/lock") @Consumes(MediaType.APPLICATION_OCTET_STREAM) - public void receiveLockData(@QueryParam("bucket") int bucket, - @QueryParam("count") int count, - @QueryParam("dest") UUID dest, + public void receiveLockData(@QueryParam(QP_BUCKET) int bucket, + @QueryParam(QP_COUNT) int count, + @QueryParam(QP_DEST) UUID dest, byte[] data) { logger.debug("/persistence/lock: (bucket={},count={}) " + "got {} bytes of data", bucket, count, data.length); @@ -865,8 +866,8 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { Entity.entity(new String(data), MediaType.APPLICATION_OCTET_STREAM_TYPE); webTarget - .queryParam("bucket", bucket) - .queryParam("count", count) + .queryParam(QP_BUCKET, bucket) + .queryParam(QP_COUNT, count) .request().post(entity); } } |