aboutsummaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java')
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java171
1 files changed, 86 insertions, 85 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
index 295194d2..60e740c5 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
@@ -20,7 +20,6 @@
package org.onap.policy.drools.serverpool.persistence;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
@@ -73,6 +72,12 @@ import org.slf4j.LoggerFactory;
public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
private static Logger logger = LoggerFactory.getLogger(Persistence.class);
+ // HTTP query parameters
+ private static final String QP_BUCKET = "bucket";
+ private static final String QP_SESSION = "session";
+ private static final String QP_COUNT = "count";
+ private static final String QP_DEST = "dest";
+
/***************************************/
/* 'PolicySessionFeatureApi' interface */
/***************************************/
@@ -209,7 +214,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
* @param bucket the bucket containing the 'GlobalLocks' adjunct
* @param globalLocks the 'GlobalLocks' adjunct
*/
- private static void sendLockDataToBackups(Bucket bucket, GlobalLocks globalLocks) {
+ private static void sendLockDataToBackups(final Bucket bucket, final GlobalLocks globalLocks) {
final int bucketNumber = bucket.getIndex();
SenderBucketData sbd = bucket.getAdjunct(SenderBucketData.class);
int lockCount = 0;
@@ -245,18 +250,15 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
for (final Server server : servers) {
if (server != null) {
// send out REST command
- server.getThreadPool().execute(new Runnable() {
- @Override
- public void run() {
- WebTarget webTarget =
- server.getWebTarget("persistence/lock");
- if (webTarget != null) {
- webTarget
- .queryParam("bucket", bucketNumber)
- .queryParam("count", count)
- .queryParam("dest", server.getUuid())
- .request().post(entity);
- }
+ server.getThreadPool().execute(() -> {
+ WebTarget webTarget =
+ server.getWebTarget("persistence/lock");
+ if (webTarget != null) {
+ webTarget
+ .queryParam(QP_BUCKET, bucketNumber)
+ .queryParam(QP_COUNT, count)
+ .queryParam(QP_DEST, server.getUuid())
+ .request().post(entity);
}
});
}
@@ -339,21 +341,18 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
for (final Server server : servers) {
if (server != null) {
// send out REST command
- server.getThreadPool().execute(new Runnable() {
- @Override
- public void run() {
- WebTarget webTarget =
- server.getWebTarget("persistence/session");
- if (webTarget != null) {
- webTarget
- .queryParam("bucket",
- bucket.getIndex())
- .queryParam("session",
- encodedSessionName)
- .queryParam("count", count)
- .queryParam("dest", server.getUuid())
- .request().post(entity);
- }
+ server.getThreadPool().execute(() -> {
+ WebTarget webTarget =
+ server.getWebTarget("persistence/session");
+ if (webTarget != null) {
+ webTarget
+ .queryParam(QP_BUCKET,
+ bucket.getIndex())
+ .queryParam(QP_SESSION,
+ encodedSessionName)
+ .queryParam(QP_COUNT, count)
+ .queryParam(QP_DEST, server.getUuid())
+ .request().post(entity);
}
});
}
@@ -552,14 +551,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
* @return the associated 'SenderSessionBucketData' instance
*/
synchronized SenderSessionBucketData getSessionData(PolicySession session) {
- // try to fetch the associated instance
- SenderSessionBucketData rval = sessionData.get(session);
- if (rval == null) {
- // it doesn't exist, so create one
- rval = new SenderSessionBucketData();
- sessionData.put(session, rval);
- }
- return rval;
+ return sessionData.computeIfAbsent(session, key -> new SenderSessionBucketData());
}
/**
@@ -596,6 +588,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
* bucket owner fails.
*/
public static class ReceiverBucketData {
+ static final String RESTORE_BUCKET_ERROR =
+ "Persistence.ReceiverBucketData.restoreBucket: ";
+
// maps session name into encoded data
Map<String, ReceiverSessionBucketData> sessionData = new HashMap<>();
@@ -672,8 +667,31 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
// one entry for each Drools session being restored --
// indicates when the restore is complete (restore runs within
// the Drools session thread)
+ List<CountDownLatch> sessionLatches = restoreBucket_droolsSessions();
+
+ // restore lock data
+ restoreBucket_locks(bucket);
+
+ // wait for all of the sessions to update
+ try {
+ for (CountDownLatch sessionLatch : sessionLatches) {
+ if (!sessionLatch.await(10000L, TimeUnit.MILLISECONDS)) {
+ logger.error("{}: timed out waiting for session latch",
+ this);
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.error("Exception in {}", this, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private List<CountDownLatch> restoreBucket_droolsSessions() {
List<CountDownLatch> sessionLatches = new LinkedList<>();
- for (String sessionName : sessionData.keySet()) {
+ for (Map.Entry<String, ReceiverSessionBucketData> entry : sessionData.entrySet()) {
+ String sessionName = entry.getKey();
+ ReceiverSessionBucketData rsbd = entry.getValue();
+
// [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>"
String[] nameSegments = sessionName.split(":");
PolicySession policySession = null;
@@ -693,7 +711,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
}
if (policySession == null) {
- logger.error("Persistence.ReceiverBucketData.restoreBucket: "
+ logger.error(RESTORE_BUCKET_ERROR
+ "Can't find PolicySession{}", sessionName);
continue;
}
@@ -701,11 +719,10 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
Object obj = null;
try {
// deserialization needs to use the correct 'ClassLoader'
- ReceiverSessionBucketData rsbd = sessionData.get(sessionName);
obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData),
policySession.getPolicyContainer().getClassLoader());
} catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
- logger.error("Persistence.ReceiverBucketData.restoreBucket: "
+ logger.error(RESTORE_BUCKET_ERROR
+ "Failed to read data for session '{}'",
sessionName, e);
@@ -714,7 +731,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
}
if (!(obj instanceof Map)) {
- logger.error("Persistence.ReceiverBucketData.restoreBucket: "
+ logger.error(RESTORE_BUCKET_ERROR
+ "Session '{}' data has class {}, expected 'Map'",
sessionName, obj.getClass().getName());
@@ -733,29 +750,26 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
final KieSession kieSession = policySession.getKieSession();
// run the following within the Drools session thread
- kieSession.insert(new DroolsRunnable() {
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
- try {
- // insert all of the Drools objects into the session
- for (Object obj : droolsObjects.keySet()) {
- kieSession.insert(obj);
- }
- } finally {
- // signal completion
- sessionLatch.countDown();
+ DroolsRunnable insertDroolsObjects = () -> {
+ try {
+ // insert all of the Drools objects into the session
+ for (Object droolsObj : droolsObjects.keySet()) {
+ kieSession.insert(droolsObj);
}
+ } finally {
+ // signal completion
+ sessionLatch.countDown();
}
- });
+ };
+ kieSession.insert(insertDroolsObjects);
// add this to the set of 'CountDownLatch's we are waiting for
sessionLatches.add(sessionLatch);
}
+ return sessionLatches;
+ }
- // restore lock data
+ private void restoreBucket_locks(Bucket bucket) {
if (lockData != null) {
Object obj = null;
try {
@@ -767,30 +781,17 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
// send out updated date
sendLockDataToBackups(bucket, (GlobalLocks)obj);
} else {
- logger.error("Persistence.ReceiverBucketData.restoreBucket: "
+ logger.error(RESTORE_BUCKET_ERROR
+ "Expected 'GlobalLocks', got '{}'",
obj.getClass().getName());
}
} catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
- logger.error("Persistence.ReceiverBucketData.restoreBucket: "
+ logger.error(RESTORE_BUCKET_ERROR
+ "Failed to read lock data", e);
// skip the lock data
}
}
-
- // wait for all of the sessions to update
- try {
- for (CountDownLatch sessionLatch : sessionLatches) {
- if (!sessionLatch.await(10000L, TimeUnit.MILLISECONDS)) {
- logger.error("{}: timed out waiting for session latch",
- this);
- }
- }
- } catch (InterruptedException e) {
- logger.error("Exception in {}", this, e);
- Thread.currentThread().interrupt();
- }
}
}
@@ -804,10 +805,10 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
@POST
@Path("/persistence/session")
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
- public void receiveSession(@QueryParam("bucket") int bucket,
- @QueryParam("session") String sessionName,
- @QueryParam("count") int count,
- @QueryParam("dest") UUID dest,
+ public void receiveSession(@QueryParam(QP_BUCKET) int bucket,
+ @QueryParam(QP_SESSION) String sessionName,
+ @QueryParam(QP_COUNT) int count,
+ @QueryParam(QP_DEST) UUID dest,
byte[] data) {
logger.debug("/persistence/session: (bucket={},session={},count={}) "
+ "got {} bytes of data",
@@ -829,9 +830,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
Entity.entity(new String(data),
MediaType.APPLICATION_OCTET_STREAM_TYPE);
webTarget
- .queryParam("bucket", bucket)
- .queryParam("session", sessionName)
- .queryParam("count", count)
+ .queryParam(QP_BUCKET, bucket)
+ .queryParam(QP_SESSION, sessionName)
+ .queryParam(QP_COUNT, count)
.request().post(entity);
}
}
@@ -843,9 +844,9 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
@POST
@Path("/persistence/lock")
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
- public void receiveLockData(@QueryParam("bucket") int bucket,
- @QueryParam("count") int count,
- @QueryParam("dest") UUID dest,
+ public void receiveLockData(@QueryParam(QP_BUCKET) int bucket,
+ @QueryParam(QP_COUNT) int count,
+ @QueryParam(QP_DEST) UUID dest,
byte[] data) {
logger.debug("/persistence/lock: (bucket={},count={}) "
+ "got {} bytes of data", bucket, count, data.length);
@@ -865,8 +866,8 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
Entity.entity(new String(data),
MediaType.APPLICATION_OCTET_STREAM_TYPE);
webTarget
- .queryParam("bucket", bucket)
- .queryParam("count", count)
+ .queryParam(QP_BUCKET, bucket)
+ .queryParam(QP_COUNT, count)
.request().post(entity);
}
}