diff options
7 files changed, 126 insertions, 127 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java index e5a3252..0693a97 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java @@ -45,10 +45,10 @@ public class TestUtils { List<Range> ranges = new ArrayList<>(); ranges.add(range); DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); - MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "", - mdbcServerName, true); + new MusicRangeInformationRow(dbPartition, new ArrayList<>(), true); + MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), true); DatabasePartition partition=null; - partition = mixin.createMusicRangeInformation(newRow); + partition = mixin.createLockedMRIRow(newRow); return partition; } @@ -76,7 +76,7 @@ public class TestUtils { public static HashSet<String> getMriColNames(){ return new HashSet<>( - Arrays.asList("rangeid","keys","txredolog","ownerid","metricprocessid") + Arrays.asList("rangeid","keys","txredolog","prevmrirows") ); } @@ -99,8 +99,7 @@ public class TestUtils { throw new Exception("Codec registry for cluster is invalid"); } expectedTypes.put("txredolog",DataType.list(TupleType.of(currentVer,registry,DataType.text(),DataType.uuid()))); - expectedTypes.put("ownerid",DataType.text()); - expectedTypes.put("metricprocessid",DataType.text()); + expectedTypes.put("prevmrirow",DataType.set(DataType.uuid())); return expectedTypes; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java index a05e583..8e3f20c 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java @@ -34,7 +34,7 @@ public class LockResult{ private long backOffPeriodms; public LockResult(boolean succesful, UUID rowId, String lockId, boolean newLock, List<Range> ranges){ - this.successful = true; + this.successful = succesful; this.musicRangeInformationIndex = rowId; this.lockId=lockId; this.newLock=newLock; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 3d0cc0a..4ae4413 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -211,12 +211,12 @@ public interface MusicInterface { List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException; /** - * This function is used to create a new row in the MRI table + * This function is used to create a new locked row in the MRI table * @param info the information used to create the row * @return the new partition object that contain the new information used to create the row * @throws MDBCServiceException */ - DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException; + DatabasePartition createLockedMRIRow(MusicRangeInformationRow info) throws MDBCServiceException; /** * This function is used to create all the required music dependencies diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index f22daa3..1bdb022 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -1486,9 +1486,8 @@ public class MusicMixin implements MusicInterface { for (String table:tables){ partitions.add(new Range(table)); } - return new MusicRangeInformationRow(partitionIndex, new DatabasePartition(partitions, partitionIndex, ""), - digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"), - newRow.getBool("islatest")); + return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""), + digestIds, newRow.getBool("islatest"), newRow.getSet("prevmrirows", UUID.class)); } public RangeDependency getRangeDependenciesFromCassandraRow(Row newRow){ @@ -1558,9 +1557,8 @@ public class MusicMixin implements MusicInterface { StringBuilder fields = new StringBuilder(); fields.append("rangeid uuid, "); fields.append("keys set<text>, "); - fields.append("ownerid text, "); + fields.append("prevmrirows set<uuid>, "); fields.append("islatest boolean, "); - fields.append("metricprocessid text, "); //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly fields.append("txredolog list<frozen<tuple<text,uuid>>> "); String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", @@ -1575,7 +1573,7 @@ public class MusicMixin implements MusicInterface { @Override - public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException { + public DatabasePartition createLockedMRIRow(MusicRangeInformationRow info) throws MDBCServiceException { DatabasePartition newPartition = info.getDBPartition(); String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMRIIndex().toString(); @@ -1590,9 +1588,9 @@ public class MusicMixin implements MusicInterface { "for key "+fullyQualifiedMriKey) ; } logger.info("Creating MRI " + newPartition.getMRIIndex() + " for ranges " + newPartition.getSnapshot()); - createEmptyMriRow(this.music_ns,this.musicRangeInformationTableName,newPartition.getMRIIndex(),info.getMetricProcessId(), - lockId, newPartition.getSnapshot(),info.getIsLatest()); - info.setOwnerId(lockId); + newPartition.setLockId(lockId); + + createEmptyMriRow(info); return newPartition; } @@ -1620,54 +1618,49 @@ public class MusicMixin implements MusicInterface { MusicCore.eventualPut(query); } - - private UUID createEmptyMriRow(List<Range> rangesCopy) { - //TODO: THis should call one of the other createMRIRows - UUID id = generateUniqueKey(); - StringBuilder insert = new StringBuilder("INSERT INTO ") - .append(this.music_ns) - .append('.') - .append(this.musicRangeInformationTableName) - .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") - .append("(") - .append(id) - .append(",{"); - boolean first=true; - for (Range r: rangesCopy) { - if(first){ first=false; } - else { - insert.append(','); - } - insert.append("'").append(r.toString()).append("'"); - } - insert.append("},'") - .append("") - .append("','") - .append("") - .append("',[]);"); - PreparedQueryObject query = new PreparedQueryObject(); - query.appendQueryString(insert.toString()); - MusicCore.eventualPut(query); - return id; - } - - /** * Creates a new empty MRI row * @param processId id of the process that is going to own initially this. * @return uuid associated to the new row */ - private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges) - throws MDBCServiceException { - UUID id = MDBCUtils.generateTimebasedUniqueKey(); - logger.info("Creating MRI "+ id + " for ranges " + ranges); - return createEmptyMriRow(this.music_ns,this.musicRangeInformationTableName,id,processId,lockId,ranges,true); + public void createEmptyMriRow(MusicRangeInformationRow rowToCreate) throws MDBCServiceException { + StringBuilder insert = new StringBuilder("INSERT INTO ") + .append(this.music_ns) + .append('.') + .append(this.musicRangeInformationTableName) + .append(" (rangeid,keys,islatest,prevmrirows,txredolog) VALUES ") + .append("(") + .append(rowToCreate.getPartitionIndex()) + .append(",{"); + String sep = ""; + for (Range r: rowToCreate.getDBPartition().getSnapshot()) { + insert.append(sep).append("'").append(r.toString()).append("'"); + sep = ","; + } + insert.append("},").append(rowToCreate.getIsLatest()) + .append(",{"); + sep = ""; + for (UUID prevIndex: rowToCreate.getPrevRowIndexes()) { + insert.append(sep).append(prevIndex); + sep = ","; + } + insert.append("},[]);"); + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString(insert.toString()); + try { + executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName, + rowToCreate.getPartitionIndex().toString(),query, + rowToCreate.getDBPartition().getLockId(),null); + } catch (MDBCServiceException e) { + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e); + } } - + /** * Creates a new empty MRI row * @param processId id of the process that is going to own initially this. * @return uuid associated to the new row + * @deprecated */ public static UUID createEmptyMriRow(String musicNamespace, String mriTableName, UUID id, String processId, String lockId, List<Range> ranges, boolean isLatest) @@ -1676,25 +1669,18 @@ public class MusicMixin implements MusicInterface { .append(musicNamespace) .append('.') .append(mriTableName) - .append(" (rangeid,keys,ownerid,islatest,metricprocessid,txredolog) VALUES ") + .append(" (rangeid,keys,islatest,prevmrirows,txredolog) VALUES ") .append("(") .append(id) .append(",{"); - boolean first=true; + String sep = ""; for (Range r: ranges) { - if(first){ first=false; } - else { - insert.append(','); - } - insert.append("'").append(r.toString()).append("'"); + insert.append(sep).append("'").append(r.toString()).append("'"); + sep = ","; } - insert.append("},'") - .append((lockId==null)?"":lockId) - .append("',") - .append(isLatest) - .append(",'") - .append(processId) - .append("',[]);"); + insert.append("},").append(isLatest) + .append(",{"); + insert.append("},[]);"); PreparedQueryObject query = new PreparedQueryObject(); query.appendQueryString(insert.toString()); try { @@ -2115,8 +2101,8 @@ public class MusicMixin implements MusicInterface { rangesAndDependents.getValue()==null || rangesAndDependents.getValue().size() == 0){ return; } - MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey()); - locks.put(r.getPartitionIndex(),new LockResult(r.getPartitionIndex(),r.getOwnerId(),true,rangesAndDependents.getKey())); + MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey(), rows); + locks.put(r.getPartitionIndex(),new LockResult(r.getPartitionIndex(),r.getDBPartition().getLockId(),true,rangesAndDependents.getKey())); latestDag.addNewNode(r,new ArrayList<>(rangesAndDependents.getValue())); } @@ -2133,15 +2119,28 @@ public class MusicMixin implements MusicInterface { return returnInfo; } - private MusicRangeInformationRow createAndAssignLock(List<Range> ranges) throws MDBCServiceException { + private MusicRangeInformationRow createAndAssignLock(List<Range> ranges, List<MusicRangeInformationRow> latestRows) throws MDBCServiceException { UUID newUUID = MDBCUtils.generateTimebasedUniqueKey(); DatabasePartition newPartition = new DatabasePartition(ranges,newUUID,null); - MusicRangeInformationRow newRow = new MusicRangeInformationRow(newUUID,newPartition,new ArrayList<>(), - null,getMyHostId(),true); - createMusicRangeInformation(newRow); + MusicRangeInformationRow newRow = new MusicRangeInformationRow(newPartition,new ArrayList<>(), + true, extractPreviousPartitions(latestRows)); + createLockedMRIRow(newRow); return newRow; } + /** + * Create a set of previous partitions to their uuids + * @param latestRows + * @return + */ + private Set<UUID> extractPreviousPartitions(List<MusicRangeInformationRow> latestRows) { + Set<UUID> prevMRIRow = new HashSet<>(); + for (MusicRangeInformationRow mriRow: latestRows) { + prevMRIRow.add(mriRow.getPartitionIndex()); + } + return prevMRIRow; + } + @Override public OwnershipReturn mergeLatestRowsIfNecessary(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException { @@ -2157,16 +2156,16 @@ public class MusicMixin implements MusicInterface { } List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks); releaseLocks(changed, locks); - MusicRangeInformationRow row = createAndAssignLock(ranges); + MusicRangeInformationRow row = createAndAssignLock(ranges, latestRows); latestRows.add(row); - locks.put(row.getPartitionIndex(),new LockResult(row.getPartitionIndex(),row.getOwnerId(),true,ranges)); + locks.put(row.getPartitionIndex(),new LockResult(row.getPartitionIndex(),row.getDBPartition().getLockId(),true,ranges)); extendedDag.addNewNodeWithSearch(row,ranges); Pair<List<Range>, Set<DagNode>> missing = extendedDag.getIncompleteRangesAndDependents(); if(missing.getKey().size()!=0 && missing.getValue().size()!=0) { - MusicRangeInformationRow newRow = createAndAssignLock(missing.getKey()); + MusicRangeInformationRow newRow = createAndAssignLock(missing.getKey(), latestRows); latestRows.add(newRow); - locks.put(newRow.getPartitionIndex(), new LockResult(newRow.getPartitionIndex(), newRow.getOwnerId(), true, - missing.getKey())); + locks.put(newRow.getPartitionIndex(), new LockResult(newRow.getPartitionIndex(), row.getDBPartition().getLockId(), + true, missing.getKey())); extendedDag.addNewNode(newRow, new ArrayList<>(missing.getValue())); } changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks); @@ -2271,25 +2270,29 @@ public class MusicMixin implements MusicInterface { private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey, String lock) throws MDBCServiceException{ - ResultSet result; - if(lock != null && !lock.isEmpty()) { - try { - result = MusicCore.criticalGet(keyspace, table, primaryKey, cqlObject, lock); - } catch (MusicServiceException e) { - e.printStackTrace(); - throw new MDBCServiceException("Error executing critical get", e); + ResultSet result = null; + int triesRemaining = 3; + while (result==null && triesRemaining>0) { + if(lock != null && !lock.isEmpty()) { + try { + result = MusicCore.criticalGet(keyspace, table, primaryKey, cqlObject, lock); + } catch (MusicServiceException e) { + e.printStackTrace(); + throw new MDBCServiceException("Error executing critical get", e); + } } - } - else{ - try { - result = MusicCore.atomicGet(keyspace,table,primaryKey,cqlObject); - } catch (MusicServiceException|MusicLockingException|MusicQueryException e) { - e.printStackTrace(); - throw new MDBCServiceException("Error executing atomic get", e); + else{ + try { + result = MusicCore.atomicGet(keyspace,table,primaryKey,cqlObject); + } catch (MusicServiceException|MusicLockingException|MusicQueryException e) { + e.printStackTrace(); + throw new MDBCServiceException("Error executing atomic get", e); + } } + triesRemaining--; } if(result==null){ - throw new MDBCServiceException("Error executing atomic get for primary key: "+primaryKey); + throw new MDBCServiceException("Error executing atomic get for primary key: " + primaryKey + " and lock: " + lock); } if(result.isExhausted()){ return null; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java index 2c5af2c..6e6ade6 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java @@ -19,28 +19,34 @@ */ package org.onap.music.mdbc.tables; -import java.sql.Timestamp; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.UUID; import org.onap.music.mdbc.DatabasePartition; public final class MusicRangeInformationRow implements Comparable<MusicRangeInformationRow>{ private final DatabasePartition dbPartition; - private final UUID partitionIndex; private final List<MusicTxDigestId> redoLog; - private String ownerId; - private final String metricProcessId; private boolean isLatest; - - public MusicRangeInformationRow (UUID partitionIndex, DatabasePartition dbPartition, List<MusicTxDigestId> redoLog, - String ownerId, String metricProcessId, boolean isLatest) { - this.partitionIndex=partitionIndex; - this.dbPartition = dbPartition; - this.redoLog = redoLog; - this.ownerId = ownerId; - this.metricProcessId = metricProcessId; - this.isLatest = isLatest; + private Set<UUID> prevRowIndexes; + + public MusicRangeInformationRow (DatabasePartition dbPartition, List<MusicTxDigestId> redoLog, + boolean isLatest) { + this.dbPartition = dbPartition; + this.redoLog = redoLog; + this.isLatest = isLatest; + this.prevRowIndexes = new HashSet<>(); + } + + public MusicRangeInformationRow (DatabasePartition dbPartition, List<MusicTxDigestId> redoLog, + boolean isLatest, Set<UUID> prevPartitions) { + this.dbPartition = dbPartition; + this.redoLog = redoLog; + this.isLatest = isLatest; + this.prevRowIndexes = prevPartitions; } public UUID getPartitionIndex() { @@ -59,21 +65,13 @@ public final class MusicRangeInformationRow implements Comparable<MusicRangeInfo return redoLog; } - public String getOwnerId() { - return ownerId; - } - - public String getMetricProcessId() { - return metricProcessId; - } - public long getTimestamp(){ - return partitionIndex.timestamp(); - } - - public void setOwnerId(String newOwnerId){ - this.ownerId=newOwnerId; + return dbPartition.getMRIIndex().timestamp(); } + + public Set<UUID> getPrevRowIndexes() { + return this.prevRowIndexes; + } @Override public int compareTo(MusicRangeInformationRow o) { @@ -95,6 +93,6 @@ public final class MusicRangeInformationRow implements Comparable<MusicRangeInfo @Override public int hashCode(){ - return partitionIndex.hashCode(); + return dbPartition.hashCode(); } } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java index e1dcc81..f2bbdcd 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java @@ -142,11 +142,10 @@ public class MusicMixinTest { private DatabasePartition addRow(List<Range> ranges,boolean isLatest){ final UUID uuid = MDBCUtils.generateTimebasedUniqueKey(); DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); - MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "", - MdbcTestUtils.getServerName(), isLatest); + MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), isLatest); DatabasePartition partition=null; try { - partition = mixin.createMusicRangeInformation(newRow); + partition = mixin.createLockedMRIRow(newRow); } catch (MDBCServiceException e) { fail("failure when creating new row"); } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java index 9e6161a..fa5583c 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java @@ -52,7 +52,7 @@ public class DagTest { List<MusicTxDigestId> redoLog) { UUID id = MDBCUtils.generateTimebasedUniqueKey(); DatabasePartition dbPartition = new DatabasePartition(ranges, id, lockid); - return new MusicRangeInformationRow(id, dbPartition, redoLog, lockid, "id", isLatest); + return new MusicRangeInformationRow(dbPartition, redoLog, isLatest); } @Test |