diff options
Diffstat (limited to 'mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java')
-rw-r--r-- | mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java | 135 |
1 files changed, 69 insertions, 66 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index c95644b..00180a0 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -45,8 +45,7 @@ public class OwnershipAndCheckpoint{ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class); private Lock checkpointLock; - private AtomicBoolean change; - private Map<Range, Pair<MriReference, Integer>> alreadyApplied; + private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied; private Map<UUID,Long> ownershipBeginTime; private long timeoutInMs; @@ -54,8 +53,7 @@ public class OwnershipAndCheckpoint{ this(new HashMap<>(),Long.MAX_VALUE); } - public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, Integer>> alreadyApplied, long timeoutInMs){ - change = new AtomicBoolean(true); + public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied, long timeoutInMs){ checkpointLock = new ReentrantLock(); this.alreadyApplied = alreadyApplied; ownershipBeginTime = new HashMap<>(); @@ -130,20 +128,17 @@ public class OwnershipAndCheckpoint{ * @param di * @param extendedDag * @param ranges - * @param locks * @param ownOpId * @throws MDBCServiceException */ - public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges, - Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException { + public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges, UUID ownOpId) + throws MDBCServiceException { if(ranges.isEmpty()){ return; } try { checkpointLock.lock(); - change.set(true); - Set<Range> rangesSet = new HashSet<>(ranges); - extendedDag.setAlreadyApplied(alreadyApplied, rangesSet); + extendedDag.setAlreadyApplied(alreadyApplied, ranges); applyRequiredChanges(mi, di, extendedDag, ranges, ownOpId); } catch(MDBCServiceException e){ @@ -163,18 +158,18 @@ public class OwnershipAndCheckpoint{ } } - private void disableForeignKeys(DBInterface di) throws MDBCServiceException { + private void disableForeignKeys(DBInterface dbi) throws MDBCServiceException { try { - di.disableForeignKeyChecks(); + dbi.disableForeignKeyChecks(); } catch (SQLException e) { throw new MDBCServiceException("Error disable foreign keys checks",e); } } - private void applyTxDigest(DBInterface di, StagingTable txDigest) + private void applyTxDigest(DBInterface dbi, StagingTable txDigest) throws MDBCServiceException { try { - di.applyTxDigest(txDigest); + dbi.applyTxDigest(txDigest); } catch (SQLException e) { throw new MDBCServiceException("Error applying tx digest in local SQL",e); } @@ -191,39 +186,28 @@ public class OwnershipAndCheckpoint{ if(rangesToWarmup.isEmpty()){ return; } - boolean ready = false; - change.set(true); - Set<Range> rangeSet = new HashSet<Range>(rangesToWarmup); Dag dag = new Dag(false); - while(!ready){ - if(change.get()){ - change.set(false); - final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false); - dag = Dag.getDag(rows,rangesToWarmup); - } - else if(!dag.applied()){ - DagNode node = dag.nextToApply(rangesToWarmup); - if(node!=null) { - Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet); - while (pair != null) { + final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false); + dag = Dag.getDag(rows,rangesToWarmup); + dag.setAlreadyApplied(alreadyApplied, rangesToWarmup); + while(!dag.applied()){ + DagNode node = dag.nextToApply(rangesToWarmup); + if(node!=null) { + Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesToWarmup); + while (pair != null) { + checkpointLock.lock(); + try { disableForeignKeys(di); - checkpointLock.lock(); - if (change.get()) { - enableForeignKeys(di); - checkpointLock.unlock(); - break; - } else { - applyDigestAndUpdateDataStructures(mi, di, node, pair); - } - pair = node.nextNotAppliedTransaction(rangeSet); + applyDigestAndUpdateDataStructures(mi, di, node, pair.getLeft(), pair.getRight()); + pair = node.nextNotAppliedTransaction(rangesToWarmup); enableForeignKeys(di); + } catch (MDBCServiceException e) { checkpointLock.unlock(); + throw e; } + checkpointLock.unlock(); } } - else{ - ready = true; - } } } @@ -235,25 +219,54 @@ public class OwnershipAndCheckpoint{ * @param pair * @throws MDBCServiceException */ - private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, DagNode node, - Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException { + private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface dbi, DagNode node, + MusicTxDigestId digestId, Set<Range> ranges) throws MDBCServiceException { + if (alreadyReplayed(node, digestId)) { + return; + } + final StagingTable txDigest; try { - txDigest = mi.getTxDigest(pair.getKey()); + txDigest = mi.getTxDigest(digestId); } catch (MDBCServiceException e) { logger.warn("Transaction digest was not found, this could be caused by a failure of the previous owner" +"And would normally only happen as the last ID of the corresponding redo log. Please check that this is the" - +" case for txID "+pair.getKey().transactionId.toString()); + +" case for txID "+digestId.transactionId.toString()); return; } - applyTxDigest(di, txDigest); - for (Range r : pair.getValue()) { + applyTxDigest(dbi, txDigest); + for (Range r : ranges) { MusicRangeInformationRow row = node.getRow(); - alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); + alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), digestId)); - updateCheckpointLocations(mi, di, r, row.getPartitionIndex(), pair.getKey().index); + updateCheckpointLocations(mi, dbi, r, row.getPartitionIndex(), digestId); } } + + /** + * Determine if this musictxdigest id has already been replayed + * @param node + * @param redoLogIndex + * @return true if alreadyApplied is past this node/redolog, false if it hasn't been replayed + */ + public boolean alreadyReplayed(DagNode node, MusicTxDigestId txdigest) { + int index = node.getRow().getRedoLog().indexOf(txdigest); + for (Range range: node.getRangeSet()) { + Pair<MriReference, MusicTxDigestId> applied = alreadyApplied.get(range); + if (applied==null) { + return false; + } + MriReference appliedMriRef = applied.getLeft(); + MusicTxDigestId appliedDigest = applied.getRight(); + int appliedIndex = node.getRow().getRedoLog().indexOf(appliedDigest); + if (appliedMriRef==null || appliedMriRef.getTimestamp() < node.getTimestamp() + || (appliedMriRef.getTimestamp() == node.getTimestamp() + && appliedIndex < index)) { + return false; + } + } + return true; + } /** * Update external checkpoint markers in sql db and music @@ -263,9 +276,9 @@ public class OwnershipAndCheckpoint{ * @param partitionIndex * @param index */ - private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, int index) { - dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, index)); - mi.updateCheckpointLocations(r, Pair.of(partitionIndex, index)); + private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, MusicTxDigestId txdigest) { + dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index)); + mi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index)); } /** @@ -279,15 +292,14 @@ public class OwnershipAndCheckpoint{ */ private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, Set<Range> ranges, UUID ownOpId) throws MDBCServiceException { - Set<Range> rangeSet = new HashSet<Range>(ranges); disableForeignKeys(db); while(!extendedDag.applied()){ DagNode node = extendedDag.nextToApply(ranges); if(node!=null) { - Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet); + Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(ranges); while (pair != null) { - applyDigestAndUpdateDataStructures(mi, db, node, pair); - pair = node.nextNotAppliedTransaction(rangeSet); + applyDigestAndUpdateDataStructures(mi, db, node, pair.getLeft(), pair.getRight()); + pair = node.nextNotAppliedTransaction(ranges); if (timeout(ownOpId)) { enableForeignKeys(db); throw new MDBCServiceException("Timeout apply changes to local dbi"); @@ -346,7 +358,7 @@ public class OwnershipAndCheckpoint{ } Set<Range> allRanges = currentlyOwn.getAllRanges(); //TODO: we shouldn't need to go back to music at this point - List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, new HashSet<>(allRanges), true); + List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, allRanges, true); currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,latestRows)); return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId); } @@ -462,15 +474,6 @@ public class OwnershipAndCheckpoint{ } - - public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException { - Set<Range> snapshot = partition.getSnapshot(); - UUID row = partition.getMRIIndex(); - for(Range r : snapshot){ - alreadyApplied.put(r,Pair.of(new MriReference(row),-1)); - } - } - // \TODO merge with dag code private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException { Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>(); @@ -495,7 +498,7 @@ public class OwnershipAndCheckpoint{ } - public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() { + public Map<Range, Pair<MriReference, MusicTxDigestId>> getAlreadyApplied() { return this.alreadyApplied; } |