aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBharath Balasubramanian <bharathb@research.att.com>2019-09-04 15:45:00 +0000
committerGerrit Code Review <gerrit@onap.org>2019-09-04 15:45:00 +0000
commitd2b5c18e008c4be333ff0f5c47d1d8136802f6b6 (patch)
treed687a5bb030ff0246522c116991a14f51f6874cb
parent84c322b7d05bb820af38421fbda75e8dc9817c69 (diff)
parent42214c84ef398fb65db4d84012de1e46c585f300 (diff)
Merge "Replay Transaction Updates"
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java7
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java3
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java52
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java5
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java20
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java22
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java135
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java3
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java4
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java18
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java3
12 files changed, 144 insertions, 130 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 6f097dd..42864ea 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -193,7 +193,8 @@ public class MdbcConnection implements Connection {
try {
partition = mi.splitPartitionIfNecessary(partition, rangesUsed);
} catch (MDBCServiceException e) {
- logger.warn(EELFLoggerDelegate.errorLogger, "Failure to split partition, trying to continue",
+ logger.warn(EELFLoggerDelegate.errorLogger,
+ "Failure to split partition '" + partition.getMRIIndex() + "' trying to continue",
AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
}
@@ -541,7 +542,6 @@ public class MdbcConnection implements Connection {
DatabasePartition tempPartition = own(scRanges, MDBCUtils.getOperationType(tableToQueryType));
if(tempPartition!=null && tempPartition != partition) {
this.partition.updateDatabasePartition(tempPartition);
- statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition);
}
dbi.preStatementHook(sql);
}
@@ -619,7 +619,8 @@ public class MdbcConnection implements Connection {
MusicRangeInformationRow row = node.getRow();
Map<MusicRangeInformationRow, LockResult> lock = new HashMap<>();
lock.put(row, new LockResult(row.getPartitionIndex(), ownershipReturn.getOwnerId(), true, ranges));
- ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, lock, ownershipReturn.getOwnershipId());
+ ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, ownershipReturn.getOwnershipId());
+ //TODO: need to update pointer in alreadyapplied if a merge happened instead of in prestatement hook
newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(),
ownershipReturn.getOwnerId());
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index f8a18d9..5b2b8df 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -36,6 +36,7 @@ import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.TxCommitProgress;
import java.io.IOException;
@@ -92,7 +93,7 @@ public class StateManager {
/** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */
private Set<Range> rangesToWarmup;
/** map of transactions that have already been applied/updated in this sites SQL db */
- private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
+ private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied;
private OwnershipAndCheckpoint ownAndCheck;
private Thread txDaemon ;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index e87f7e4..20e1d5d 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -57,6 +57,7 @@ import org.onap.music.main.ResultType;
import org.onap.music.main.ReturnType;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.MdbcConnection;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.TableInfo;
@@ -1123,22 +1124,19 @@ public class MusicMixin implements MusicInterface {
* Build a preparedQueryObject that appends a transaction to the mriTable
* @param mriTable
* @param uuid
- * @param table
* @param redoUuid
* @return
*/
- private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){
+ private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, UUID redoUuid){
PreparedQueryObject query = new PreparedQueryObject();
StringBuilder appendBuilder = new StringBuilder();
appendBuilder.append("UPDATE ")
.append(music_ns)
.append(".")
.append(mriTable)
- .append(" SET txredolog = txredolog +[('")
- .append(table)
- .append("',")
+ .append(" SET txredolog = txredolog +[")
.append(redoUuid)
- .append(")] WHERE rangeid = ")
+ .append("] WHERE rangeid = ")
.append(uuid)
.append(";");
query.appendQueryString(appendBuilder.toString());
@@ -1342,8 +1340,7 @@ public class MusicMixin implements MusicInterface {
};
Callable<Boolean> appendCallable=()-> {
try {
- appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicTxDigestTableName,
- musicRangeInformationTableName);
+ appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicRangeInformationTableName);
return true;
} catch (MDBCServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, "Error creating and pushing tx digest to music",e);
@@ -1369,20 +1366,12 @@ public class MusicMixin implements MusicInterface {
if (progressKeeper != null) {
progressKeeper.setRecordId(txId, digestId);
}
+
Set<Range> ranges = partition.getSnapshot();
+
+ Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
for(Range r : ranges) {
- Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
- if(!alreadyApplied.containsKey(r)){
- throw new MDBCServiceException("already applied data structure was not updated correctly and range "
- +r+" is not contained");
- }
- Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r);
- MriReference key = rowAndIndex.getKey();
- if(!mriIndex.equals(key.index)){
- throw new MDBCServiceException("already applied data structure was not updated correctly and range "+
- r+" is not pointing to row: "+mriIndex.toString());
- }
- alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1));
+ alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), digestId));
}
}
@@ -1482,13 +1471,11 @@ public class MusicMixin implements MusicInterface {
static public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){
UUID partitionIndex = newRow.getUUID("rangeid");
- List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
+ List<UUID> log = newRow.getList("txredolog",UUID.class);
List<MusicTxDigestId> digestIds = new ArrayList<>();
int index=0;
- for(TupleValue t: log){
- //final String tableName = t.getString(0);
- final UUID id = t.getUUID(1);
- digestIds.add(new MusicTxDigestId(partitionIndex,id,index++));
+ for(UUID u: log){
+ digestIds.add(new MusicTxDigestId(partitionIndex,u,index++));
}
Set<Range> partitions = new HashSet<>();
Set<String> tables = newRow.getSet("keys",String.class);
@@ -1569,7 +1556,7 @@ public class MusicMixin implements MusicInterface {
fields.append("prevmrirows set<uuid>, ");
fields.append("islatest boolean, ");
//TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
- fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
+ fields.append("txredolog list<uuid> ");
String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
namespace, tableName, fields, priKey);
try {
@@ -1703,15 +1690,12 @@ public class MusicMixin implements MusicInterface {
@Override
public void appendToRedoLog(UUID MRIIndex, String lockId, MusicTxDigestId newRecord) throws MDBCServiceException {
logger.debug("Appending to redo log for partition " + MRIIndex + " txId=" + newRecord.transactionId);
- appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicTxDigestTableName,
- musicRangeInformationTableName);
+ appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicRangeInformationTableName);
}
- public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId,
- String musicTxDigestTableName, String musicRangeInformationTableName)
+ public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId, String musicRangeInformationTableName)
throws MDBCServiceException{
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex,
- musicTxDigestTableName, transactionId);
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex, transactionId);
ReturnType returnType = MusicCore.criticalPut(musicNamespace, musicRangeInformationTableName, MRIIndex.toString(),
appendQuery, lockId, null);
//returnType.getExecutionInfo()
@@ -2226,13 +2210,15 @@ public class MusicMixin implements MusicInterface {
changeIsLatestToMRI(partition.getMRIIndex(), false, partition.getLockId());
- Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
+ /*
+ Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
for (Range range: rangesUsed) {
alreadyApplied.put(range, Pair.of(new MriReference(usedRow.getPartitionIndex()), -1));
}
for (Range range: rangesNotUsed) {
alreadyApplied.put(range, Pair.of(new MriReference(unusedRow.getPartitionIndex()), -1));
}
+ */
//release/update old partition info
relinquish(unusedRow.getDBPartition());
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 3af6f0f..2c501dc 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -247,7 +247,10 @@ public class MySQLMixin implements DBInterface {
logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
Set<Range> rangeSet = new HashSet<>();
for (String table : set) {
- rangeSet.add(new Range(table));
+ if (getReservedTblNames().contains(table)) {
+ // Don't create triggers for the table the triggers write into!!!
+ rangeSet.add(new Range(table));
+ }
}
return rangeSet;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
index 9d1685c..142cb34 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
@@ -32,6 +32,7 @@ import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MriRowComparator;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
public class Dag {
@@ -145,7 +146,6 @@ public class Dag {
if(!readyInit){
initApplyDatastructures();
}
- Set<Range> rangesSet = new HashSet<>(ranges);
while(!toApplyNodes.isEmpty()){
DagNode nextNode = toApplyNodes.poll();
List<DagNode> outgoing = nextNode.getOutgoingEdges();
@@ -155,7 +155,7 @@ public class Dag {
toApplyNodes.add(out);
}
}
- if(!nextNode.wasApplied(rangesSet)){
+ if(!nextNode.wasApplied(ranges)){
return nextNode;
}
}
@@ -233,23 +233,23 @@ public class Dag {
return toApplyNodes.isEmpty();
}
- public void setAlreadyApplied(Map<Range, Pair<MriReference,Integer>> alreadyApplied, Set<Range> ranges)
+ public void setAlreadyApplied(Map<Range, Pair<MriReference,MusicTxDigestId>> alreadyApplied, Set<Range> ranges)
throws MDBCServiceException {
- for(Map.Entry<UUID,DagNode> node : nodes.entrySet()){
+ for (DagNode node: nodes.values()) {
Set<Range> intersection = new HashSet<>(ranges);
- intersection.retainAll(node.getValue().getRangeSet());
+ intersection.retainAll(node.getRangeSet());
for(Range r : intersection){
if(alreadyApplied.containsKey(r)){
- final Pair<MriReference, Integer> appliedPair = alreadyApplied.get(r);
+ final Pair<MriReference, MusicTxDigestId> appliedPair = alreadyApplied.get(r);
final MriReference appliedRow = appliedPair.getKey();
- final int index = appliedPair.getValue();
+ final int index = appliedPair.getValue().index;
final long appliedTimestamp = appliedRow.getTimestamp();
- final long nodeTimestamp = node.getValue().getTimestamp();
+ final long nodeTimestamp = node.getTimestamp();
if(appliedTimestamp > nodeTimestamp){
- setReady(node.getValue(),r);
+ setReady(node,r);
}
else if(appliedTimestamp == nodeTimestamp){
- setPartiallyReady(node.getValue(),r,index);
+ setPartiallyReady(node,r,index);
}
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
index 78c68e1..5e4c899 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
@@ -73,6 +74,10 @@ public class DagNode {
return owned;
}
+ /**
+ *
+ * @return the row's MRI Index represented by this dagnode
+ */
public UUID getId(){
return row.getPartitionIndex();
}
@@ -149,20 +154,25 @@ public class DagNode {
currentIndex = currentIndex+1;
}
- public synchronized Pair<MusicTxDigestId, List<Range>> nextNotAppliedTransaction(Set<Range> ranges){
+ /**
+ *
+ * @param ranges
+ * @return the index of the next transaction to replay and the ranges needed for this transaction
+ */
+ public synchronized Pair<MusicTxDigestId, Set<Range>> nextNotAppliedTransaction(Set<Range> ranges){
if(row.getRedoLog().isEmpty()) return null;
if(!applyInit){
initializeApply(ranges);
}
final List<MusicTxDigestId> redoLog = row.getRedoLog();
if(currentIndex < redoLog.size()){
- List<Range> responseRanges= new ArrayList<>();
+ Set<Range> responseRanges= new HashSet<>();
startIndex.forEach((r, index) -> {
if(index < currentIndex){
responseRanges.add(r);
}
});
- return Pair.of(redoLog.get(currentIndex++),responseRanges);
+ return Pair.of(row.getRedoLog().get(currentIndex++),responseRanges);
}
return null;
}
@@ -179,7 +189,7 @@ public class DagNode {
if(row.getRedoLog().isEmpty()) return true;
if(!applyInit){
initializeApply(ranges);
- }
+ }
return currentIndex >= row.getRedoLog().size();
}
@@ -194,11 +204,13 @@ public class DagNode {
if(o == null) return false;
if(!(o instanceof DagNode)) return false;
DagNode other = (DagNode) o;
- return other.row.getPartitionIndex().equals(this.row.getPartitionIndex());
+ return other.row.equals(this.row);
}
@Override
public int hashCode(){
return row.getPartitionIndex().hashCode();
}
+
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index c95644b..00180a0 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -45,8 +45,7 @@ public class OwnershipAndCheckpoint{
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class);
private Lock checkpointLock;
- private AtomicBoolean change;
- private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
+ private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied;
private Map<UUID,Long> ownershipBeginTime;
private long timeoutInMs;
@@ -54,8 +53,7 @@ public class OwnershipAndCheckpoint{
this(new HashMap<>(),Long.MAX_VALUE);
}
- public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, Integer>> alreadyApplied, long timeoutInMs){
- change = new AtomicBoolean(true);
+ public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied, long timeoutInMs){
checkpointLock = new ReentrantLock();
this.alreadyApplied = alreadyApplied;
ownershipBeginTime = new HashMap<>();
@@ -130,20 +128,17 @@ public class OwnershipAndCheckpoint{
* @param di
* @param extendedDag
* @param ranges
- * @param locks
* @param ownOpId
* @throws MDBCServiceException
*/
- public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges,
- Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException {
+ public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges, UUID ownOpId)
+ throws MDBCServiceException {
if(ranges.isEmpty()){
return;
}
try {
checkpointLock.lock();
- change.set(true);
- Set<Range> rangesSet = new HashSet<>(ranges);
- extendedDag.setAlreadyApplied(alreadyApplied, rangesSet);
+ extendedDag.setAlreadyApplied(alreadyApplied, ranges);
applyRequiredChanges(mi, di, extendedDag, ranges, ownOpId);
}
catch(MDBCServiceException e){
@@ -163,18 +158,18 @@ public class OwnershipAndCheckpoint{
}
}
- private void disableForeignKeys(DBInterface di) throws MDBCServiceException {
+ private void disableForeignKeys(DBInterface dbi) throws MDBCServiceException {
try {
- di.disableForeignKeyChecks();
+ dbi.disableForeignKeyChecks();
} catch (SQLException e) {
throw new MDBCServiceException("Error disable foreign keys checks",e);
}
}
- private void applyTxDigest(DBInterface di, StagingTable txDigest)
+ private void applyTxDigest(DBInterface dbi, StagingTable txDigest)
throws MDBCServiceException {
try {
- di.applyTxDigest(txDigest);
+ dbi.applyTxDigest(txDigest);
} catch (SQLException e) {
throw new MDBCServiceException("Error applying tx digest in local SQL",e);
}
@@ -191,39 +186,28 @@ public class OwnershipAndCheckpoint{
if(rangesToWarmup.isEmpty()){
return;
}
- boolean ready = false;
- change.set(true);
- Set<Range> rangeSet = new HashSet<Range>(rangesToWarmup);
Dag dag = new Dag(false);
- while(!ready){
- if(change.get()){
- change.set(false);
- final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false);
- dag = Dag.getDag(rows,rangesToWarmup);
- }
- else if(!dag.applied()){
- DagNode node = dag.nextToApply(rangesToWarmup);
- if(node!=null) {
- Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
- while (pair != null) {
+ final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false);
+ dag = Dag.getDag(rows,rangesToWarmup);
+ dag.setAlreadyApplied(alreadyApplied, rangesToWarmup);
+ while(!dag.applied()){
+ DagNode node = dag.nextToApply(rangesToWarmup);
+ if(node!=null) {
+ Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesToWarmup);
+ while (pair != null) {
+ checkpointLock.lock();
+ try {
disableForeignKeys(di);
- checkpointLock.lock();
- if (change.get()) {
- enableForeignKeys(di);
- checkpointLock.unlock();
- break;
- } else {
- applyDigestAndUpdateDataStructures(mi, di, node, pair);
- }
- pair = node.nextNotAppliedTransaction(rangeSet);
+ applyDigestAndUpdateDataStructures(mi, di, node, pair.getLeft(), pair.getRight());
+ pair = node.nextNotAppliedTransaction(rangesToWarmup);
enableForeignKeys(di);
+ } catch (MDBCServiceException e) {
checkpointLock.unlock();
+ throw e;
}
+ checkpointLock.unlock();
}
}
- else{
- ready = true;
- }
}
}
@@ -235,25 +219,54 @@ public class OwnershipAndCheckpoint{
* @param pair
* @throws MDBCServiceException
*/
- private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, DagNode node,
- Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException {
+ private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface dbi, DagNode node,
+ MusicTxDigestId digestId, Set<Range> ranges) throws MDBCServiceException {
+ if (alreadyReplayed(node, digestId)) {
+ return;
+ }
+
final StagingTable txDigest;
try {
- txDigest = mi.getTxDigest(pair.getKey());
+ txDigest = mi.getTxDigest(digestId);
} catch (MDBCServiceException e) {
logger.warn("Transaction digest was not found, this could be caused by a failure of the previous owner"
+"And would normally only happen as the last ID of the corresponding redo log. Please check that this is the"
- +" case for txID "+pair.getKey().transactionId.toString());
+ +" case for txID "+digestId.transactionId.toString());
return;
}
- applyTxDigest(di, txDigest);
- for (Range r : pair.getValue()) {
+ applyTxDigest(dbi, txDigest);
+ for (Range r : ranges) {
MusicRangeInformationRow row = node.getRow();
- alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
+ alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), digestId));
- updateCheckpointLocations(mi, di, r, row.getPartitionIndex(), pair.getKey().index);
+ updateCheckpointLocations(mi, dbi, r, row.getPartitionIndex(), digestId);
}
}
+
+ /**
+ * Determine if this musictxdigest id has already been replayed
+ * @param node
+ * @param redoLogIndex
+ * @return true if alreadyApplied is past this node/redolog, false if it hasn't been replayed
+ */
+ public boolean alreadyReplayed(DagNode node, MusicTxDigestId txdigest) {
+ int index = node.getRow().getRedoLog().indexOf(txdigest);
+ for (Range range: node.getRangeSet()) {
+ Pair<MriReference, MusicTxDigestId> applied = alreadyApplied.get(range);
+ if (applied==null) {
+ return false;
+ }
+ MriReference appliedMriRef = applied.getLeft();
+ MusicTxDigestId appliedDigest = applied.getRight();
+ int appliedIndex = node.getRow().getRedoLog().indexOf(appliedDigest);
+ if (appliedMriRef==null || appliedMriRef.getTimestamp() < node.getTimestamp()
+ || (appliedMriRef.getTimestamp() == node.getTimestamp()
+ && appliedIndex < index)) {
+ return false;
+ }
+ }
+ return true;
+ }
/**
* Update external checkpoint markers in sql db and music
@@ -263,9 +276,9 @@ public class OwnershipAndCheckpoint{
* @param partitionIndex
* @param index
*/
- private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, int index) {
- dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
- mi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
+ private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, MusicTxDigestId txdigest) {
+ dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index));
+ mi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index));
}
/**
@@ -279,15 +292,14 @@ public class OwnershipAndCheckpoint{
*/
private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, Set<Range> ranges, UUID ownOpId)
throws MDBCServiceException {
- Set<Range> rangeSet = new HashSet<Range>(ranges);
disableForeignKeys(db);
while(!extendedDag.applied()){
DagNode node = extendedDag.nextToApply(ranges);
if(node!=null) {
- Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
+ Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(ranges);
while (pair != null) {
- applyDigestAndUpdateDataStructures(mi, db, node, pair);
- pair = node.nextNotAppliedTransaction(rangeSet);
+ applyDigestAndUpdateDataStructures(mi, db, node, pair.getLeft(), pair.getRight());
+ pair = node.nextNotAppliedTransaction(ranges);
if (timeout(ownOpId)) {
enableForeignKeys(db);
throw new MDBCServiceException("Timeout apply changes to local dbi");
@@ -346,7 +358,7 @@ public class OwnershipAndCheckpoint{
}
Set<Range> allRanges = currentlyOwn.getAllRanges();
//TODO: we shouldn't need to go back to music at this point
- List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, new HashSet<>(allRanges), true);
+ List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, allRanges, true);
currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,latestRows));
return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId);
}
@@ -462,15 +474,6 @@ public class OwnershipAndCheckpoint{
}
-
- public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
- Set<Range> snapshot = partition.getSnapshot();
- UUID row = partition.getMRIIndex();
- for(Range r : snapshot){
- alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
- }
- }
-
// \TODO merge with dag code
private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException {
Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>();
@@ -495,7 +498,7 @@ public class OwnershipAndCheckpoint{
}
- public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() {
+ public Map<Range, Pair<MriReference, MusicTxDigestId>> getAlreadyApplied() {
return this.alreadyApplied;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
index 8aad335..9383ac5 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
@@ -30,4 +30,7 @@ public final class MriReference {
public long getTimestamp() { return index.timestamp();}
+ public String toString() {
+ return index.toString();
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
index de711ef..8c95047 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
@@ -96,7 +96,7 @@ public final class MusicRangeInformationRow implements Comparable<MusicRangeInfo
if(o == null) return false;
if(!(o instanceof MusicRangeInformationRow)) return false;
MusicRangeInformationRow other = (MusicRangeInformationRow) o;
- return other.getPartitionIndex().equals(this.getPartitionIndex());
+ return other.getPartitionIndex().equals(this.getPartitionIndex()) && other.getRedoLog().equals(this.getRedoLog());
}
@Override
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
index db9e455..8544b47 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
@@ -55,4 +55,8 @@ public final class MusicTxDigestId {
public int hashCode(){
return transactionId.hashCode();
}
+
+ public String toString() {
+ return this.transactionId.toString();
+ }
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
index ee50dca..afe378e 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
@@ -170,7 +170,7 @@ public class DagTest {
HashSet<Range> rangesSet = new HashSet<>(ranges);
while(!dag.applied()){
DagNode node = dag.nextToApply(ranges);
- Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
+ Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
int transactionCounter = 0;
while(pair!=null) {
assertNotEquals(1,transactionCounter);
@@ -178,9 +178,10 @@ public class DagTest {
MusicTxDigestId id = row.getRedoLog().get(transactionCounter);
assertEquals(id,pair.getKey());
assertEquals(0,pair.getKey().index);
- List<Range> value = pair.getValue();
+ Set<Range> value = pair.getValue();
assertEquals(1,value.size());
- assertEquals(new Range("schema.range1"),value.get(0));
+ assertTrue(value.contains(new Range("schema.range1")));
+ //assertEquals(new Range("schema.range1"),value.get(0));
pair = node.nextNotAppliedTransaction(rangesSet);
transactionCounter++;
}
@@ -192,7 +193,7 @@ public class DagTest {
@Test
public void nextToApply2() throws InterruptedException, MDBCServiceException {
- Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
+ Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = new HashMap<>();
List<MusicRangeInformationRow> rows = new ArrayList<>();
Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range1")
@@ -207,7 +208,7 @@ public class DagTest {
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1)
));
MusicRangeInformationRow newRow = createNewRow(new HashSet<>(ranges), "", false, redo2);
- alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
+ alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), redo1.get(0)));
rows.add(newRow);
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
@@ -220,7 +221,7 @@ public class DagTest {
int nodeCounter = 1;
while(!dag.applied()){
DagNode node = dag.nextToApply(ranges);
- Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
+ Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
int transactionCounter = 0;
while(pair!=null) {
assertNotEquals(1,transactionCounter);
@@ -228,9 +229,10 @@ public class DagTest {
MusicTxDigestId id = row.getRedoLog().get(2-nodeCounter);
assertEquals(id,pair.getKey());
assertEquals(2-nodeCounter,pair.getKey().index);
- List<Range> value = pair.getValue();
+ Set<Range> value = pair.getValue();
assertEquals(1,value.size());
- assertEquals(new Range("schema.range1"),value.get(0));
+ assertTrue(value.contains(new Range("schema.range1")));
+ //assertEquals(new Range("schema.range1"),value.get(0));
pair = node.nextNotAppliedTransaction(rangesSet);
transactionCounter++;
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
index 2443d1e..1c9eb11 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
@@ -151,7 +151,6 @@ public class OwnershipAndCheckpointTest {
String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+
"(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');";
StagingTable stagingTable = new StagingTable();
- ownAndCheck.reloadAlreadyApplied(partition);
final Statement executeStatement = this.conn.createStatement();
executeStatement.execute(sqlOperation);
this.conn.commit();
@@ -224,7 +223,7 @@ public class OwnershipAndCheckpointTest {
locks.put(own.getDag().getNode(own.getRangeId()).getRow(),
new LockResult(own.getRangeId(), own.getOwnerId(), true,
ranges));
- ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, locks, ownOpId);
+ ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, ownOpId);
}
checkData();