diff options
Diffstat (limited to 'mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java')
-rw-r--r-- | mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java | 37 |
1 files changed, 12 insertions, 25 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 5808a20..a24ada2 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -1274,12 +1274,8 @@ public class MusicMixin implements MusicInterface { addTxDigest(digestId, serializedTransactionDigest); } - /** - * Writes the transaction information to metric's txDigest and musicRangeInformation table - * This officially commits the transaction globally - */ @Override - public void commitLog(DatabasePartition partition,Set<Range> eventualRanges, StagingTable transactionDigest, + public MusicTxDigestId commitLog(DatabasePartition partition,Set<Range> eventualRanges, StagingTable transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException { // first deal with commit for eventually consistent tables @@ -1287,18 +1283,18 @@ public class MusicMixin implements MusicInterface { if(partition==null){ logger.warn("Trying tcommit log with null partition"); - return; + return null; } Set<Range> snapshot = partition.getSnapshot(); if(snapshot==null || snapshot.isEmpty()){ logger.warn("Trying to commit log with empty ranges"); - return; + return null; } //Add creation type of transaction digest if(transactionDigest == null || transactionDigest.isEmpty()) { - return; + return null; } UUID mriIndex = partition.getMRIIndex(); @@ -1347,12 +1343,7 @@ public class MusicMixin implements MusicInterface { progressKeeper.setRecordId(txId, digestId); } - Set<Range> ranges = partition.getSnapshot(); - - Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); - for(Range r : ranges) { - alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), digestId)); - } + return digestId; } private void filterAndAddEventualTxDigest(Set<Range> eventualRanges, @@ -1775,13 +1766,9 @@ public class MusicMixin implements MusicInterface { } public static void createMusicMdbcCheckpointTable(String namespace, String checkpointTable) throws MDBCServiceException { - String priKey = "txid"; - StringBuilder fields = new StringBuilder(); - fields.append("txid uuid, "); - fields.append("compressed boolean, "); - fields.append("transactiondigest blob ");//notice lack of ',' String cql = - String.format("CREATE TABLE IF NOT EXISTS %s.%s (mdbcnode UUID, mridigest UUID, digestindex int, PRIMARY KEY (mdbcnode));", + String.format("CREATE TABLE IF NOT EXISTS %s.%s (mdbcnode text, range text, mridigest UUID," + + "digestid UUID, PRIMARY KEY (mdbcnode, range));", namespace, checkpointTable); try { executeMusicWriteQuery(namespace,checkpointTable,cql); @@ -2566,10 +2553,10 @@ public class MusicMixin implements MusicInterface { } @Override - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) throws MDBCServiceException { - String cql = String.format("INSERT INTO %s.%s (mdbcnode, mridigest, digestindex) VALUES (" - + this.myId + ", " + playbackPointer.getLeft() + ", " + playbackPointer.getRight() + ");", - music_ns, this.musicMdbcCheckpointsTableName); + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) { + String cql = String.format("INSERT INTO %s.%s (mdbcnode, range, mridigest, digestid) VALUES ('%s', '%s', %s, %s);", + music_ns, this.musicMdbcCheckpointsTableName, this.stateManager.getMdbcServerName(), r.getTable(), + playbackPointer.getLeft().getIndex(), playbackPointer.getRight().transactionId); PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); try { @@ -2577,7 +2564,7 @@ public class MusicMixin implements MusicInterface { } catch (MusicServiceException e) { logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location", e); } catch (MusicQueryException e) { - throw new MDBCServiceException(e); + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location with query", e); } } |