diff options
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java')
-rw-r--r-- | feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java | 142 |
1 files changed, 83 insertions, 59 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java index 48bd1c1a..e8121f3a 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java @@ -245,6 +245,11 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { servers.add(bucket.getPrimaryBackup()); servers.add(bucket.getSecondaryBackup()); } + sendLocksToBackupServers(bucketNumber, entity, count, servers); + } + + private static void sendLocksToBackupServers(final int bucketNumber, final Entity<String> entity, final int count, + Set<Server> servers) { for (final Server server : servers) { if (server != null) { // send out REST command @@ -336,25 +341,7 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { servers.add(bucket.getPrimaryBackup()); servers.add(bucket.getSecondaryBackup()); } - for (final Server server : servers) { - if (server != null) { - // send out REST command - server.getThreadPool().execute(() -> { - WebTarget webTarget = - server.getWebTarget("persistence/session"); - if (webTarget != null) { - webTarget - .queryParam(QP_BUCKET, - bucket.getIndex()) - .queryParam(QP_SESSION, - encodedSessionName) - .queryParam(QP_COUNT, count) - .queryParam(QP_DEST, server.getUuid()) - .request().post(entity); - } - }); - } - } + sendBucketToBackupServers(bucket, count, entity, servers); } } } catch (Exception e) { @@ -362,6 +349,30 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { } } + private void sendBucketToBackupServers(Bucket bucket, final int count, final Entity<String> entity, + Set<Server> servers) { + + for (final Server server : servers) { + if (server != null) { + // send out REST command + server.getThreadPool().execute(() -> { + WebTarget webTarget = + server.getWebTarget("persistence/session"); + if (webTarget != null) { + webTarget + .queryParam(QP_BUCKET, + bucket.getIndex()) + .queryParam(QP_SESSION, + encodedSessionName) + .queryParam(QP_COUNT, count) + .queryParam(QP_DEST, server.getUuid()) + .request().post(entity); + } + }); + } + } + } + /* ************************************** */ /* 'RuleRuntimeEventListener' interface */ /* ************************************** */ @@ -690,57 +701,20 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { String sessionName = entry.getKey(); ReceiverSessionBucketData rsbd = entry.getValue(); - // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>" - String[] nameSegments = sessionName.split(":"); - PolicySession policySession = null; - - // locate the 'PolicyContainer' and 'PolicySession' - if (nameSegments.length == 3) { - // step through all 'PolicyContainer' instances looking - // for a matching 'artifactId' & 'groupId' - for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) { - if (nameSegments[1].equals(pc.getArtifactId()) - && nameSegments[0].equals(pc.getGroupId())) { - // 'PolicyContainer' matches -- try to fetch the session - policySession = pc.getPolicySession(nameSegments[2]); - break; - } - } - } - + PolicySession policySession = detmPolicySession(sessionName); if (policySession == null) { logger.error(RESTORE_BUCKET_ERROR + "Can't find PolicySession{}", sessionName); continue; } - Object obj = null; - try { - // deserialization needs to use the correct 'ClassLoader' - obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData), - policySession.getPolicyContainer().getClassLoader()); - } catch (IOException | ClassNotFoundException | IllegalArgumentException e) { - logger.error(RESTORE_BUCKET_ERROR - + "Failed to read data for session '{}'", - sessionName, e); - - // can't decode -- skip this session - continue; - } - - if (!(obj instanceof Map)) { - logger.error(RESTORE_BUCKET_ERROR - + "Session '{}' data has class {}, expected 'Map'", - sessionName, obj.getClass().getName()); - - // wrong object type decoded -- skip this session + final Map<?, ?> droolsObjects = deserializeMap(sessionName, rsbd, policySession); + if (droolsObjects == null) { continue; } // if we reach this point, we have decoded the persistent data - final Map<?, ?> droolsObjects = (Map<?, ?>) obj; - // signal when restore is complete final CountDownLatch sessionLatch = new CountDownLatch(1); @@ -767,6 +741,56 @@ public class Persistence implements PolicySessionFeatureApi, ServerPoolApi { return sessionLatches; } + private PolicySession detmPolicySession(String sessionName) { + // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>" + String[] nameSegments = sessionName.split(":"); + + // locate the 'PolicyContainer' and 'PolicySession' + if (nameSegments.length == 3) { + // step through all 'PolicyContainer' instances looking + // for a matching 'artifactId' & 'groupId' + for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) { + if (nameSegments[1].equals(pc.getArtifactId()) + && nameSegments[0].equals(pc.getGroupId())) { + // 'PolicyContainer' matches -- try to fetch the session + return pc.getPolicySession(nameSegments[2]); + } + } + } + return null; + } + + private Map<?, ?> deserializeMap(String sessionName, ReceiverSessionBucketData rsbd, + PolicySession policySession) { + Object obj; + + try { + // deserialization needs to use the correct 'ClassLoader' + obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData), + policySession.getPolicyContainer().getClassLoader()); + } catch (IOException | ClassNotFoundException | IllegalArgumentException e) { + logger.error(RESTORE_BUCKET_ERROR + + "Failed to read data for session '{}'", + sessionName, e); + + // can't decode -- skip this session + return null; + } + + if (!(obj instanceof Map)) { + logger.error(RESTORE_BUCKET_ERROR + + "Session '{}' data has class {}, expected 'Map'", + sessionName, obj.getClass().getName()); + + // wrong object type decoded -- skip this session + return null; + } + + // if we reach this point, we have decoded the persistent data + + return (Map<?, ?>) obj; + } + private void restoreBucketLocks(Bucket bucket) { if (lockData != null) { Object obj = null; |