From 77212c23b8ec060c3b15ab21c33502b1bd24e858 Mon Sep 17 00:00:00 2001 From: "Tschaen, Brendan" Date: Mon, 4 Mar 2019 15:49:00 -0500 Subject: Refactorization of musicmixin's own code Issue-ID: MUSIC-326 Change-Id: Ia07ff8e6e5c6bd9444f8967eae392a67bf08b7f2 Signed-off-by: Tschaen, Brendan --- .../java/org/onap/music/mdbc/MdbcConnection.java | 6 ++ .../org/onap/music/mdbc/mixins/MusicMixin.java | 120 +++++++++++++-------- .../java/org/onap/music/mdbc/ownership/Dag.java | 1 + .../mdbc/ownership/OwnershipAndCheckpoint.java | 25 ++++- .../onap/music/mdbc/tables/MriRowComparator.java | 3 + 5 files changed, 104 insertions(+), 51 deletions(-) diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 12c7c29..0793a67 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -564,6 +564,12 @@ public class MdbcConnection implements Connection { return this.dbi; } + /** + * Take ownership of ranges given, and replay the transactions + * @param ranges + * @return + * @throws MDBCServiceException + */ private DatabasePartition own(List ranges) throws MDBCServiceException { if(ranges==null||ranges.isEmpty()){ return null; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index cdf0140..8a2ef6f 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -2120,7 +2120,13 @@ public class MusicMixin implements MusicInterface { } } - private List getExtendedRanges(List range) throws MDBCServiceException{ + /** + * Get a list of ranges and their range dependencies + * @param range + * @return + * @throws MDBCServiceException + */ + private List getRangeDependencies(List range) throws MDBCServiceException{ Set extendedRange = new HashSet<>(); for(Range r: range){ extendedRange.add(r); @@ -2238,11 +2244,18 @@ public class MusicMixin implements MusicInterface { return rowsPerLatestRange; } + /** + * Take locking ownership of each range + * @param ranges - ranges that need to be owned + * @param partition - current partition owned + * @param opId + */ @Override public OwnershipReturn own(List ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException { - if(ranges == null || ranges.isEmpty()) + if(ranges == null || ranges.isEmpty()) { return null; + } Map newLocks = new HashMap<>(); //Init timeout clock @@ -2251,62 +2264,75 @@ public class MusicMixin implements MusicInterface { return new OwnershipReturn(opId,partition.getLockId(),partition.getMRIIndex(),partition.getSnapshot(),null); } //Find - List extendedRanges = getExtendedRanges(ranges); + List rangesToOwn = getRangeDependencies(ranges); List allMriRows = getAllMriRows(); - List rows = ownAndCheck.getRows(allMriRows,extendedRanges, false); - Dag dag = Dag.getDag(rows,extendedRanges); - Dag prev = new Dag(); - while( (dag.isDifferent(prev) || !prev.isOwned() ) && + List rows = ownAndCheck.extractRowsForRange(allMriRows,rangesToOwn, false); + Dag toOwn = Dag.getDag(rows,rangesToOwn); + Dag currentlyOwn = new Dag(); + while( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) && !ownAndCheck.timeout(opId) ){ - while(dag.hasNextToOwn()){ - DagNode node = dag.nextToOwn(); - MusicRangeInformationRow row = node.getRow(); - UUID uuid = row.getPartitionIndex(); - if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)|| - newLocks.containsKey(uuid) || - !row.getIsLatest()){ - dag.setOwn(node); - } - else{ - LockResult lockResult = null; - boolean owned = false; - while(!owned && !ownAndCheck.timeout(opId)){ - try { - LockRequest request = new LockRequest(musicRangeInformationTableName,uuid, - new ArrayList(node.getRangeSet())); - lockResult = waitForLock(request); - owned = true; - } - catch (MDBCServiceException e){ - logger.warn("Locking failed, retrying",e); - } - } - if(owned){ - dag.setOwn(node); - newLocks.put(uuid,lockResult); - } - else{ - break; - } - } - } - prev=dag; + takeOwnershipOfDag(partition, opId, newLocks, toOwn); + currentlyOwn=toOwn; //TODO instead of comparing dags, compare rows allMriRows = getAllMriRows(); - rows = ownAndCheck.getRows(allMriRows,extendedRanges,false); - dag = Dag.getDag(rows,extendedRanges); + rows = ownAndCheck.extractRowsForRange(allMriRows,rangesToOwn,false); + toOwn = Dag.getDag(rows,rangesToOwn); } - if(!prev.isOwned() || dag.isDifferent(prev)){ + if(!currentlyOwn.isOwned() || toOwn.isDifferent(currentlyOwn)){ releaseLocks(newLocks); ownAndCheck.stopOwnershipTimeoutClock(opId); logger.error("Error when owning a range: Timeout"); throw new MDBCServiceException("Ownership timeout"); } - Set allRanges = prev.getAllRanges(); - List isLatestRows = ownAndCheck.getRows(allMriRows, new ArrayList<>(allRanges), true); - prev.setRowsPerLatestRange(getIsLatestPerRange(dag,isLatestRows)); - return mergeLatestRows(prev,rows,ranges,newLocks,opId); + Set allRanges = currentlyOwn.getAllRanges(); + List isLatestRows = ownAndCheck.extractRowsForRange(allMriRows, new ArrayList<>(allRanges), true); + currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,isLatestRows)); + return mergeLatestRows(currentlyOwn,rows,ranges,newLocks,opId); + } + + /** + * Step through dag and take lock ownership of each range + * @param partition + * @param opId + * @param newLocks + * @param toOwn + * @throws MDBCServiceException + */ + private void takeOwnershipOfDag(DatabasePartition partition, UUID opId, Map newLocks, Dag toOwn) + throws MDBCServiceException { + while(toOwn.hasNextToOwn()){ + DagNode node = toOwn.nextToOwn(); + MusicRangeInformationRow row = node.getRow(); + UUID uuid = row.getPartitionIndex(); + if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)|| + newLocks.containsKey(uuid) || + !row.getIsLatest()){ + toOwn.setOwn(node); + } + else{ + LockResult lockResult = null; + boolean owned = false; + while(!owned && !ownAndCheck.timeout(opId)){ + try { + LockRequest request = new LockRequest(musicRangeInformationTableName,uuid, + new ArrayList(node.getRangeSet())); + lockResult = waitForLock(request); + owned = true; + } + catch (MDBCServiceException e){ + logger.warn("Locking failed, retrying",e); + } + } + if(owned){ + toOwn.setOwn(node); + newLocks.put(uuid,lockResult); + } + else{ + break; + } + } + } } /** diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java index 02c5d7b..07a5fe6 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java @@ -65,6 +65,7 @@ public class Dag { private void createDag(List rows, List ranges){ this.ranges = new ArrayList<>(ranges); Map latestRow = new HashMap<>(); + //sort to make sure rows are in chronological order Collections.sort(rows, new MriRowComparator()); for(MusicRangeInformationRow row : rows){ if(!nodes.containsKey(row.getPartitionIndex())){ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index 6b1e566..f72b0ec 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -84,7 +84,14 @@ public class OwnershipAndCheckpoint{ return false; } - public List getRows(List allMriRows, List ranges, + /** + * Extracts all the rows that match any of the ranges. + * @param allMriRows + * @param ranges - ranges interested in + * @param onlyIsLatest - only return the "latest" rows + * @return + */ + public List extractRowsForRange(List allMriRows, List ranges, boolean onlyIsLatest){ List rows = new ArrayList<>(); for(MusicRangeInformationRow row : allMriRows){ @@ -107,12 +114,22 @@ public class OwnershipAndCheckpoint{ return rows; } - private List getRows(MusicInterface music, List ranges, boolean onlyIsLatest) + private List extractRowsForRange(MusicInterface music, List ranges, boolean onlyIsLatest) throws MDBCServiceException { final List allMriRows = music.getAllMriRows(); - return getRows(allMriRows,ranges,onlyIsLatest); + return extractRowsForRange(allMriRows,ranges,onlyIsLatest); } + /** + * make sure data is up to date for list of ranges + * @param mi + * @param di + * @param extendedDag + * @param ranges + * @param locks + * @param ownOpId + * @throws MDBCServiceException + */ public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, List ranges, Map locks, UUID ownOpId) throws MDBCServiceException { if(ranges.isEmpty()){ @@ -170,7 +187,7 @@ public class OwnershipAndCheckpoint{ while(!ready){ if(change.get()){ change.set(false); - final List rows = getRows(mi, ranges,false); + final List rows = extractRowsForRange(mi, ranges,false); dag = Dag.getDag(rows,ranges); } else if(!dag.applied()){ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java index 281d763..1bdc2ac 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java @@ -24,6 +24,9 @@ import java.util.Comparator; public class MriRowComparator implements Comparator { + /** + * compare timestamps + */ @Override public int compare(MusicRangeInformationRow o1, MusicRangeInformationRow o2) { return Long.compare(o1.getTimestamp(),o2.getTimestamp()); -- cgit 1.2.3-korg