From 42214c84ef398fb65db4d84012de1e46c585f300 Mon Sep 17 00:00:00 2001 From: "Tschaen, Brendan" Date: Tue, 6 Aug 2019 13:37:29 -0400 Subject: Replay Transaction Updates Simplify replay logic, already applied datastructure Remove table from txdigest column Issue-ID: MUSIC-421 Signed-off-by: Tschaen, Brendan Change-Id: Ic757e6302e05d188704e625c76a77b106e000152 --- .../java/org/onap/music/mdbc/MdbcConnection.java | 7 +- .../java/org/onap/music/mdbc/StateManager.java | 3 +- .../org/onap/music/mdbc/mixins/MusicMixin.java | 52 +++----- .../org/onap/music/mdbc/mixins/MySQLMixin.java | 5 +- .../java/org/onap/music/mdbc/ownership/Dag.java | 20 +-- .../org/onap/music/mdbc/ownership/DagNode.java | 22 +++- .../mdbc/ownership/OwnershipAndCheckpoint.java | 135 +++++++++++---------- .../org/onap/music/mdbc/tables/MriReference.java | 3 + .../mdbc/tables/MusicRangeInformationRow.java | 2 +- .../onap/music/mdbc/tables/MusicTxDigestId.java | 4 + .../org/onap/music/mdbc/ownership/DagTest.java | 18 +-- .../mdbc/ownership/OwnershipAndCheckpointTest.java | 3 +- 12 files changed, 144 insertions(+), 130 deletions(-) diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 6f097dd..42864ea 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -193,7 +193,8 @@ public class MdbcConnection implements Connection { try { partition = mi.splitPartitionIfNecessary(partition, rangesUsed); } catch (MDBCServiceException e) { - logger.warn(EELFLoggerDelegate.errorLogger, "Failure to split partition, trying to continue", + logger.warn(EELFLoggerDelegate.errorLogger, + "Failure to split partition '" + partition.getMRIIndex() + "' trying to continue", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); } @@ -541,7 +542,6 @@ public class MdbcConnection implements Connection { DatabasePartition tempPartition = own(scRanges, MDBCUtils.getOperationType(tableToQueryType)); if(tempPartition!=null && tempPartition != partition) { this.partition.updateDatabasePartition(tempPartition); - statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition); } dbi.preStatementHook(sql); } @@ -619,7 +619,8 @@ public class MdbcConnection implements Connection { MusicRangeInformationRow row = node.getRow(); Map lock = new HashMap<>(); lock.put(row, new LockResult(row.getPartitionIndex(), ownershipReturn.getOwnerId(), true, ranges)); - ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, lock, ownershipReturn.getOwnershipId()); + ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, ownershipReturn.getOwnershipId()); + //TODO: need to update pointer in alreadyapplied if a merge happened instead of in prestatement hook newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(), ownershipReturn.getOwnerId()); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 8d42370..b3ec61d 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -36,6 +36,7 @@ import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicTxDigestDaemon; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.TxCommitProgress; import java.io.IOException; @@ -92,7 +93,7 @@ public class StateManager { /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */ private Set rangesToWarmup; /** map of transactions that have already been applied/updated in this sites SQL db */ - private Map> alreadyApplied; + private Map> alreadyApplied; private OwnershipAndCheckpoint ownAndCheck; private Thread txDaemon ; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index e87f7e4..20e1d5d 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -57,6 +57,7 @@ import org.onap.music.main.ResultType; import org.onap.music.main.ReturnType; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.MDBCUtils; +import org.onap.music.mdbc.MdbcConnection; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.StateManager; import org.onap.music.mdbc.TableInfo; @@ -1123,22 +1124,19 @@ public class MusicMixin implements MusicInterface { * Build a preparedQueryObject that appends a transaction to the mriTable * @param mriTable * @param uuid - * @param table * @param redoUuid * @return */ - private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){ + private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, UUID redoUuid){ PreparedQueryObject query = new PreparedQueryObject(); StringBuilder appendBuilder = new StringBuilder(); appendBuilder.append("UPDATE ") .append(music_ns) .append(".") .append(mriTable) - .append(" SET txredolog = txredolog +[('") - .append(table) - .append("',") + .append(" SET txredolog = txredolog +[") .append(redoUuid) - .append(")] WHERE rangeid = ") + .append("] WHERE rangeid = ") .append(uuid) .append(";"); query.appendQueryString(appendBuilder.toString()); @@ -1342,8 +1340,7 @@ public class MusicMixin implements MusicInterface { }; Callable appendCallable=()-> { try { - appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicTxDigestTableName, - musicRangeInformationTableName); + appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicRangeInformationTableName); return true; } catch (MDBCServiceException e) { logger.error(EELFLoggerDelegate.errorLogger, "Error creating and pushing tx digest to music",e); @@ -1369,20 +1366,12 @@ public class MusicMixin implements MusicInterface { if (progressKeeper != null) { progressKeeper.setRecordId(txId, digestId); } + Set ranges = partition.getSnapshot(); + + Map> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); for(Range r : ranges) { - Map> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); - if(!alreadyApplied.containsKey(r)){ - throw new MDBCServiceException("already applied data structure was not updated correctly and range " - +r+" is not contained"); - } - Pair rowAndIndex = alreadyApplied.get(r); - MriReference key = rowAndIndex.getKey(); - if(!mriIndex.equals(key.index)){ - throw new MDBCServiceException("already applied data structure was not updated correctly and range "+ - r+" is not pointing to row: "+mriIndex.toString()); - } - alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1)); + alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), digestId)); } } @@ -1482,13 +1471,11 @@ public class MusicMixin implements MusicInterface { static public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){ UUID partitionIndex = newRow.getUUID("rangeid"); - List log = newRow.getList("txredolog",TupleValue.class); + List log = newRow.getList("txredolog",UUID.class); List digestIds = new ArrayList<>(); int index=0; - for(TupleValue t: log){ - //final String tableName = t.getString(0); - final UUID id = t.getUUID(1); - digestIds.add(new MusicTxDigestId(partitionIndex,id,index++)); + for(UUID u: log){ + digestIds.add(new MusicTxDigestId(partitionIndex,u,index++)); } Set partitions = new HashSet<>(); Set tables = newRow.getSet("keys",String.class); @@ -1569,7 +1556,7 @@ public class MusicMixin implements MusicInterface { fields.append("prevmrirows set, "); fields.append("islatest boolean, "); //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly - fields.append("txredolog list>> "); + fields.append("txredolog list "); String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", namespace, tableName, fields, priKey); try { @@ -1703,15 +1690,12 @@ public class MusicMixin implements MusicInterface { @Override public void appendToRedoLog(UUID MRIIndex, String lockId, MusicTxDigestId newRecord) throws MDBCServiceException { logger.debug("Appending to redo log for partition " + MRIIndex + " txId=" + newRecord.transactionId); - appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicTxDigestTableName, - musicRangeInformationTableName); + appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicRangeInformationTableName); } - public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId, - String musicTxDigestTableName, String musicRangeInformationTableName) + public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId, String musicRangeInformationTableName) throws MDBCServiceException{ - PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex, - musicTxDigestTableName, transactionId); + PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex, transactionId); ReturnType returnType = MusicCore.criticalPut(musicNamespace, musicRangeInformationTableName, MRIIndex.toString(), appendQuery, lockId, null); //returnType.getExecutionInfo() @@ -2226,13 +2210,15 @@ public class MusicMixin implements MusicInterface { changeIsLatestToMRI(partition.getMRIIndex(), false, partition.getLockId()); - Map> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); + /* + Map> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); for (Range range: rangesUsed) { alreadyApplied.put(range, Pair.of(new MriReference(usedRow.getPartitionIndex()), -1)); } for (Range range: rangesNotUsed) { alreadyApplied.put(range, Pair.of(new MriReference(unusedRow.getPartitionIndex()), -1)); } + */ //release/update old partition info relinquish(unusedRow.getDBPartition()); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 3af6f0f..2c501dc 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -247,7 +247,10 @@ public class MySQLMixin implements DBInterface { logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set); Set rangeSet = new HashSet<>(); for (String table : set) { - rangeSet.add(new Range(table)); + if (getReservedTblNames().contains(table)) { + // Don't create triggers for the table the triggers write into!!! + rangeSet.add(new Range(table)); + } } return rangeSet; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java index 9d1685c..142cb34 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java @@ -32,6 +32,7 @@ import org.onap.music.mdbc.Range; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MriRowComparator; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; public class Dag { @@ -145,7 +146,6 @@ public class Dag { if(!readyInit){ initApplyDatastructures(); } - Set rangesSet = new HashSet<>(ranges); while(!toApplyNodes.isEmpty()){ DagNode nextNode = toApplyNodes.poll(); List outgoing = nextNode.getOutgoingEdges(); @@ -155,7 +155,7 @@ public class Dag { toApplyNodes.add(out); } } - if(!nextNode.wasApplied(rangesSet)){ + if(!nextNode.wasApplied(ranges)){ return nextNode; } } @@ -233,23 +233,23 @@ public class Dag { return toApplyNodes.isEmpty(); } - public void setAlreadyApplied(Map> alreadyApplied, Set ranges) + public void setAlreadyApplied(Map> alreadyApplied, Set ranges) throws MDBCServiceException { - for(Map.Entry node : nodes.entrySet()){ + for (DagNode node: nodes.values()) { Set intersection = new HashSet<>(ranges); - intersection.retainAll(node.getValue().getRangeSet()); + intersection.retainAll(node.getRangeSet()); for(Range r : intersection){ if(alreadyApplied.containsKey(r)){ - final Pair appliedPair = alreadyApplied.get(r); + final Pair appliedPair = alreadyApplied.get(r); final MriReference appliedRow = appliedPair.getKey(); - final int index = appliedPair.getValue(); + final int index = appliedPair.getValue().index; final long appliedTimestamp = appliedRow.getTimestamp(); - final long nodeTimestamp = node.getValue().getTimestamp(); + final long nodeTimestamp = node.getTimestamp(); if(appliedTimestamp > nodeTimestamp){ - setReady(node.getValue(),r); + setReady(node,r); } else if(appliedTimestamp == nodeTimestamp){ - setPartiallyReady(node.getValue(),r,index); + setPartiallyReady(node,r,index); } } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java index 78c68e1..5e4c899 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java @@ -30,6 +30,7 @@ import java.util.UUID; import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; @@ -73,6 +74,10 @@ public class DagNode { return owned; } + /** + * + * @return the row's MRI Index represented by this dagnode + */ public UUID getId(){ return row.getPartitionIndex(); } @@ -149,20 +154,25 @@ public class DagNode { currentIndex = currentIndex+1; } - public synchronized Pair> nextNotAppliedTransaction(Set ranges){ + /** + * + * @param ranges + * @return the index of the next transaction to replay and the ranges needed for this transaction + */ + public synchronized Pair> nextNotAppliedTransaction(Set ranges){ if(row.getRedoLog().isEmpty()) return null; if(!applyInit){ initializeApply(ranges); } final List redoLog = row.getRedoLog(); if(currentIndex < redoLog.size()){ - List responseRanges= new ArrayList<>(); + Set responseRanges= new HashSet<>(); startIndex.forEach((r, index) -> { if(index < currentIndex){ responseRanges.add(r); } }); - return Pair.of(redoLog.get(currentIndex++),responseRanges); + return Pair.of(row.getRedoLog().get(currentIndex++),responseRanges); } return null; } @@ -179,7 +189,7 @@ public class DagNode { if(row.getRedoLog().isEmpty()) return true; if(!applyInit){ initializeApply(ranges); - } + } return currentIndex >= row.getRedoLog().size(); } @@ -194,11 +204,13 @@ public class DagNode { if(o == null) return false; if(!(o instanceof DagNode)) return false; DagNode other = (DagNode) o; - return other.row.getPartitionIndex().equals(this.row.getPartitionIndex()); + return other.row.equals(this.row); } @Override public int hashCode(){ return row.getPartitionIndex().hashCode(); } + + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index c95644b..00180a0 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -45,8 +45,7 @@ public class OwnershipAndCheckpoint{ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class); private Lock checkpointLock; - private AtomicBoolean change; - private Map> alreadyApplied; + private Map> alreadyApplied; private Map ownershipBeginTime; private long timeoutInMs; @@ -54,8 +53,7 @@ public class OwnershipAndCheckpoint{ this(new HashMap<>(),Long.MAX_VALUE); } - public OwnershipAndCheckpoint(Map> alreadyApplied, long timeoutInMs){ - change = new AtomicBoolean(true); + public OwnershipAndCheckpoint(Map> alreadyApplied, long timeoutInMs){ checkpointLock = new ReentrantLock(); this.alreadyApplied = alreadyApplied; ownershipBeginTime = new HashMap<>(); @@ -130,20 +128,17 @@ public class OwnershipAndCheckpoint{ * @param di * @param extendedDag * @param ranges - * @param locks * @param ownOpId * @throws MDBCServiceException */ - public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set ranges, - Map locks, UUID ownOpId) throws MDBCServiceException { + public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set ranges, UUID ownOpId) + throws MDBCServiceException { if(ranges.isEmpty()){ return; } try { checkpointLock.lock(); - change.set(true); - Set rangesSet = new HashSet<>(ranges); - extendedDag.setAlreadyApplied(alreadyApplied, rangesSet); + extendedDag.setAlreadyApplied(alreadyApplied, ranges); applyRequiredChanges(mi, di, extendedDag, ranges, ownOpId); } catch(MDBCServiceException e){ @@ -163,18 +158,18 @@ public class OwnershipAndCheckpoint{ } } - private void disableForeignKeys(DBInterface di) throws MDBCServiceException { + private void disableForeignKeys(DBInterface dbi) throws MDBCServiceException { try { - di.disableForeignKeyChecks(); + dbi.disableForeignKeyChecks(); } catch (SQLException e) { throw new MDBCServiceException("Error disable foreign keys checks",e); } } - private void applyTxDigest(DBInterface di, StagingTable txDigest) + private void applyTxDigest(DBInterface dbi, StagingTable txDigest) throws MDBCServiceException { try { - di.applyTxDigest(txDigest); + dbi.applyTxDigest(txDigest); } catch (SQLException e) { throw new MDBCServiceException("Error applying tx digest in local SQL",e); } @@ -191,39 +186,28 @@ public class OwnershipAndCheckpoint{ if(rangesToWarmup.isEmpty()){ return; } - boolean ready = false; - change.set(true); - Set rangeSet = new HashSet(rangesToWarmup); Dag dag = new Dag(false); - while(!ready){ - if(change.get()){ - change.set(false); - final List rows = extractRowsForRange(mi, rangesToWarmup,false); - dag = Dag.getDag(rows,rangesToWarmup); - } - else if(!dag.applied()){ - DagNode node = dag.nextToApply(rangesToWarmup); - if(node!=null) { - Pair> pair = node.nextNotAppliedTransaction(rangeSet); - while (pair != null) { + final List rows = extractRowsForRange(mi, rangesToWarmup,false); + dag = Dag.getDag(rows,rangesToWarmup); + dag.setAlreadyApplied(alreadyApplied, rangesToWarmup); + while(!dag.applied()){ + DagNode node = dag.nextToApply(rangesToWarmup); + if(node!=null) { + Pair> pair = node.nextNotAppliedTransaction(rangesToWarmup); + while (pair != null) { + checkpointLock.lock(); + try { disableForeignKeys(di); - checkpointLock.lock(); - if (change.get()) { - enableForeignKeys(di); - checkpointLock.unlock(); - break; - } else { - applyDigestAndUpdateDataStructures(mi, di, node, pair); - } - pair = node.nextNotAppliedTransaction(rangeSet); + applyDigestAndUpdateDataStructures(mi, di, node, pair.getLeft(), pair.getRight()); + pair = node.nextNotAppliedTransaction(rangesToWarmup); enableForeignKeys(di); + } catch (MDBCServiceException e) { checkpointLock.unlock(); + throw e; } + checkpointLock.unlock(); } } - else{ - ready = true; - } } } @@ -235,25 +219,54 @@ public class OwnershipAndCheckpoint{ * @param pair * @throws MDBCServiceException */ - private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, DagNode node, - Pair> pair) throws MDBCServiceException { + private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface dbi, DagNode node, + MusicTxDigestId digestId, Set ranges) throws MDBCServiceException { + if (alreadyReplayed(node, digestId)) { + return; + } + final StagingTable txDigest; try { - txDigest = mi.getTxDigest(pair.getKey()); + txDigest = mi.getTxDigest(digestId); } catch (MDBCServiceException e) { logger.warn("Transaction digest was not found, this could be caused by a failure of the previous owner" +"And would normally only happen as the last ID of the corresponding redo log. Please check that this is the" - +" case for txID "+pair.getKey().transactionId.toString()); + +" case for txID "+digestId.transactionId.toString()); return; } - applyTxDigest(di, txDigest); - for (Range r : pair.getValue()) { + applyTxDigest(dbi, txDigest); + for (Range r : ranges) { MusicRangeInformationRow row = node.getRow(); - alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); + alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), digestId)); - updateCheckpointLocations(mi, di, r, row.getPartitionIndex(), pair.getKey().index); + updateCheckpointLocations(mi, dbi, r, row.getPartitionIndex(), digestId); } } + + /** + * Determine if this musictxdigest id has already been replayed + * @param node + * @param redoLogIndex + * @return true if alreadyApplied is past this node/redolog, false if it hasn't been replayed + */ + public boolean alreadyReplayed(DagNode node, MusicTxDigestId txdigest) { + int index = node.getRow().getRedoLog().indexOf(txdigest); + for (Range range: node.getRangeSet()) { + Pair applied = alreadyApplied.get(range); + if (applied==null) { + return false; + } + MriReference appliedMriRef = applied.getLeft(); + MusicTxDigestId appliedDigest = applied.getRight(); + int appliedIndex = node.getRow().getRedoLog().indexOf(appliedDigest); + if (appliedMriRef==null || appliedMriRef.getTimestamp() < node.getTimestamp() + || (appliedMriRef.getTimestamp() == node.getTimestamp() + && appliedIndex < index)) { + return false; + } + } + return true; + } /** * Update external checkpoint markers in sql db and music @@ -263,9 +276,9 @@ public class OwnershipAndCheckpoint{ * @param partitionIndex * @param index */ - private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, int index) { - dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, index)); - mi.updateCheckpointLocations(r, Pair.of(partitionIndex, index)); + private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, MusicTxDigestId txdigest) { + dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index)); + mi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index)); } /** @@ -279,15 +292,14 @@ public class OwnershipAndCheckpoint{ */ private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, Set ranges, UUID ownOpId) throws MDBCServiceException { - Set rangeSet = new HashSet(ranges); disableForeignKeys(db); while(!extendedDag.applied()){ DagNode node = extendedDag.nextToApply(ranges); if(node!=null) { - Pair> pair = node.nextNotAppliedTransaction(rangeSet); + Pair> pair = node.nextNotAppliedTransaction(ranges); while (pair != null) { - applyDigestAndUpdateDataStructures(mi, db, node, pair); - pair = node.nextNotAppliedTransaction(rangeSet); + applyDigestAndUpdateDataStructures(mi, db, node, pair.getLeft(), pair.getRight()); + pair = node.nextNotAppliedTransaction(ranges); if (timeout(ownOpId)) { enableForeignKeys(db); throw new MDBCServiceException("Timeout apply changes to local dbi"); @@ -346,7 +358,7 @@ public class OwnershipAndCheckpoint{ } Set allRanges = currentlyOwn.getAllRanges(); //TODO: we shouldn't need to go back to music at this point - List latestRows = extractRowsForRange(mi, new HashSet<>(allRanges), true); + List latestRows = extractRowsForRange(mi, allRanges, true); currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,latestRows)); return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId); } @@ -462,15 +474,6 @@ public class OwnershipAndCheckpoint{ } - - public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException { - Set snapshot = partition.getSnapshot(); - UUID row = partition.getMRIIndex(); - for(Range r : snapshot){ - alreadyApplied.put(r,Pair.of(new MriReference(row),-1)); - } - } - // \TODO merge with dag code private Map> getIsLatestPerRange(Dag dag, List rows) throws MDBCServiceException { Map> rowsPerLatestRange = new HashMap<>(); @@ -495,7 +498,7 @@ public class OwnershipAndCheckpoint{ } - public Map> getAlreadyApplied() { + public Map> getAlreadyApplied() { return this.alreadyApplied; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java index 8aad335..9383ac5 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java @@ -30,4 +30,7 @@ public final class MriReference { public long getTimestamp() { return index.timestamp();} + public String toString() { + return index.toString(); + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java index de711ef..8c95047 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java @@ -96,7 +96,7 @@ public final class MusicRangeInformationRow implements Comparable rangesSet = new HashSet<>(ranges); while(!dag.applied()){ DagNode node = dag.nextToApply(ranges); - Pair> pair = node.nextNotAppliedTransaction(rangesSet); + Pair> pair = node.nextNotAppliedTransaction(rangesSet); int transactionCounter = 0; while(pair!=null) { assertNotEquals(1,transactionCounter); @@ -178,9 +178,10 @@ public class DagTest { MusicTxDigestId id = row.getRedoLog().get(transactionCounter); assertEquals(id,pair.getKey()); assertEquals(0,pair.getKey().index); - List value = pair.getValue(); + Set value = pair.getValue(); assertEquals(1,value.size()); - assertEquals(new Range("schema.range1"),value.get(0)); + assertTrue(value.contains(new Range("schema.range1"))); + //assertEquals(new Range("schema.range1"),value.get(0)); pair = node.nextNotAppliedTransaction(rangesSet); transactionCounter++; } @@ -192,7 +193,7 @@ public class DagTest { @Test public void nextToApply2() throws InterruptedException, MDBCServiceException { - Map> alreadyApplied = new HashMap<>(); + Map> alreadyApplied = new HashMap<>(); List rows = new ArrayList<>(); Set ranges = new HashSet<>( Arrays.asList( new Range("schema.range1") @@ -207,7 +208,7 @@ public class DagTest { new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1) )); MusicRangeInformationRow newRow = createNewRow(new HashSet<>(ranges), "", false, redo2); - alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0)); + alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), redo1.get(0))); rows.add(newRow); MILLISECONDS.sleep(10); List redo3 = new ArrayList<>(Arrays.asList( @@ -220,7 +221,7 @@ public class DagTest { int nodeCounter = 1; while(!dag.applied()){ DagNode node = dag.nextToApply(ranges); - Pair> pair = node.nextNotAppliedTransaction(rangesSet); + Pair> pair = node.nextNotAppliedTransaction(rangesSet); int transactionCounter = 0; while(pair!=null) { assertNotEquals(1,transactionCounter); @@ -228,9 +229,10 @@ public class DagTest { MusicTxDigestId id = row.getRedoLog().get(2-nodeCounter); assertEquals(id,pair.getKey()); assertEquals(2-nodeCounter,pair.getKey().index); - List value = pair.getValue(); + Set value = pair.getValue(); assertEquals(1,value.size()); - assertEquals(new Range("schema.range1"),value.get(0)); + assertTrue(value.contains(new Range("schema.range1"))); + //assertEquals(new Range("schema.range1"),value.get(0)); pair = node.nextNotAppliedTransaction(rangesSet); transactionCounter++; } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java index 2443d1e..1c9eb11 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java @@ -151,7 +151,6 @@ public class OwnershipAndCheckpointTest { String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+ "(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');"; StagingTable stagingTable = new StagingTable(); - ownAndCheck.reloadAlreadyApplied(partition); final Statement executeStatement = this.conn.createStatement(); executeStatement.execute(sqlOperation); this.conn.commit(); @@ -224,7 +223,7 @@ public class OwnershipAndCheckpointTest { locks.put(own.getDag().getNode(own.getRangeId()).getRow(), new LockResult(own.getRangeId(), own.getOwnerId(), true, ranges)); - ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, locks, ownOpId); + ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, ownOpId); } checkData(); -- cgit 1.2.3-korg