aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
diff options
context:
space:
mode:
Diffstat (limited to 'mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java')
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java135
1 files changed, 69 insertions, 66 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index c95644b..00180a0 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -45,8 +45,7 @@ public class OwnershipAndCheckpoint{
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class);
private Lock checkpointLock;
- private AtomicBoolean change;
- private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
+ private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied;
private Map<UUID,Long> ownershipBeginTime;
private long timeoutInMs;
@@ -54,8 +53,7 @@ public class OwnershipAndCheckpoint{
this(new HashMap<>(),Long.MAX_VALUE);
}
- public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, Integer>> alreadyApplied, long timeoutInMs){
- change = new AtomicBoolean(true);
+ public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied, long timeoutInMs){
checkpointLock = new ReentrantLock();
this.alreadyApplied = alreadyApplied;
ownershipBeginTime = new HashMap<>();
@@ -130,20 +128,17 @@ public class OwnershipAndCheckpoint{
* @param di
* @param extendedDag
* @param ranges
- * @param locks
* @param ownOpId
* @throws MDBCServiceException
*/
- public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges,
- Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException {
+ public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges, UUID ownOpId)
+ throws MDBCServiceException {
if(ranges.isEmpty()){
return;
}
try {
checkpointLock.lock();
- change.set(true);
- Set<Range> rangesSet = new HashSet<>(ranges);
- extendedDag.setAlreadyApplied(alreadyApplied, rangesSet);
+ extendedDag.setAlreadyApplied(alreadyApplied, ranges);
applyRequiredChanges(mi, di, extendedDag, ranges, ownOpId);
}
catch(MDBCServiceException e){
@@ -163,18 +158,18 @@ public class OwnershipAndCheckpoint{
}
}
- private void disableForeignKeys(DBInterface di) throws MDBCServiceException {
+ private void disableForeignKeys(DBInterface dbi) throws MDBCServiceException {
try {
- di.disableForeignKeyChecks();
+ dbi.disableForeignKeyChecks();
} catch (SQLException e) {
throw new MDBCServiceException("Error disable foreign keys checks",e);
}
}
- private void applyTxDigest(DBInterface di, StagingTable txDigest)
+ private void applyTxDigest(DBInterface dbi, StagingTable txDigest)
throws MDBCServiceException {
try {
- di.applyTxDigest(txDigest);
+ dbi.applyTxDigest(txDigest);
} catch (SQLException e) {
throw new MDBCServiceException("Error applying tx digest in local SQL",e);
}
@@ -191,39 +186,28 @@ public class OwnershipAndCheckpoint{
if(rangesToWarmup.isEmpty()){
return;
}
- boolean ready = false;
- change.set(true);
- Set<Range> rangeSet = new HashSet<Range>(rangesToWarmup);
Dag dag = new Dag(false);
- while(!ready){
- if(change.get()){
- change.set(false);
- final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false);
- dag = Dag.getDag(rows,rangesToWarmup);
- }
- else if(!dag.applied()){
- DagNode node = dag.nextToApply(rangesToWarmup);
- if(node!=null) {
- Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
- while (pair != null) {
+ final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false);
+ dag = Dag.getDag(rows,rangesToWarmup);
+ dag.setAlreadyApplied(alreadyApplied, rangesToWarmup);
+ while(!dag.applied()){
+ DagNode node = dag.nextToApply(rangesToWarmup);
+ if(node!=null) {
+ Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesToWarmup);
+ while (pair != null) {
+ checkpointLock.lock();
+ try {
disableForeignKeys(di);
- checkpointLock.lock();
- if (change.get()) {
- enableForeignKeys(di);
- checkpointLock.unlock();
- break;
- } else {
- applyDigestAndUpdateDataStructures(mi, di, node, pair);
- }
- pair = node.nextNotAppliedTransaction(rangeSet);
+ applyDigestAndUpdateDataStructures(mi, di, node, pair.getLeft(), pair.getRight());
+ pair = node.nextNotAppliedTransaction(rangesToWarmup);
enableForeignKeys(di);
+ } catch (MDBCServiceException e) {
checkpointLock.unlock();
+ throw e;
}
+ checkpointLock.unlock();
}
}
- else{
- ready = true;
- }
}
}
@@ -235,25 +219,54 @@ public class OwnershipAndCheckpoint{
* @param pair
* @throws MDBCServiceException
*/
- private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, DagNode node,
- Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException {
+ private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface dbi, DagNode node,
+ MusicTxDigestId digestId, Set<Range> ranges) throws MDBCServiceException {
+ if (alreadyReplayed(node, digestId)) {
+ return;
+ }
+
final StagingTable txDigest;
try {
- txDigest = mi.getTxDigest(pair.getKey());
+ txDigest = mi.getTxDigest(digestId);
} catch (MDBCServiceException e) {
logger.warn("Transaction digest was not found, this could be caused by a failure of the previous owner"
+"And would normally only happen as the last ID of the corresponding redo log. Please check that this is the"
- +" case for txID "+pair.getKey().transactionId.toString());
+ +" case for txID "+digestId.transactionId.toString());
return;
}
- applyTxDigest(di, txDigest);
- for (Range r : pair.getValue()) {
+ applyTxDigest(dbi, txDigest);
+ for (Range r : ranges) {
MusicRangeInformationRow row = node.getRow();
- alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
+ alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), digestId));
- updateCheckpointLocations(mi, di, r, row.getPartitionIndex(), pair.getKey().index);
+ updateCheckpointLocations(mi, dbi, r, row.getPartitionIndex(), digestId);
}
}
+
+ /**
+ * Determine if this musictxdigest id has already been replayed
+ * @param node
+ * @param redoLogIndex
+ * @return true if alreadyApplied is past this node/redolog, false if it hasn't been replayed
+ */
+ public boolean alreadyReplayed(DagNode node, MusicTxDigestId txdigest) {
+ int index = node.getRow().getRedoLog().indexOf(txdigest);
+ for (Range range: node.getRangeSet()) {
+ Pair<MriReference, MusicTxDigestId> applied = alreadyApplied.get(range);
+ if (applied==null) {
+ return false;
+ }
+ MriReference appliedMriRef = applied.getLeft();
+ MusicTxDigestId appliedDigest = applied.getRight();
+ int appliedIndex = node.getRow().getRedoLog().indexOf(appliedDigest);
+ if (appliedMriRef==null || appliedMriRef.getTimestamp() < node.getTimestamp()
+ || (appliedMriRef.getTimestamp() == node.getTimestamp()
+ && appliedIndex < index)) {
+ return false;
+ }
+ }
+ return true;
+ }
/**
* Update external checkpoint markers in sql db and music
@@ -263,9 +276,9 @@ public class OwnershipAndCheckpoint{
* @param partitionIndex
* @param index
*/
- private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, int index) {
- dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
- mi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
+ private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, MusicTxDigestId txdigest) {
+ dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index));
+ mi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index));
}
/**
@@ -279,15 +292,14 @@ public class OwnershipAndCheckpoint{
*/
private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, Set<Range> ranges, UUID ownOpId)
throws MDBCServiceException {
- Set<Range> rangeSet = new HashSet<Range>(ranges);
disableForeignKeys(db);
while(!extendedDag.applied()){
DagNode node = extendedDag.nextToApply(ranges);
if(node!=null) {
- Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
+ Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(ranges);
while (pair != null) {
- applyDigestAndUpdateDataStructures(mi, db, node, pair);
- pair = node.nextNotAppliedTransaction(rangeSet);
+ applyDigestAndUpdateDataStructures(mi, db, node, pair.getLeft(), pair.getRight());
+ pair = node.nextNotAppliedTransaction(ranges);
if (timeout(ownOpId)) {
enableForeignKeys(db);
throw new MDBCServiceException("Timeout apply changes to local dbi");
@@ -346,7 +358,7 @@ public class OwnershipAndCheckpoint{
}
Set<Range> allRanges = currentlyOwn.getAllRanges();
//TODO: we shouldn't need to go back to music at this point
- List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, new HashSet<>(allRanges), true);
+ List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, allRanges, true);
currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,latestRows));
return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId);
}
@@ -462,15 +474,6 @@ public class OwnershipAndCheckpoint{
}
-
- public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
- Set<Range> snapshot = partition.getSnapshot();
- UUID row = partition.getMRIIndex();
- for(Range r : snapshot){
- alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
- }
- }
-
// \TODO merge with dag code
private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException {
Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>();
@@ -495,7 +498,7 @@ public class OwnershipAndCheckpoint{
}
- public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() {
+ public Map<Range, Pair<MriReference, MusicTxDigestId>> getAlreadyApplied() {
return this.alreadyApplied;
}