diff options
Diffstat (limited to 'mdbc-server/src/main/java/org/onap')
10 files changed, 128 insertions, 58 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 7ce1d71..1707c03 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -62,6 +62,7 @@ import org.onap.music.mdbc.query.QueryProcessor; import org.onap.music.mdbc.query.SQLOperation; import org.onap.music.mdbc.query.SQLOperationType; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -203,7 +204,8 @@ public class MdbcConnection implements Connection { try { logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); // transaction was committed -- add all the updates into the REDO-Log in MUSIC - mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); + MusicTxDigestId digestCreated = mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); + statemanager.getOwnAndCheck().updateAlreadyApplied(mi, dbi, partition.getSnapshot(), partition.getMRIIndex(), digestCreated); } catch (MDBCServiceException e) { //If the commit fail, then a new commitId should be used logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 04ac789..fb39637 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -128,12 +128,13 @@ public class StateManager { MDBCUtils.writeLocksOnly = (writeLocksOnly==null) ? Configuration.WRITE_LOCK_ONLY_DEFAULT : Boolean.parseBoolean(writeLocksOnly); initMusic(); - initSqlDatabase(); - initTxDaemonThread(); + Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = initSqlDatabase(); + String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT); long timeout = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t); - alreadyApplied = new ConcurrentHashMap<>(); ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout); + + initTxDaemonThread(); } protected String cleanSqlUrl(String url){ @@ -165,7 +166,12 @@ public class StateManager { this.mdbcConnections = new HashMap<>(); } - protected void initSqlDatabase() throws MDBCServiceException { + /** + * Do everything necessary to initialize the sql database + * @return the current checkpoint location of this database, if restarting + * @throws MDBCServiceException + */ + protected Map<Range, Pair<MriReference, MusicTxDigestId>> initSqlDatabase() throws MDBCServiceException { if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) { try { Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info); @@ -183,16 +189,21 @@ public class StateManager { } } - // Verify the tables in MUSIC match the tables in the database - // and create triggers on any tables that need them + Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyAppliedToDb = null; try { MdbcConnection mdbcConn = (MdbcConnection) openConnection("init"); mdbcConn.initDatabase(); + alreadyAppliedToDb = mdbcConn.getDBInterface().getCheckpointLocations(); closeConnection("init"); } catch (QueryException e) { - logger.error("Error syncrhonizing tables"); + logger.error("Error initializing sql database tables"); logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); } + + if (alreadyAppliedToDb==null) { + alreadyAppliedToDb = new ConcurrentHashMap<>(); + } + return alreadyAppliedToDb; } /** diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index 15dd456..cba699f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -31,6 +31,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; +import org.onap.music.mdbc.tables.MriReference; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; /** @@ -151,7 +153,12 @@ public interface DBInterface { * @param r * @param playbackPointer */ - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer); + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer); + /** + * Get current locations of this database's already applied locations + * @return + */ + public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations(); /** * Initialize the SQL database by creating any tables necessary diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index c1936f6..b8ac563 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -183,14 +183,19 @@ public interface MusicInterface { * Commits the corresponding REDO-log into MUSIC * Transaction is committed -- add all the updates into the REDO-Log in MUSIC * + * This officially commits the transaction globally + * + * + * * @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx * @param eventualRanges * @param transactionDigest digest of the transaction that is being committed into the Redo log in music. * @param txId id associated with the log being send * @param progressKeeper data structure that is used to handle to detect failures, and know what to do + * @return digest that was created for this transaction commit * @throws MDBCServiceException */ - void commitLog(DatabasePartition partition, Set<Range> eventualRanges, StagingTable transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; + public MusicTxDigestId commitLog(DatabasePartition partition, Set<Range> eventualRanges, StagingTable transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; /** @@ -369,7 +374,7 @@ public interface MusicInterface { * @param playbackPointer * @throws MDBCServiceException */ - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) throws MDBCServiceException; + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 5808a20..a24ada2 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -1274,12 +1274,8 @@ public class MusicMixin implements MusicInterface { addTxDigest(digestId, serializedTransactionDigest); } - /** - * Writes the transaction information to metric's txDigest and musicRangeInformation table - * This officially commits the transaction globally - */ @Override - public void commitLog(DatabasePartition partition,Set<Range> eventualRanges, StagingTable transactionDigest, + public MusicTxDigestId commitLog(DatabasePartition partition,Set<Range> eventualRanges, StagingTable transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException { // first deal with commit for eventually consistent tables @@ -1287,18 +1283,18 @@ public class MusicMixin implements MusicInterface { if(partition==null){ logger.warn("Trying tcommit log with null partition"); - return; + return null; } Set<Range> snapshot = partition.getSnapshot(); if(snapshot==null || snapshot.isEmpty()){ logger.warn("Trying to commit log with empty ranges"); - return; + return null; } //Add creation type of transaction digest if(transactionDigest == null || transactionDigest.isEmpty()) { - return; + return null; } UUID mriIndex = partition.getMRIIndex(); @@ -1347,12 +1343,7 @@ public class MusicMixin implements MusicInterface { progressKeeper.setRecordId(txId, digestId); } - Set<Range> ranges = partition.getSnapshot(); - - Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); - for(Range r : ranges) { - alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), digestId)); - } + return digestId; } private void filterAndAddEventualTxDigest(Set<Range> eventualRanges, @@ -1775,13 +1766,9 @@ public class MusicMixin implements MusicInterface { } public static void createMusicMdbcCheckpointTable(String namespace, String checkpointTable) throws MDBCServiceException { - String priKey = "txid"; - StringBuilder fields = new StringBuilder(); - fields.append("txid uuid, "); - fields.append("compressed boolean, "); - fields.append("transactiondigest blob ");//notice lack of ',' String cql = - String.format("CREATE TABLE IF NOT EXISTS %s.%s (mdbcnode UUID, mridigest UUID, digestindex int, PRIMARY KEY (mdbcnode));", + String.format("CREATE TABLE IF NOT EXISTS %s.%s (mdbcnode text, range text, mridigest UUID," + + "digestid UUID, PRIMARY KEY (mdbcnode, range));", namespace, checkpointTable); try { executeMusicWriteQuery(namespace,checkpointTable,cql); @@ -2566,10 +2553,10 @@ public class MusicMixin implements MusicInterface { } @Override - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) throws MDBCServiceException { - String cql = String.format("INSERT INTO %s.%s (mdbcnode, mridigest, digestindex) VALUES (" - + this.myId + ", " + playbackPointer.getLeft() + ", " + playbackPointer.getRight() + ");", - music_ns, this.musicMdbcCheckpointsTableName); + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) { + String cql = String.format("INSERT INTO %s.%s (mdbcnode, range, mridigest, digestid) VALUES ('%s', '%s', %s, %s);", + music_ns, this.musicMdbcCheckpointsTableName, this.stateManager.getMdbcServerName(), r.getTable(), + playbackPointer.getLeft().getIndex(), playbackPointer.getRight().transactionId); PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); try { @@ -2577,7 +2564,7 @@ public class MusicMixin implements MusicInterface { } catch (MusicServiceException e) { logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location", e); } catch (MusicQueryException e) { - throw new MDBCServiceException(e); + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location with query", e); } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 2c501dc..b544b94 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -35,6 +35,7 @@ import java.util.Properties; import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; import org.onap.music.exceptions.MDBCServiceException; @@ -44,6 +45,8 @@ import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.query.SQLOperation; +import org.onap.music.mdbc.tables.MriReference; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.Operation; import org.onap.music.mdbc.tables.StagingTable; import net.sf.jsqlparser.JSQLParserException; @@ -85,7 +88,7 @@ public class MySQLMixin implements DBInterface { + "CONNECTION_ID INT, PRIMARY KEY (IX));"; private static final String CKPT_TBL = "MDBC_CHECKPOINT"; private static final String CREATE_CKPT_SQL = - "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTINDEX INT);"; + "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTID VARCHAR(36));"; private final MusicInterface mi; private final int connId; @@ -247,7 +250,7 @@ public class MySQLMixin implements DBInterface { logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set); Set<Range> rangeSet = new HashSet<>(); for (String table : set) { - if (getReservedTblNames().contains(table)) { + if (!getReservedTblNames().contains(table)) { // Don't create triggers for the table the triggers write into!!! rangeSet.add(new Range(table)); } @@ -1145,12 +1148,12 @@ public class MySQLMixin implements DBInterface { } @Override - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) { - String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTINDEX=? where RANGENAME=?;"; + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) { + String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTID=? where RANGENAME=?;"; try { PreparedStatement stmt = jdbcConn.prepareStatement(query); - stmt.setString(1, playbackPointer.getLeft().toString()); - stmt.setInt(2, playbackPointer.getRight()); + stmt.setString(1, playbackPointer.getLeft().getIndex().toString()); + stmt.setString(2, playbackPointer.getRight().transactionId.toString()); stmt.setString(3, r.getTable()); stmt.execute(); stmt.close(); @@ -1160,6 +1163,30 @@ public class MySQLMixin implements DBInterface { } @Override + public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations() { + Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = new ConcurrentHashMap<>(); + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + CKPT_TBL + ";"); + while (rs.next()) { + Range r = new Range(rs.getString("RANGENAME")); + String mrirow = rs.getString("MRIROW"); + String txId = rs.getString("DIGESTID"); + if (mrirow!=null) { + logger.info(EELFLoggerDelegate.applicationLogger, + "Previously checkpointed: " + r.getTable() + " at (" + mrirow + ", " + txId + ")"); + alreadyApplied.put(r, Pair.of(new MriReference(mrirow), new MusicTxDigestId(mrirow, txId, -1))); + } + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Unable to get replay checkpoint location", e); + } + + return alreadyApplied; + } + + @Override public void initTables() { try { Statement stmt = jdbcConn.createStatement(); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java index a4706fd..15c7620 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java @@ -42,6 +42,8 @@ import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.mixins.MySQLMixin.StagingTableUpdateRunnable; +import org.onap.music.mdbc.tables.MriReference; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.Operation; import org.onap.music.mdbc.query.SQLOperation; import org.onap.music.mdbc.tables.StagingTable; @@ -1065,7 +1067,12 @@ public class PostgresMixin implements DBInterface { } @Override - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) { + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) { + throw new org.apache.commons.lang.NotImplementedException(); + } + + @Override + public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations() { throw new org.apache.commons.lang.NotImplementedException(); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index 8da2817..3ea1497 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -237,12 +237,8 @@ public class OwnershipAndCheckpoint{ return; } applyTxDigest(dbi, txDigest); - for (Range r : ranges) { - MusicRangeInformationRow row = node.getRow(); - alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), digestId)); - - updateCheckpointLocations(mi, dbi, r, row.getPartitionIndex(), digestId); - } + MusicRangeInformationRow row = node.getRow(); + updateAlreadyApplied(mi, dbi, ranges, row.getPartitionIndex(), digestId); } /** @@ -260,10 +256,10 @@ public class OwnershipAndCheckpoint{ } MriReference appliedMriRef = applied.getLeft(); MusicTxDigestId appliedDigest = applied.getRight(); - int appliedIndex = node.getRow().getRedoLog().indexOf(appliedDigest); + appliedDigest.index = node.getRow().getRedoLog().indexOf(appliedDigest); if (appliedMriRef==null || appliedMriRef.getTimestamp() < node.getTimestamp() || (appliedMriRef.getTimestamp() == node.getTimestamp() - && appliedIndex < index)) { + && appliedDigest.index < index)) { return false; } } @@ -275,13 +271,13 @@ public class OwnershipAndCheckpoint{ * @param mi * @param di * @param r - * @param partitionIndex + * @param mriRef * @param index * @throws MDBCServiceException */ - private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, MusicTxDigestId txdigest) throws MDBCServiceException { - dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index)); - mi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index)); + private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, MriReference mriRef, MusicTxDigestId txdigest) { + dbi.updateCheckpointLocations(r, Pair.of(mriRef, txdigest)); + mi.updateCheckpointLocations(r, Pair.of(mriRef, txdigest)); } /** @@ -519,6 +515,18 @@ public class OwnershipAndCheckpoint{ public Map<Range, Pair<MriReference, MusicTxDigestId>> getAlreadyApplied() { return this.alreadyApplied; - } + } + public void updateAlreadyApplied(MusicInterface mi, DBInterface dbi, Set<Range> ranges, UUID mriIndex, MusicTxDigestId digestId) { + for (Range r: ranges) { + updateAlreadyApplied(mi, dbi, r, mriIndex, digestId); + } + } + + public void updateAlreadyApplied(MusicInterface mi, DBInterface dbi, Range r, UUID mriIndex, MusicTxDigestId digestId) { + MriReference mriRef = new MriReference(mriIndex); + alreadyApplied.put(r, Pair.of(mriRef, digestId)); + updateCheckpointLocations(mi, dbi, r, mriRef, digestId); + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java index 9383ac5..3c15487 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java @@ -28,8 +28,18 @@ public final class MriReference { this.index= index; } - public long getTimestamp() { return index.timestamp();} + public MriReference(String mrirow) { + index = UUID.fromString(mrirow); + } + public long getTimestamp() { + return index.timestamp(); + } + + public UUID getIndex() { + return this.index; + } + public String toString() { return index.toString(); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java index 8544b47..59eb97e 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java @@ -24,13 +24,19 @@ import java.util.UUID; public final class MusicTxDigestId { public final UUID mriId; public final UUID transactionId; - public final int index; + public int index; public MusicTxDigestId(UUID mriRowId, UUID digestId, int index) { this.mriId=mriRowId; this.transactionId= digestId; this.index=index; } + + public MusicTxDigestId(String mriRowId, String digestId, int index) { + this.mriId = UUID.fromString(mriRowId); + this.transactionId = UUID.fromString(digestId); + this.index = index; + } public MusicTxDigestId(UUID digestId, int index) { this.mriId = null; |