diff options
Diffstat (limited to 'mdbc-server/src')
5 files changed, 104 insertions, 51 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 12c7c29..0793a67 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -564,6 +564,12 @@ public class MdbcConnection implements Connection { return this.dbi; } + /** + * Take ownership of ranges given, and replay the transactions + * @param ranges + * @return + * @throws MDBCServiceException + */ private DatabasePartition own(List<Range> ranges) throws MDBCServiceException { if(ranges==null||ranges.isEmpty()){ return null; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index cdf0140..8a2ef6f 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -2120,7 +2120,13 @@ public class MusicMixin implements MusicInterface { } } - private List<Range> getExtendedRanges(List<Range> range) throws MDBCServiceException{ + /** + * Get a list of ranges and their range dependencies + * @param range + * @return + * @throws MDBCServiceException + */ + private List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException{ Set<Range> extendedRange = new HashSet<>(); for(Range r: range){ extendedRange.add(r); @@ -2238,11 +2244,18 @@ public class MusicMixin implements MusicInterface { return rowsPerLatestRange; } + /** + * Take locking ownership of each range + * @param ranges - ranges that need to be owned + * @param partition - current partition owned + * @param opId + */ @Override public OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException { - if(ranges == null || ranges.isEmpty()) + if(ranges == null || ranges.isEmpty()) { return null; + } Map<UUID,LockResult> newLocks = new HashMap<>(); //Init timeout clock @@ -2251,62 +2264,75 @@ public class MusicMixin implements MusicInterface { return new OwnershipReturn(opId,partition.getLockId(),partition.getMRIIndex(),partition.getSnapshot(),null); } //Find - List<Range> extendedRanges = getExtendedRanges(ranges); + List<Range> rangesToOwn = getRangeDependencies(ranges); List<MusicRangeInformationRow> allMriRows = getAllMriRows(); - List<MusicRangeInformationRow> rows = ownAndCheck.getRows(allMriRows,extendedRanges, false); - Dag dag = Dag.getDag(rows,extendedRanges); - Dag prev = new Dag(); - while( (dag.isDifferent(prev) || !prev.isOwned() ) && + List<MusicRangeInformationRow> rows = ownAndCheck.extractRowsForRange(allMriRows,rangesToOwn, false); + Dag toOwn = Dag.getDag(rows,rangesToOwn); + Dag currentlyOwn = new Dag(); + while( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) && !ownAndCheck.timeout(opId) ){ - while(dag.hasNextToOwn()){ - DagNode node = dag.nextToOwn(); - MusicRangeInformationRow row = node.getRow(); - UUID uuid = row.getPartitionIndex(); - if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)|| - newLocks.containsKey(uuid) || - !row.getIsLatest()){ - dag.setOwn(node); - } - else{ - LockResult lockResult = null; - boolean owned = false; - while(!owned && !ownAndCheck.timeout(opId)){ - try { - LockRequest request = new LockRequest(musicRangeInformationTableName,uuid, - new ArrayList(node.getRangeSet())); - lockResult = waitForLock(request); - owned = true; - } - catch (MDBCServiceException e){ - logger.warn("Locking failed, retrying",e); - } - } - if(owned){ - dag.setOwn(node); - newLocks.put(uuid,lockResult); - } - else{ - break; - } - } - } - prev=dag; + takeOwnershipOfDag(partition, opId, newLocks, toOwn); + currentlyOwn=toOwn; //TODO instead of comparing dags, compare rows allMriRows = getAllMriRows(); - rows = ownAndCheck.getRows(allMriRows,extendedRanges,false); - dag = Dag.getDag(rows,extendedRanges); + rows = ownAndCheck.extractRowsForRange(allMriRows,rangesToOwn,false); + toOwn = Dag.getDag(rows,rangesToOwn); } - if(!prev.isOwned() || dag.isDifferent(prev)){ + if(!currentlyOwn.isOwned() || toOwn.isDifferent(currentlyOwn)){ releaseLocks(newLocks); ownAndCheck.stopOwnershipTimeoutClock(opId); logger.error("Error when owning a range: Timeout"); throw new MDBCServiceException("Ownership timeout"); } - Set<Range> allRanges = prev.getAllRanges(); - List<MusicRangeInformationRow> isLatestRows = ownAndCheck.getRows(allMriRows, new ArrayList<>(allRanges), true); - prev.setRowsPerLatestRange(getIsLatestPerRange(dag,isLatestRows)); - return mergeLatestRows(prev,rows,ranges,newLocks,opId); + Set<Range> allRanges = currentlyOwn.getAllRanges(); + List<MusicRangeInformationRow> isLatestRows = ownAndCheck.extractRowsForRange(allMriRows, new ArrayList<>(allRanges), true); + currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,isLatestRows)); + return mergeLatestRows(currentlyOwn,rows,ranges,newLocks,opId); + } + + /** + * Step through dag and take lock ownership of each range + * @param partition + * @param opId + * @param newLocks + * @param toOwn + * @throws MDBCServiceException + */ + private void takeOwnershipOfDag(DatabasePartition partition, UUID opId, Map<UUID, LockResult> newLocks, Dag toOwn) + throws MDBCServiceException { + while(toOwn.hasNextToOwn()){ + DagNode node = toOwn.nextToOwn(); + MusicRangeInformationRow row = node.getRow(); + UUID uuid = row.getPartitionIndex(); + if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)|| + newLocks.containsKey(uuid) || + !row.getIsLatest()){ + toOwn.setOwn(node); + } + else{ + LockResult lockResult = null; + boolean owned = false; + while(!owned && !ownAndCheck.timeout(opId)){ + try { + LockRequest request = new LockRequest(musicRangeInformationTableName,uuid, + new ArrayList(node.getRangeSet())); + lockResult = waitForLock(request); + owned = true; + } + catch (MDBCServiceException e){ + logger.warn("Locking failed, retrying",e); + } + } + if(owned){ + toOwn.setOwn(node); + newLocks.put(uuid,lockResult); + } + else{ + break; + } + } + } } /** diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java index 02c5d7b..07a5fe6 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java @@ -65,6 +65,7 @@ public class Dag { private void createDag(List<MusicRangeInformationRow> rows, List<Range> ranges){ this.ranges = new ArrayList<>(ranges); Map<Range,DagNode> latestRow = new HashMap<>(); + //sort to make sure rows are in chronological order Collections.sort(rows, new MriRowComparator()); for(MusicRangeInformationRow row : rows){ if(!nodes.containsKey(row.getPartitionIndex())){ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index 6b1e566..f72b0ec 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -84,7 +84,14 @@ public class OwnershipAndCheckpoint{ return false; } - public List<MusicRangeInformationRow> getRows(List<MusicRangeInformationRow> allMriRows, List<Range> ranges, + /** + * Extracts all the rows that match any of the ranges. + * @param allMriRows + * @param ranges - ranges interested in + * @param onlyIsLatest - only return the "latest" rows + * @return + */ + public List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, List<Range> ranges, boolean onlyIsLatest){ List<MusicRangeInformationRow> rows = new ArrayList<>(); for(MusicRangeInformationRow row : allMriRows){ @@ -107,12 +114,22 @@ public class OwnershipAndCheckpoint{ return rows; } - private List<MusicRangeInformationRow> getRows(MusicInterface music, List<Range> ranges, boolean onlyIsLatest) + private List<MusicRangeInformationRow> extractRowsForRange(MusicInterface music, List<Range> ranges, boolean onlyIsLatest) throws MDBCServiceException { final List<MusicRangeInformationRow> allMriRows = music.getAllMriRows(); - return getRows(allMriRows,ranges,onlyIsLatest); + return extractRowsForRange(allMriRows,ranges,onlyIsLatest); } + /** + * make sure data is up to date for list of ranges + * @param mi + * @param di + * @param extendedDag + * @param ranges + * @param locks + * @param ownOpId + * @throws MDBCServiceException + */ public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, List<Range> ranges, Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException { if(ranges.isEmpty()){ @@ -170,7 +187,7 @@ public class OwnershipAndCheckpoint{ while(!ready){ if(change.get()){ change.set(false); - final List<MusicRangeInformationRow> rows = getRows(mi, ranges,false); + final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, ranges,false); dag = Dag.getDag(rows,ranges); } else if(!dag.applied()){ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java index 281d763..1bdc2ac 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java @@ -24,6 +24,9 @@ import java.util.Comparator; public class MriRowComparator implements Comparator<MusicRangeInformationRow> { + /** + * compare timestamps + */ @Override public int compare(MusicRangeInformationRow o1, MusicRangeInformationRow o2) { return Long.compare(o1.getTimestamp(),o2.getTimestamp()); |