aboutsummaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java')
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java142
1 files changed, 83 insertions, 59 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
index 48bd1c1a..e8121f3a 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
@@ -245,6 +245,11 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
servers.add(bucket.getPrimaryBackup());
servers.add(bucket.getSecondaryBackup());
}
+ sendLocksToBackupServers(bucketNumber, entity, count, servers);
+ }
+
+ private static void sendLocksToBackupServers(final int bucketNumber, final Entity<String> entity, final int count,
+ Set<Server> servers) {
for (final Server server : servers) {
if (server != null) {
// send out REST command
@@ -336,25 +341,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
servers.add(bucket.getPrimaryBackup());
servers.add(bucket.getSecondaryBackup());
}
- for (final Server server : servers) {
- if (server != null) {
- // send out REST command
- server.getThreadPool().execute(() -> {
- WebTarget webTarget =
- server.getWebTarget("persistence/session");
- if (webTarget != null) {
- webTarget
- .queryParam(QP_BUCKET,
- bucket.getIndex())
- .queryParam(QP_SESSION,
- encodedSessionName)
- .queryParam(QP_COUNT, count)
- .queryParam(QP_DEST, server.getUuid())
- .request().post(entity);
- }
- });
- }
- }
+ sendBucketToBackupServers(bucket, count, entity, servers);
}
}
} catch (Exception e) {
@@ -362,6 +349,30 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
}
}
+ private void sendBucketToBackupServers(Bucket bucket, final int count, final Entity<String> entity,
+ Set<Server> servers) {
+
+ for (final Server server : servers) {
+ if (server != null) {
+ // send out REST command
+ server.getThreadPool().execute(() -> {
+ WebTarget webTarget =
+ server.getWebTarget("persistence/session");
+ if (webTarget != null) {
+ webTarget
+ .queryParam(QP_BUCKET,
+ bucket.getIndex())
+ .queryParam(QP_SESSION,
+ encodedSessionName)
+ .queryParam(QP_COUNT, count)
+ .queryParam(QP_DEST, server.getUuid())
+ .request().post(entity);
+ }
+ });
+ }
+ }
+ }
+
/* ************************************** */
/* 'RuleRuntimeEventListener' interface */
/* ************************************** */
@@ -690,57 +701,20 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
String sessionName = entry.getKey();
ReceiverSessionBucketData rsbd = entry.getValue();
- // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>"
- String[] nameSegments = sessionName.split(":");
- PolicySession policySession = null;
-
- // locate the 'PolicyContainer' and 'PolicySession'
- if (nameSegments.length == 3) {
- // step through all 'PolicyContainer' instances looking
- // for a matching 'artifactId' & 'groupId'
- for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
- if (nameSegments[1].equals(pc.getArtifactId())
- && nameSegments[0].equals(pc.getGroupId())) {
- // 'PolicyContainer' matches -- try to fetch the session
- policySession = pc.getPolicySession(nameSegments[2]);
- break;
- }
- }
- }
-
+ PolicySession policySession = detmPolicySession(sessionName);
if (policySession == null) {
logger.error(RESTORE_BUCKET_ERROR
+ "Can't find PolicySession{}", sessionName);
continue;
}
- Object obj = null;
- try {
- // deserialization needs to use the correct 'ClassLoader'
- obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData),
- policySession.getPolicyContainer().getClassLoader());
- } catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
- logger.error(RESTORE_BUCKET_ERROR
- + "Failed to read data for session '{}'",
- sessionName, e);
-
- // can't decode -- skip this session
- continue;
- }
-
- if (!(obj instanceof Map)) {
- logger.error(RESTORE_BUCKET_ERROR
- + "Session '{}' data has class {}, expected 'Map'",
- sessionName, obj.getClass().getName());
-
- // wrong object type decoded -- skip this session
+ final Map<?, ?> droolsObjects = deserializeMap(sessionName, rsbd, policySession);
+ if (droolsObjects == null) {
continue;
}
// if we reach this point, we have decoded the persistent data
- final Map<?, ?> droolsObjects = (Map<?, ?>) obj;
-
// signal when restore is complete
final CountDownLatch sessionLatch = new CountDownLatch(1);
@@ -767,6 +741,56 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
return sessionLatches;
}
+ private PolicySession detmPolicySession(String sessionName) {
+ // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>"
+ String[] nameSegments = sessionName.split(":");
+
+ // locate the 'PolicyContainer' and 'PolicySession'
+ if (nameSegments.length == 3) {
+ // step through all 'PolicyContainer' instances looking
+ // for a matching 'artifactId' & 'groupId'
+ for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
+ if (nameSegments[1].equals(pc.getArtifactId())
+ && nameSegments[0].equals(pc.getGroupId())) {
+ // 'PolicyContainer' matches -- try to fetch the session
+ return pc.getPolicySession(nameSegments[2]);
+ }
+ }
+ }
+ return null;
+ }
+
+ private Map<?, ?> deserializeMap(String sessionName, ReceiverSessionBucketData rsbd,
+ PolicySession policySession) {
+ Object obj;
+
+ try {
+ // deserialization needs to use the correct 'ClassLoader'
+ obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData),
+ policySession.getPolicyContainer().getClassLoader());
+ } catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
+ logger.error(RESTORE_BUCKET_ERROR
+ + "Failed to read data for session '{}'",
+ sessionName, e);
+
+ // can't decode -- skip this session
+ return null;
+ }
+
+ if (!(obj instanceof Map)) {
+ logger.error(RESTORE_BUCKET_ERROR
+ + "Session '{}' data has class {}, expected 'Map'",
+ sessionName, obj.getClass().getName());
+
+ // wrong object type decoded -- skip this session
+ return null;
+ }
+
+ // if we reach this point, we have decoded the persistent data
+
+ return (Map<?, ?>) obj;
+ }
+
private void restoreBucketLocks(Bucket bucket) {
if (lockData != null) {
Object obj = null;