28 files changed, 1479 insertions, 1195 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java index c8cad47..314248f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java @@ -39,7 +39,7 @@ public class DatabasePartition { private UUID musicRangeInformationIndex;//Index that can be obtained either from private String lockId; - protected List<Range> ranges; + protected Set<Range> ranges; private boolean ready; @@ -49,14 +49,14 @@ public class DatabasePartition { */ public DatabasePartition() { - this(new ArrayList<Range>(),null,""); + this(new HashSet<Range>(),null,""); } public DatabasePartition(UUID mriIndex) { - this(new ArrayList<Range>(), mriIndex,""); + this(new HashSet<Range>(), mriIndex,""); } - - public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String lockId) { + + public DatabasePartition(Set<Range> knownRanges, UUID mriIndex, String lockId) { if(mriIndex==null){ ready = false; } @@ -139,9 +139,9 @@ public class DatabasePartition { * Get all the ranges that are currently owned * @return ranges */ - public synchronized List<Range> getSnapshot() { - List<Range> newRange = new ArrayList<>(); - for(Range r : ranges){ + public synchronized Set<Range> getSnapshot() { + Set<Range> newRange = new HashSet<>(); + for (Range r: ranges){ newRange.add(r.clone()); } return newRange; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java index a02e6d0..b60062e 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Base64; import java.util.Deque; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -98,12 +99,12 @@ public class MDBCUtils { return prop; } - public static List<Range> getTables(Map<String,List<SQLOperation>> queryParsed){ + public static Set<Range> getTables(Map<String,List<SQLOperation>> queryParsed){ return getTables(null, queryParsed); } - public static List<Range> getTables(String defaultDatabaseName, Map<String,List<SQLOperation>> queryParsed){ - List<Range> ranges = new ArrayList<>(); + public static Set<Range> getTables(String defaultDatabaseName, Map<String,List<SQLOperation>> queryParsed){ + Set<Range> ranges = new HashSet<>(); for(String table: queryParsed.keySet()){ String[] parts = table.split("\\."); if(parts.length==2){ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 61ce6bd..2294673 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -34,10 +34,15 @@ import java.sql.SQLXML; import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; -import java.util.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.Executor; - -import org.apache.commons.lang3.NotImplementedException; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.QueryException; import 
org.onap.music.logging.EELFLoggerDelegate; @@ -55,7 +60,6 @@ import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.query.QueryProcessor; import org.onap.music.mdbc.query.SQLOperation; import org.onap.music.mdbc.query.SQLOperationType; -import org.onap.music.mdbc.tables.MusicTxDigestDaemon; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -501,21 +505,29 @@ public class MdbcConnection implements Connection { * @param sql the SQL statement that is about to be executed */ public void preStatementHook(final String sql) throws MDBCServiceException, SQLException { + + // some debug specific logic + if(sql.startsWith("DEBUG")) { + // if the SQL follows this convention: "DEBUG:TABLE_A,TABLE_B", + // DAG information pertaining to the tables will get printed + throw new SQLException("\nThis call was made for debugging purposes only\n" + statemanager.getOwnAndCheck().getDebugInfo(mi,sql.split(":")[1])); + } + //TODO: verify ownership of keys here //Parse tables from the sql query Map<String, List<SQLOperation>> tableToQueryType = QueryProcessor.parseSqlQuery(sql, table_set); //Check ownership of keys String defaultSchema = dbi.getSchema(); - List<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToQueryType); + Set<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToQueryType); if (this.partition!=null) { - List<Range> snapshot = this.partition.getSnapshot(); + Set<Range> snapshot = this.partition.getSnapshot(); if(snapshot!=null){ queryTables.addAll(snapshot); } } // filter out ranges that fall under Eventually consistent // category as these tables do not need ownership - List<Range> scQueryTables = filterEveTables(queryTables); + Set<Range> scQueryTables = filterEveTables(queryTables); DatabasePartition tempPartition = own(scQueryTables, MDBCUtils.getOperationType(tableToQueryType)); if(tempPartition!=null && tempPartition != partition) { this.partition.updateDatabasePartition(tempPartition); @@ -525,7 +537,7 @@ public class MdbcConnection implements Connection { } - private List<Range> filterEveTables(List<Range> queryTables) { + private Set<Range> filterEveTables(Set<Range> queryTables) { queryTables.removeAll(statemanager.getEventualRanges()); return queryTables; } @@ -539,6 +551,11 @@ public class MdbcConnection implements Connection { dbi.postStatementHook(sql, transactionDigest); } + public void initDatabase() throws QueryException { + dbi.initTables(); + createTriggers(); + } + /** * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the * proxy first starts, and whenever there is the possibility that tables were created or dropped. 
It is synchronized @@ -553,6 +570,7 @@ public class MdbcConnection implements Connection { logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName); try { dbi.createSQLTriggers(tableName); + mi.createPartitionIfNeeded(new Range(tableName)); table_set.add(tableName.toUpperCase()); } catch (Exception e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); @@ -573,7 +591,7 @@ public class MdbcConnection implements Connection { * @return * @throws MDBCServiceException */ - private DatabasePartition own(List<Range> ranges, SQLOperationType lockType) throws MDBCServiceException { + private DatabasePartition own(Set<Range> ranges, SQLOperationType lockType) throws MDBCServiceException { if(ranges==null||ranges.isEmpty()){ return null; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java index 41aed26..82a5d16 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java @@ -22,7 +22,7 @@ package org.onap.music.mdbc; import java.io.Serializable; import java.util.List; import java.util.Objects; - +import java.util.Set; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.mixins.MusicMixin; @@ -83,7 +83,7 @@ public class Range implements Cloneable{ } - public static boolean overlaps(List<Range> ranges, String table){ + public static boolean overlaps(Set<Range> ranges, String table) { //\TODO check if parallel stream makes sense here return ranges.stream().map((Range r) -> r.table.toUpperCase().equals(table.toUpperCase())).anyMatch((Boolean b) -> b); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index e284103..66c8fa9 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -86,7 +86,7 @@ public class StateManager { private String mdbcServerName; private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition private final Lock eventualLock = new ReentrantLock(); - private List<Range> eventualRanges; + private Set<Range> eventualRanges; /** lock for warmupRanges */ private final Lock warmupLock = new ReentrantLock(); /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */ @@ -182,7 +182,7 @@ public class StateManager { // and create triggers on any tables that need them try { MdbcConnection mdbcConn = (MdbcConnection) openConnection("init"); - mdbcConn.createTriggers(); + mdbcConn.initDatabase(); closeConnection("init"); } catch (QueryException e) { logger.error("Error syncrhonizing tables"); @@ -255,24 +255,24 @@ public class StateManager { * Get a list of ranges that are eventually consistent * @return */ - public List<Range> getEventualRanges() { + public Set<Range> getEventualRanges() { eventualLock.lock(); - List<Range> returnArray; + Set<Range> returnSet; try { if(eventualRanges!=null){ - returnArray = new ArrayList<>(eventualRanges); + returnSet = new HashSet<>(eventualRanges); } else{ - returnArray= new ArrayList<>(); + returnSet= new HashSet<>(); } } finally{ eventualLock.unlock(); } - return returnArray; + return returnSet; } - public void setEventualRanges(List<Range> eventualRanges) { + public void 
setEventualRanges(Set<Range> eventualRanges) { eventualLock.lock(); try { this.eventualRanges = eventualRanges; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java index 0693a97..496f48d 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java @@ -42,7 +42,7 @@ public class TestUtils { public static DatabasePartition createBasicRow(Range range, MusicInterface mixin, String mdbcServerName) throws MDBCServiceException { final UUID uuid = MDBCUtils.generateTimebasedUniqueKey(); - List<Range> ranges = new ArrayList<>(); + Set<Range> ranges = new HashSet<>(); ranges.add(range); DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); new MusicRangeInformationRow(dbPartition, new ArrayList<>(), true); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java index 0021bcc..bbd3f35 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java @@ -20,6 +20,7 @@ package org.onap.music.mdbc.configurations; import java.util.List; +import java.util.Set; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.Range; @@ -30,18 +31,18 @@ public class Eventual { private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Eventual.class); - protected List<Range> ranges; + protected Set<Range> ranges; - public Eventual(List<Range> ranges) { + public Eventual(Set<Range> ranges) { super(); this.ranges = ranges; } - public List<Range> getRanges() { + public Set<Range> getRanges() { return ranges; } - public void setRanges(List<Range> ranges) { + public void setRanges(Set<Range> ranges) { this.ranges = ranges; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java index 391ee1a..be8217b 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java @@ -49,17 +49,17 @@ public class NodeConfiguration { this.sqlDatabaseName = sqlDatabaseName; } - protected List<Range> toRanges(List<String> tables){ - List<Range> newRange = new ArrayList<>(); + protected Set<Range> toRanges(List<String> tables){ + Set<Range> newRange = new HashSet<>(); for(String table: tables) { newRange.add(new Range(table)); } return newRange; } - protected List<Range> toRanges(String tables){ + protected Set<Range> toRanges(String tables){ if(tables.isEmpty()){ - return new ArrayList<>(); + return new HashSet<>(); } String[] tablesArray=tables.split(","); return toRanges(new ArrayList<>(Arrays.asList(tablesArray))); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java index 0598271..1c9d07c 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java @@ -38,6 +38,7 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.util.ArrayList; import java.util.List; +import 
java.util.Set; import java.util.UUID; public class TablesConfiguration { @@ -113,7 +114,7 @@ public class TablesConfiguration { final ResultSet resultSet = MusicCore.quorumGet(checkRowsInTable); while(resultSet!=null && !resultSet.isExhausted()){ final MusicRangeInformationRow mriRowFromCassandraRow = MusicMixin.getMRIRowFromCassandraRow(resultSet.one()); - List<Range> ranges = mriRowFromCassandraRow.getDBPartition().getSnapshot(); + Set<Range> ranges = mriRowFromCassandraRow.getDBPartition().getSnapshot(); for(Range range: partition.getTables()) { if (Range.overlaps(ranges,range.getTable())){ throw new MDBCServiceException("MRI row already exists"); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java index fea329d..fac9f36 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java @@ -106,11 +106,13 @@ public class MdbcTestClient { e.printStackTrace();
}
- final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');";
- final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=2;";
- final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";
+ final String insertSQL = "INSERT INTO Persons VALUES (1, 'Smith', 'Juan', 'KACB', 'ATLANTA');";
+ final String insertSQL1 = "INSERT INTO Persons2 VALUES (1, 'Smith', 'Juan', 'KACB', 'ATLANTA');";
+ final String insertSQL2 = "INSERT INTO Persons3 VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";
final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';";
- final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";
+ final String insertSQL4 = "UPDATE Persons2 SET FirstName='JOHN' WHERE LastName='Smith';";
+ final String insertSQL5 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";
+ final String insertSQL6 = "UPDATE Persons3 SET FirstName='JOHN' WHERE LastName='Smith';";
final String selectSQL1 = "SELECT * FROM Persons;";
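The statements above assume that Persons, Persons2 and Persons3 already exist in the SQL database (creating them also exercises the new createPartitionIfNeeded path added to MdbcConnection.synchronizeTables above). A minimal setup sketch, with column names and types inferred from the INSERT value order and therefore an assumption rather than part of this change, could look like:

    // Hypothetical setup helper for MdbcTestClient; schema (PersonID, LastName,
    // FirstName, Address, City) is inferred from the INSERT statements above.
    // Requires java.sql.Connection, java.sql.Statement, java.sql.SQLException.
    private static void createTestTables(Connection connection) throws SQLException {
        final String cols = " (PersonID INT PRIMARY KEY, LastName VARCHAR(255),"
                + " FirstName VARCHAR(255), Address VARCHAR(255), City VARCHAR(255))";
        try (Statement stmt = connection.createStatement()) {
            stmt.execute("CREATE TABLE IF NOT EXISTS Persons" + cols);
            stmt.execute("CREATE TABLE IF NOT EXISTS Persons2" + cols);
            stmt.execute("CREATE TABLE IF NOT EXISTS Persons3" + cols);
        }
        connection.commit(); // the test client runs with autocommit disabled
    }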
@@ -123,13 +125,55 @@ public class MdbcTestClient { }
try {
- //execute = insertStmt.execute(insertSQL);
- //execute = insertStmt.execute(insertSQL1);
- //execute = insertStmt.execute(insertSQL2);
- //execute = insertStmt.execute(insertSQL3);
- //execute = insertStmt.execute(insertSQL4);
+ /*
+ * test sequence, each step on a fresh connection:
+ *   insert into table 1 (Persons)
+ *   insert into table 2 (Persons2)
+ *   insert into table 3 (Persons3)
+ *   update across tables 1,2, then across tables 1,3,2
+ */
+ execute = insertStmt.execute(insertSQL);
+ connection.commit();
+
+ connection.close();
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + "http://localhost:30000/test"+ ";serialization=protobuf");
+ connection.setAutoCommit(false);
+ insertStmt = connection.createStatement();
+
+ execute = insertStmt.execute(insertSQL1);
+ connection.commit();
+
+ connection.close();
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + "http://localhost:30000/test"+ ";serialization=protobuf");
+ connection.setAutoCommit(false);
+ insertStmt = connection.createStatement();
+
+ execute = insertStmt.execute(insertSQL2);
+ connection.commit();
+
+ connection.close();
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + "http://localhost:30000/test"+ ";serialization=protobuf");
+ connection.setAutoCommit(false);
+ insertStmt = connection.createStatement();
+
+ System.out.println("1,2");
+ execute = insertStmt.execute(insertSQL3);
+ execute = insertStmt.execute(insertSQL4);
+ connection.commit();
+
+ connection.close();
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + "http://localhost:30000/test"+ ";serialization=protobuf");
+ connection.setAutoCommit(false);
+ insertStmt = connection.createStatement();
+
+ System.out.println("1,3,2");
+
+ execute = insertStmt.execute(insertSQL5);
+ execute = insertStmt.execute(insertSQL6);
+ execute = insertStmt.execute(insertSQL4);
+ connection.commit();
- ///*
+ /*
ResultSet rs = insertStmt.executeQuery(selectSQL1);
while (rs.next()) {
System.out.printf("%d, %s, %s\n", rs.getInt("PersonID"), rs.getString("FirstName"), rs.getString("LastName"));
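The MdbcConnection change earlier in this diff adds a debug convention to preStatementHook: a statement of the form "DEBUG:TABLE_A,TABLE_B" is intercepted before execution and the DAG/ownership information for those tables is returned in the message of an SQLException. A hedged client-side sketch of exercising that hook from the same Avatica connection used above (table names are only an example):

    // Sketch only: the DEBUG statement never runs as SQL; the server answers by
    // throwing an SQLException whose message carries the ownership/DAG debug dump.
    try (Statement debugStmt = connection.createStatement()) {
        debugStmt.execute("DEBUG:PERSONS,PERSONS2");
    } catch (SQLException debugInfo) {
        System.out.println(debugInfo.getMessage());
    }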
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index 063ea3f..745307c 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -26,7 +26,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; - +import java.util.UUID; +import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; @@ -133,15 +134,27 @@ public interface DBInterface { * @throws SQLException if replay cannot occur correctly * @throws MDBCServiceException */ - void replayTransaction(StagingTable digest, List<Range> ranges) throws SQLException, MDBCServiceException; + void replayTransaction(StagingTable digest, Set<Range> ranges) throws SQLException, MDBCServiceException; void disableForeignKeyChecks() throws SQLException; void enableForeignKeyChecks() throws SQLException; - void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException; + void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException; Connection getSQLConnection(); String getSchema(); + + /** + * Update pointer to where this server has successfully replayed transactions + * @param r + * @param playbackPointer + */ + public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer); + + /** + * Initialize the SQL database by creating any tables necessary + */ + public void initTables(); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java index 8e3f20c..5e8ba87 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java @@ -21,6 +21,7 @@ package org.onap.music.mdbc.mixins; import java.util.List; +import java.util.Set; import java.util.UUID; import org.onap.music.mdbc.Range; @@ -28,12 +29,12 @@ public class LockResult{ private boolean successful; private UUID musicRangeInformationIndex; private String lockId; - private List<Range> ranges; + private Set<Range> ranges; private boolean newLock; /** back off time in milliseconds */ private long backOffPeriodms; - public LockResult(boolean succesful, UUID rowId, String lockId, boolean newLock, List<Range> ranges){ + public LockResult(boolean succesful, UUID rowId, String lockId, boolean newLock, Set<Range> ranges){ this.successful = succesful; this.musicRangeInformationIndex = rowId; this.lockId=lockId; @@ -48,7 +49,7 @@ public class LockResult{ * @param ranges */ @Deprecated - public LockResult(UUID rowId, String lockId, boolean newLock, List<Range> ranges){ + public LockResult(UUID rowId, String lockId, boolean newLock, Set<Range> ranges){ this.successful = true; this.musicRangeInformationIndex = rowId; this.lockId=lockId; @@ -74,7 +75,7 @@ public class LockResult{ return musicRangeInformationIndex; } - public List<Range> getRanges() { + public Set<Range> getRanges() { return ranges; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 4ae4413..637cb15 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ 
b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -22,7 +22,7 @@ package org.onap.music.mdbc.mixins; import com.datastax.driver.core.ResultSet; import java.nio.ByteBuffer; import java.util.*; - +import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicLockingException; @@ -46,24 +46,24 @@ import org.onap.music.mdbc.tables.*; public interface MusicInterface { class OwnershipReturn{ private final UUID ownershipId; - private final String ownerId; + private final String lockId; private final UUID rangeId; - private final List<Range> ranges; + private final Set<Range> ranges; private final Dag dag; - public OwnershipReturn(UUID ownershipId, String ownerId, UUID rangeId, List<Range> ranges, Dag dag){ + public OwnershipReturn(UUID ownershipId, String ownerId, UUID rangeId, Set<Range> ranges, Dag dag){ this.ownershipId=ownershipId; - this.ownerId=ownerId; + this.lockId=ownerId; this.rangeId=rangeId; this.ranges=ranges; this.dag=dag; } public String getOwnerId(){ - return ownerId; + return lockId; } public UUID getRangeId(){ return rangeId; } - public List<Range> getRanges(){ return ranges; } + public Set<Range> getRanges(){ return ranges; } public Dag getDag(){return dag;} public UUID getOwnershipId() { return ownershipId; } } @@ -189,7 +189,7 @@ public interface MusicInterface { * @param progressKeeper data structure that is used to handle to detect failures, and know what to do * @throws MDBCServiceException */ - void commitLog(DatabasePartition partition, List<Range> eventualRanges, StagingTable transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; + void commitLog(DatabasePartition partition, Set<Range> eventualRanges, StagingTable transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; /** @@ -208,7 +208,7 @@ public interface MusicInterface { */ RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException; - List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException; + public Set<Range> getRangeDependencies(Set<Range> range) throws MDBCServiceException; /** * This function is used to create a new locked row in the MRI table @@ -329,16 +329,28 @@ public interface MusicInterface { * * Does not merge rows if a single previous row is sufficient to match new partition needed * - * @param extendedDag - * @param latestRows - * @param ranges - * @param locks + * @param currentlyOwned + * @param locksForOwnership * @param ownershipId * @return * @throws MDBCServiceException */ - OwnershipReturn mergeLatestRowsIfNecessary(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, - Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException; + OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, UUID ownershipId) + throws MDBCServiceException; + + /** + * Create ranges in MRI table, if not already present + * @param range to add into mri table + */ + public void createPartitionIfNeeded(Range rangeToCreate) throws MDBCServiceException; + + /** + * Update pointer to where this server has successfully replayed transactions + * This is an eventual operation for minimal performance hits + * @param r + * @param playbackPointer + */ + public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer); } diff --git 
a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 1bdb022..5581573 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -24,6 +24,7 @@ import java.io.Reader; import java.nio.ByteBuffer; import java.sql.Types; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -124,7 +125,9 @@ public class MusicMixin implements MusicInterface { private String musicEventualTxDigestTableName = "musicevetxdigest"; public static final String musicRangeInformationTableName = "musicrangeinformation"; private String musicRangeDependencyTableName = "musicrangedependency"; - private String musicNodeInfoTableName = "nodeinfo"; + private String musicNodeInfoTableName = "musicnodeinfo"; + /** Table mapping mdbc nodes to their current checkpoint status */ + private String musicMdbcCheckpointsTableName = "musicmdbccheckpoints"; private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class); @@ -339,9 +342,10 @@ public class MusicMixin implements MusicInterface { createMusicNodeInfoTable(); createMusicRangeInformationTable(this.music_ns,this.musicRangeInformationTableName); createMusicRangeDependencyTable(this.music_ns,this.musicRangeDependencyTableName); + createMusicMdbcCheckpointTable(); } catch(MDBCServiceException e){ - logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC"); + logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC: " + e.getErrorMessage()); } } @@ -1194,7 +1198,7 @@ public class MusicMixin implements MusicInterface { return pendingRows; } - private List<Range> lockRow(LockRequest request,Map.Entry<UUID, List<Range>> pending,Map<UUID, String> currentLockRef, + private List<Range> lockRow(LockRequest request,Map.Entry<UUID, Set<Range>> pending,Map<UUID, String> currentLockRef, String fullyQualifiedKey, String lockId, List<Range> pendingToLock, Map<UUID, LockResult> alreadyHeldLocks) throws MDBCServiceException{ @@ -1290,7 +1294,7 @@ public class MusicMixin implements MusicInterface { * This officially commits the transaction globally */ @Override - public void commitLog(DatabasePartition partition,List<Range> eventualRanges, StagingTable transactionDigest, + public void commitLog(DatabasePartition partition,Set<Range> eventualRanges, StagingTable transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException { // first deal with commit for eventually consistent tables @@ -1301,7 +1305,7 @@ public class MusicMixin implements MusicInterface { return; } - List<Range> snapshot = partition.getSnapshot(); + Set<Range> snapshot = partition.getSnapshot(); if(snapshot==null || snapshot.isEmpty()){ logger.warn("Trying to commit log with empty ranges"); return; @@ -1360,7 +1364,7 @@ public class MusicMixin implements MusicInterface { if (progressKeeper != null) { progressKeeper.setRecordId(txId, digestId); } - List<Range> ranges = partition.getSnapshot(); + Set<Range> ranges = partition.getSnapshot(); for(Range r : ranges) { Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); if(!alreadyApplied.containsKey(r)){ @@ -1377,7 +1381,7 @@ public class MusicMixin implements MusicInterface { } } - private void filterAndAddEventualTxDigest(List<Range> eventualRanges, + private 
void filterAndAddEventualTxDigest(Set<Range> eventualRanges, StagingTable transactionDigest, String txId, TxCommitProgress progressKeeper) throws MDBCServiceException { @@ -1481,7 +1485,7 @@ public class MusicMixin implements MusicInterface { final UUID id = t.getUUID(1); digestIds.add(new MusicTxDigestId(partitionIndex,id,index++)); } - List<Range> partitions = new ArrayList<>(); + Set<Range> partitions = new HashSet<>(); Set<String> tables = newRow.getSet("keys",String.class); for (String table:tables){ partitions.add(new Range(table)); @@ -1795,6 +1799,27 @@ public class MusicMixin implements MusicInterface { throw(e); } } + + private void createMusicMdbcCheckpointTable() throws MDBCServiceException { + createMusicMdbcCheckpointTable(this.music_ns, this.musicMdbcCheckpointsTableName); + } + + public static void createMusicMdbcCheckpointTable(String namespace, String checkpointTable) throws MDBCServiceException { + String priKey = "txid"; + StringBuilder fields = new StringBuilder(); + fields.append("txid uuid, "); + fields.append("compressed boolean, "); + fields.append("transactiondigest blob ");//notice lack of ',' + String cql = + String.format("CREATE TABLE IF NOT EXISTS %s.%s (mdbcnode UUID, mridigest UUID, digestindex int, PRIMARY KEY (mdbcnode));", + namespace, checkpointTable); + try { + executeMusicWriteQuery(namespace,checkpointTable,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create redo records table"); + throw(e); + } + } /** * Writes the transaction history to the txDigest @@ -2048,7 +2073,7 @@ public class MusicMixin implements MusicInterface { * @throws MDBCServiceException */ @Override - public List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException{ + public Set<Range> getRangeDependencies(Set<Range> range) throws MDBCServiceException{ Set<Range> extendedRange = new HashSet<>(); for(Range r: range){ extendedRange.add(r); @@ -2057,7 +2082,7 @@ public class MusicMixin implements MusicInterface { extendedRange.addAll(dependencies.dependentRanges()); } } - return new ArrayList<>(extendedRange); + return extendedRange; } @Override @@ -2089,25 +2114,28 @@ public class MusicMixin implements MusicInterface { /** * fixes the DAG in case the previous owner failed while trying to own the row * @param latestDag - * @param rows - * @param ranges * @param locks * @throws MDBCServiceException */ - private void recoverFromFailureAndUpdateDag(Dag latestDag,List<MusicRangeInformationRow> rows,List<Range> ranges, - Map<UUID,LockResult> locks) throws MDBCServiceException{ - Pair<List<Range>,Set<DagNode>> rangesAndDependents = latestDag.getIncompleteRangesAndDependents(); + private void recoverFromFailureAndUpdateDag(Dag latestDag, Map<UUID,LockResult> locks) throws MDBCServiceException { + Pair<Set<Range>, Set<DagNode>> rangesAndDependents = latestDag.getIncompleteRangesAndDependents(); if(rangesAndDependents.getKey()==null || rangesAndDependents.getKey().size()==0 || rangesAndDependents.getValue()==null || rangesAndDependents.getValue().size() == 0){ return; } - MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey(), rows); + + Set<UUID> prevPartitions = new HashSet<>(); + for (DagNode dagnode: rangesAndDependents.getRight()) { + prevPartitions.add(dagnode.getId()); + } + + MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey(), prevPartitions); locks.put(r.getPartitionIndex(),new 
LockResult(r.getPartitionIndex(),r.getDBPartition().getLockId(),true,rangesAndDependents.getKey())); latestDag.addNewNode(r,new ArrayList<>(rangesAndDependents.getValue())); } - private List<MusicRangeInformationRow> setReadOnlyAnyDoubleRow(Dag latestDag,List<MusicRangeInformationRow> rows, Map<UUID,LockResult> locks) + private List<MusicRangeInformationRow> setReadOnlyAnyDoubleRow(Dag latestDag,Map<UUID,LockResult> locks) throws MDBCServiceException{ List<MusicRangeInformationRow> returnInfo = new ArrayList<>(); List<DagNode> toDisable = latestDag.getOldestDoubles(); @@ -2119,15 +2147,6 @@ public class MusicMixin implements MusicInterface { return returnInfo; } - private MusicRangeInformationRow createAndAssignLock(List<Range> ranges, List<MusicRangeInformationRow> latestRows) throws MDBCServiceException { - UUID newUUID = MDBCUtils.generateTimebasedUniqueKey(); - DatabasePartition newPartition = new DatabasePartition(ranges,newUUID,null); - MusicRangeInformationRow newRow = new MusicRangeInformationRow(newPartition,new ArrayList<>(), - true, extractPreviousPartitions(latestRows)); - createLockedMRIRow(newRow); - return newRow; - } - /** * Create a set of previous partitions to their uuids * @param latestRows @@ -2142,37 +2161,45 @@ public class MusicMixin implements MusicInterface { } @Override - public OwnershipReturn mergeLatestRowsIfNecessary(Dag extendedDag, List<MusicRangeInformationRow> latestRows, - List<Range> ranges, Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException { - recoverFromFailureAndUpdateDag(extendedDag,latestRows,ranges,locks); - - if (latestRows.size()==1 && latestRows.get(0).getDBPartition().contains(ranges)) { - //reuse current row if possible - MusicRangeInformationRow row = latestRows.get(0); - LockResult lockresult = locks.get(row.getPartitionIndex()); - if (lockresult!=null) { - return new OwnershipReturn(ownershipId, lockresult.getLockId(), row.getPartitionIndex(), ranges, extendedDag); + public OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, UUID ownershipId) throws MDBCServiceException { + recoverFromFailureAndUpdateDag(currentlyOwned,locksForOwnership); + + if (locksForOwnership.keySet().size()==1) { + //reuse if overlapping single partition, no merge necessary + for (UUID uuid: locksForOwnership.keySet()) { + return new OwnershipReturn(ownershipId, locksForOwnership.get(uuid).getLockId(), uuid, + currentlyOwned.getNode(uuid).getRangeSet(), currentlyOwned); } } - List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks); - releaseLocks(changed, locks); - MusicRangeInformationRow row = createAndAssignLock(ranges, latestRows); - latestRows.add(row); - locks.put(row.getPartitionIndex(),new LockResult(row.getPartitionIndex(),row.getDBPartition().getLockId(),true,ranges)); - extendedDag.addNewNodeWithSearch(row,ranges); - Pair<List<Range>, Set<DagNode>> missing = extendedDag.getIncompleteRangesAndDependents(); - if(missing.getKey().size()!=0 && missing.getValue().size()!=0) { - MusicRangeInformationRow newRow = createAndAssignLock(missing.getKey(), latestRows); - latestRows.add(newRow); - locks.put(newRow.getPartitionIndex(), new LockResult(newRow.getPartitionIndex(), row.getDBPartition().getLockId(), - true, missing.getKey())); - extendedDag.addNewNode(newRow, new ArrayList<>(missing.getValue())); - } - changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks); - releaseLocks(changed,locks); - 
releaseAllLocksExcept(row.getPartitionIndex(),locks); - LockResult ownRow = locks.get(row.getPartitionIndex()); - return new OwnershipReturn(ownershipId, ownRow.getLockId(), ownRow.getIndex(),ranges,extendedDag); + + //merge is necessary + List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(currentlyOwned, locksForOwnership); + releaseLocks(changed, locksForOwnership); + + Set<Range> ranges = extractRangesToOwn(currentlyOwned, locksForOwnership.keySet()); + + MusicRangeInformationRow createdRow = createAndAssignLock(ranges, locksForOwnership.keySet()); + currentlyOwned.addNewNodeWithSearch(createdRow, ranges); + changed = setReadOnlyAnyDoubleRow(currentlyOwned, locksForOwnership); + releaseLocks(locksForOwnership); + return new OwnershipReturn(ownershipId, createdRow.getDBPartition().getLockId(), createdRow.getPartitionIndex(), + createdRow.getDBPartition().getSnapshot(), currentlyOwned); + } + + private MusicRangeInformationRow createAndAssignLock(Set<Range> ranges, Set<UUID> prevPartitions) throws MDBCServiceException { + UUID newUUID = MDBCUtils.generateTimebasedUniqueKey(); + DatabasePartition newPartition = new DatabasePartition(ranges,newUUID,null); + MusicRangeInformationRow row = new MusicRangeInformationRow(newPartition, true, prevPartitions); + createLockedMRIRow(row); + return row; + } + + private Set<Range> extractRangesToOwn(Dag currentlyOwned, Set<UUID> UUIDs) { + HashSet<Range> ranges = new HashSet<>(); + for (UUID uuid: UUIDs) { + ranges.addAll(currentlyOwned.getNode(uuid).getRow().getDBPartition().getSnapshot()); + } + return ranges; } /** @@ -2466,9 +2493,41 @@ public class MusicMixin implements MusicInterface { } } + @Deprecated //used only in testing, should use other method instead public StateManager getStateManager() { return stateManager; } + + @Override + public void createPartitionIfNeeded(Range rangeToCreate) throws MDBCServiceException { + List<MusicRangeInformationRow> allRows = getAllMriRows(); + for (MusicRangeInformationRow row: allRows) { + if (row.getDBPartition().getSnapshot().contains(rangeToCreate)) { + //range already in MRI row, do not re-create + return; + } + } + + MusicRangeInformationRow mriRow = + createAndAssignLock(new HashSet<Range>(Arrays.asList(rangeToCreate)), new HashSet<UUID>()); + //TODO: should make sure we didn't create 2 new rows simultaneously, while we still own the lock + unlockKeyInMusic(musicRangeInformationTableName, mriRow.getPartitionIndex().toString(), + mriRow.getDBPartition().getLockId()); + } + + @Override + public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) { + String cql = String.format("INSERT INTO %s.%s (mdbcnode, mridigest, digestindex) VALUES (" + + this.myId + ", " + playbackPointer.getLeft() + ", " + playbackPointer.getRight() + ");", + music_ns, this.musicMdbcCheckpointsTableName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + try { + MusicCore.nonKeyRelatedPut(pQueryObject,"eventual"); + } catch (MusicServiceException e) { + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location", e); + } + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 1faf281..ec91ceb 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -34,10 +34,10 @@ import java.util.Map; import 
java.util.Properties; import java.util.Set; import java.util.TreeSet; - +import java.util.UUID; +import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; import org.json.JSONTokener; - import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.Configuration; @@ -48,7 +48,6 @@ import org.onap.music.mdbc.query.SQLOperation; import org.onap.music.mdbc.query.SQLOperationType; import org.onap.music.mdbc.tables.Operation; import org.onap.music.mdbc.tables.StagingTable; - import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.parser.CCJSqlParserUtil; import net.sf.jsqlparser.statement.delete.Delete; @@ -58,166 +57,184 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException; /** * This class provides the methods that MDBC needs in order to mirror data to/from a - * <a href="https://dev.mysql.com/">MySQL</a> or <a href="http://mariadb.org/">MariaDB</a> database instance. - * This class uses the <code>JSON_OBJECT()</code> database function, which means it requires the following - * minimum versions of either database: + * <a href="https://dev.mysql.com/">MySQL</a> or <a href="http://mariadb.org/">MariaDB</a> database instance. This class + * uses the <code>JSON_OBJECT()</code> database function, which means it requires the following minimum versions of + * either database: * <table summary=""> - * <tr><th>DATABASE</th><th>VERSION</th></tr> - * <tr><td>MySQL</td><td>5.7.8</td></tr> - * <tr><td>MariaDB</td><td>10.2.3 (Note: 10.2.3 is currently (July 2017) a <i>beta</i> release)</td></tr> + * <tr> + * <th>DATABASE</th> + * <th>VERSION</th> + * </tr> + * <tr> + * <td>MySQL</td> + * <td>5.7.8</td> + * </tr> + * <tr> + * <td>MariaDB</td> + * <td>10.2.3 (Note: 10.2.3 is currently (July 2017) a <i>beta</i> release)</td> + * </tr> * </table> * * @author Robert P. 
Eby */ public class MySQLMixin implements DBInterface { - private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MySQLMixin.class); - - public static final String MIXIN_NAME = "mysql"; - public static final String TRANS_TBL = "MDBC_TRANSLOG"; - private static final String CREATE_TBL_SQL = - "CREATE TABLE IF NOT EXISTS "+TRANS_TBL+ - " (IX INT AUTO_INCREMENT, OP CHAR(1), SCHEMANAME VARCHAR(255), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA BLOB, " + - "CONNECTION_ID INT, PRIMARY KEY (IX));"; - - private final MusicInterface mi; - private final int connId; - private final String dbName; - private final Connection jdbcConn; - private final Map<String, TableInfo> tables; - private PreparedStatement deleteStagingStatement; - private boolean server_tbl_created = false; - private boolean useAsyncStagingUpdate = false; - private Object stagingHandlerLock = new Object(); - private AsyncUpdateHandler stagingHandler = null; - private StagingTable currentStaging=null; - - public MySQLMixin() { - this.mi = null; - this.connId = 0; - this.dbName = null; - this.jdbcConn = null; - this.tables = null; - this.deleteStagingStatement = null; - } - public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException { - this.mi = mi; - this.connId = generateConnID(conn); - this.dbName = getDBName(conn); - this.jdbcConn = conn; - this.tables = new HashMap<String, TableInfo>(); - useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE, - Configuration.ASYNC_STAGING_TABLE_UPDATE)); - this.deleteStagingStatement = getStagingDeletePreparedStatement(); - } - - class StagingTableUpdateRunnable implements Runnable{ - - private MySQLMixin mixin; - private StagingTable staging; - - StagingTableUpdateRunnable(MySQLMixin mixin, StagingTable staging){ - this.mixin=mixin; - this.staging=staging; - } - - @Override - public void run() { - try { - this.mixin.updateStagingTable(staging); - } catch (NoSuchFieldException|MDBCServiceException e) { - this.mixin.logger.error("Error when updating the staging table"); - } - } - } - - private PreparedStatement getStagingDeletePreparedStatement() throws SQLException { - return jdbcConn.prepareStatement("DELETE FROM "+TRANS_TBL+" WHERE (IX BETWEEN ? AND ? ) AND " + - "CONNECTION_ID = ?;"); - } - - // This is used to generate a unique connId for this connection to the DB. - private int generateConnID(Connection conn) { - int rv = (int) System.currentTimeMillis(); // random-ish - try { - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT CONNECTION_ID() AS IX"); - if (rs.next()) { - rv = rs.getInt("IX"); - } - stmt.close(); - } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger,"generateConnID: problem generating a connection ID!"); - } - return rv; - } - - /** - * Get the name of this DBnterface mixin object. 
- * @return the name - */ - @Override - public String getMixinName() { - return MIXIN_NAME; - } - - @Override - public void close() { - // nothing yet - } - - /** - * Determines the db name associated with the connection - * This is the private/internal method that actually determines the name - * @param conn - * @return - */ - private String getDBName(Connection conn) { - String dbname = "mdbc"; //default name - try { - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT DATABASE() AS DB"); - if (rs.next()) { - dbname = rs.getString("DB"); - } - stmt.close(); - } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql"); - } - return dbname; - } - - @Override - public String getDatabaseName() { - return this.dbName; - } - - @Override - public String getSchema() {return this.dbName;} - - - @Override - public Set<String> getSQLTableSet() { - Set<String> set = new TreeSet<String>(); - String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; - try { - Statement stmt = jdbcConn.createStatement(); - ResultSet rs = stmt.executeQuery(sql); - while (rs.next()) { - String s = rs.getString("TABLE_NAME"); - set.add(s); - } - stmt.close(); - } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger,"getSQLTableSet: "+e); - } - logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set); - return set; - } - - @Override + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MySQLMixin.class); + + public static final String MIXIN_NAME = "mysql"; + public static final String TRANS_TBL = "MDBC_TRANSLOG"; + private static final String CREATE_TBL_SQL = "CREATE TABLE IF NOT EXISTS " + TRANS_TBL + + " (IX INT AUTO_INCREMENT, OP CHAR(1), SCHEMANAME VARCHAR(255), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA BLOB, " + + "CONNECTION_ID INT, PRIMARY KEY (IX));"; + private static final String CKPT_TBL = "MDBC_CHECKPOINT"; + private static final String CREATE_CKPT_SQL = + "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTINDEX INT);"; + + private final MusicInterface mi; + private final int connId; + private final String dbName; + private final Connection jdbcConn; + private final Map<String, TableInfo> tables; + private PreparedStatement deleteStagingStatement; + private boolean useAsyncStagingUpdate = false; + private Object stagingHandlerLock = new Object(); + private AsyncUpdateHandler stagingHandler = null; + private StagingTable currentStaging = null; + + public MySQLMixin() { + this.mi = null; + this.connId = 0; + this.dbName = null; + this.jdbcConn = null; + this.tables = null; + this.deleteStagingStatement = null; + } + + public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException { + this.mi = mi; + this.connId = generateConnID(conn); + this.dbName = getDBName(conn); + this.jdbcConn = conn; + this.tables = new HashMap<String, TableInfo>(); + useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE, + Configuration.ASYNC_STAGING_TABLE_UPDATE)); + this.deleteStagingStatement = getStagingDeletePreparedStatement(); + } + + class StagingTableUpdateRunnable implements Runnable { + + private MySQLMixin mixin; + private StagingTable staging; + + StagingTableUpdateRunnable(MySQLMixin mixin, StagingTable staging) { + this.mixin = mixin; + 
this.staging = staging; + } + + @Override + public void run() { + try { + this.mixin.updateStagingTable(staging); + } catch (NoSuchFieldException | MDBCServiceException e) { + this.mixin.logger.error("Error when updating the staging table"); + } + } + } + + private PreparedStatement getStagingDeletePreparedStatement() throws SQLException { + return jdbcConn.prepareStatement( + "DELETE FROM " + TRANS_TBL + " WHERE (IX BETWEEN ? AND ? ) AND " + "CONNECTION_ID = ?;"); + } + + // This is used to generate a unique connId for this connection to the DB. + private int generateConnID(Connection conn) { + int rv = (int) System.currentTimeMillis(); // random-ish + try { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT CONNECTION_ID() AS IX"); + if (rs.next()) { + rv = rs.getInt("IX"); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "generateConnID: problem generating a connection ID!"); + } + return rv; + } + + /** + * Get the name of this DBnterface mixin object. + * + * @return the name + */ + @Override + public String getMixinName() { + return MIXIN_NAME; + } + + @Override + public void close() { + // nothing yet + } + + /** + * Determines the db name associated with the connection This is the private/internal method that actually + * determines the name + * + * @param conn + * @return + */ + private String getDBName(Connection conn) { + String dbname = "mdbc"; // default name + try { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT DATABASE() AS DB"); + if (rs.next()) { + dbname = rs.getString("DB"); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql"); + } + return dbname; + } + + @Override + public String getDatabaseName() { + return this.dbName; + } + + @Override + public String getSchema() { + return this.dbName; + } + + + @Deprecated + @Override + public Set<String> getSQLTableSet() { + Set<String> set = new TreeSet<String>(); + String sql = + "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery(sql); + while (rs.next()) { + String s = rs.getString("TABLE_NAME"); + set.add(s); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e); + } + logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set); + return set; + } + + @Override public Set<Range> getSQLRangeSet() { Set<String> set = new TreeSet<String>(); - String sql = "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + String sql = + "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; try { Statement stmt = jdbcConn.createStatement(); ResultSet rs = stmt.executeQuery(sql); @@ -227,766 +244,727 @@ public class MySQLMixin implements DBInterface { } stmt.close(); } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger,"getSQLTableSet: "+e); + logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e); } - logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set); + 
logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set); Set<Range> rangeSet = new HashSet<>(); - for (String table: set) { + for (String table : set) { rangeSet.add(new Range(table)); } return rangeSet; } - -/* -mysql> describe tables; -+-----------------+---------------------+------+-----+---------+-------+ -| Field | Type | Null | Key | Default | Extra | -+-----------------+---------------------+------+-----+---------+-------+ -| TABLE_CATALOG | varchar(512) | NO | | | | -| TABLE_SCHEMA | varchar(64) | NO | | | | -| TABLE_NAME | varchar(64) | NO | | | | -| TABLE_TYPE | varchar(64) | NO | | | | -| ENGINE | varchar(64) | YES | | NULL | | -| VERSION | bigint(21) unsigned | YES | | NULL | | -| ROW_FORMAT | varchar(10) | YES | | NULL | | -| TABLE_ROWS | bigint(21) unsigned | YES | | NULL | | -| AVG_ROW_LENGTH | bigint(21) unsigned | YES | | NULL | | -| DATA_LENGTH | bigint(21) unsigned | YES | | NULL | | -| MAX_DATA_LENGTH | bigint(21) unsigned | YES | | NULL | | -| INDEX_LENGTH | bigint(21) unsigned | YES | | NULL | | -| DATA_FREE | bigint(21) unsigned | YES | | NULL | | -| AUTO_INCREMENT | bigint(21) unsigned | YES | | NULL | | -| CREATE_TIME | datetime | YES | | NULL | | -| UPDATE_TIME | datetime | YES | | NULL | | -| CHECK_TIME | datetime | YES | | NULL | | -| TABLE_COLLATION | varchar(32) | YES | | NULL | | -| CHECKSUM | bigint(21) unsigned | YES | | NULL | | -| CREATE_OPTIONS | varchar(255) | YES | | NULL | | -| TABLE_COMMENT | varchar(2048) | NO | | | | -+-----------------+---------------------+------+-----+---------+-------+ - */ - /** - * Return a TableInfo object for the specified table. - * This method first looks in a cache of previously constructed TableInfo objects for the table. - * If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the column names, types, and indexes of the table. - * It creates a new TableInfo object with the results. 
- * @param tableName the table to look up - * @return a TableInfo object containing the info we need, or null if the table does not exist - */ - @Override - public TableInfo getTableInfo(String tableName) { - TableInfo ti = tables.get(tableName); - if (ti == null) { - try { - final String[] split = tableName.split("\\."); - String tbl = (split.length==2)?split[1]:tableName; - String localSchema = (split.length==2)?split[0]:getSchema(); - StringBuilder sql=new StringBuilder(); - sql.append("SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA="); - if(localSchema==null) { - sql.append("DATABASE() AND TABLE_NAME='"); - } - else { - sql.append("'").append(localSchema).append("' AND TABLE_NAME='"); - } - sql.append(tbl).append("';"); - ResultSet rs = executeSQLRead(sql.toString()); - if (rs != null) { - ti = new TableInfo(); - while (rs.next()) { - String name = rs.getString("COLUMN_NAME"); - String type = rs.getString("DATA_TYPE"); - String ckey = rs.getString("COLUMN_KEY"); - ti.columns.add(name); - ti.coltype.add(mapDatatypeNameToType(type)); - ti.iskey.add(ckey != null && !ckey.equals("")); - } - rs.getStatement().close(); - } else { - logger.error(EELFLoggerDelegate.errorLogger,"Cannot retrieve table info for table "+tableName+" from MySQL."); - } - } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger,"Cannot retrieve table info for table "+tableName+" from MySQL: "+e); - return null; - } - tables.put(tableName, ti); - } - return ti; - } - // Map MySQL data type names to the java.sql.Types equivalent - private int mapDatatypeNameToType(String nm) { - switch (nm) { - case "tinyint": return Types.TINYINT; - case "smallint": return Types.SMALLINT; - case "mediumint": - case "int": return Types.INTEGER; - case "bigint": return Types.BIGINT; - case "decimal": - case "numeric": return Types.DECIMAL; - case "float": return Types.FLOAT; - case "double": return Types.DOUBLE; - case "date": - case "datetime": return Types.DATE; - case "time": return Types.TIME; - case "timestamp": return Types.TIMESTAMP; - case "char": return Types.CHAR; - case "text": - case "varchar": return Types.VARCHAR; - case "mediumblob": - case "longblob": - case "blob": return Types.BLOB; - default: - logger.error(EELFLoggerDelegate.errorLogger,"unrecognized and/or unsupported data type "+nm); - return Types.VARCHAR; - } - } - @Override - public void createSQLTriggers(String table) { - final String[] split = table.split("\\."); - String schemaName = (split.length==2)?split[0]:getSchema(); - String tableName = (split.length==2)?split[1]:table; - - if (tableName.equals(TRANS_TBL)) - // Don't create triggers for the table the triggers write into!!! 
- return; - try { - if (!server_tbl_created) { - try { - Statement stmt = jdbcConn.createStatement(); - stmt.execute(CREATE_TBL_SQL); - stmt.close(); - - logger.info(EELFLoggerDelegate.applicationLogger,"createSQLTriggers: Server side dirty table created."); - server_tbl_created = true; - } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger,"createSQLTriggers: problem creating the "+TRANS_TBL+" table!"); - } - } - - // Give the triggers a way to find this MSM - for (String name : getTriggerNames(tableName)) { - logger.info(EELFLoggerDelegate.applicationLogger,"ADD trigger "+name+" to msm_map"); - //\TODO fix this is an error - //msm.register(name); - } - // No SELECT trigger - executeSQLWrite(generateTrigger(schemaName,tableName, "INSERT")); - executeSQLWrite(generateTrigger(schemaName,tableName, "UPDATE")); - //\TODO: save key row instead of the whole row for delete - executeSQLWrite(generateTrigger(schemaName,tableName, "DELETE")); - } catch (SQLException e) { - if (e.getMessage().equals("Trigger already exists") || e.getMessage().endsWith("already exists")){ - //only warn if trigger already exists - logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e); - } else { - logger.error(EELFLoggerDelegate.errorLogger,"createSQLTriggers: "+e); - } - } - } -/* -CREATE TRIGGER `triggername` BEFORE UPDATE ON `table` -FOR EACH ROW BEGIN -INSERT INTO `log_table` ( `field1` `field2`, ...) VALUES ( NEW.`field1`, NEW.`field2`, ...) ; -END; - -OLD.field refers to the old value -NEW.field refers to the new value -*/ - private String generateTrigger(String schema, String tableName, String op) { - boolean isdelete = op.equals("DELETE"); - boolean isinsert = op.equals("INSERT"); - boolean isupdate = op.equals("UPDATE"); - TableInfo ti = getTableInfo(tableName); - StringBuilder newJson = new StringBuilder("JSON_OBJECT("); // JSON_OBJECT(key, val, key, val) page 1766 - StringBuilder keyJson = new StringBuilder("JSON_OBJECT("); // JSON_OBJECT(key, val, key, val) page 1766 - String pfx = ""; - String kfx = ""; - for (String col : ti.columns) { - newJson.append(pfx) - .append("'").append(col).append("', ") - .append(isdelete ? "OLD." : "NEW.") - .append(col); - if (ti.iskey(col) || !ti.hasKey()) { - keyJson.append(kfx) - .append("'").append(col).append("', ") - .append(isinsert ? "NEW." : "OLD.") - .append(col); - kfx = ", "; - } - pfx = ", "; - } - newJson.append(")"); - keyJson.append(")"); - //\TODO check if using mysql driver, so instead check the exception - //\TODO add conditional for update, if primary key is still the same, use null in the KEYDATA col - StringBuilder sb = new StringBuilder() - .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL! - .append(String.format("%s_%s", op.substring(0, 1), tableName)) - .append(" AFTER ") - .append(op) - .append(" ON ") - .append(tableName) - .append(" FOR EACH ROW INSERT INTO ") - .append(TRANS_TBL) - .append(" (SCHEMANAME, TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('") - .append( (schema==null)?this.getSchema():schema ) - .append("', '") - .append(tableName) - .append("', ") - .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'")) - .append(", ") - .append( (keyJson.length()>"JSON_OBJECT()".length()) ? 
keyJson.toString() : "NULL") - .append(", ") - .append(newJson.toString()) - .append(", ") - .append("CONNECTION_ID()") - .append(")"); - return sb.toString(); - } - private String[] getTriggerNames(String tableName) { - return new String[] { - "I_" + tableName, // INSERT trigger - "U_" + tableName, // UPDATE trigger - "D_" + tableName // DELETE trigger - }; - } - - @Override - public void dropSQLTriggers(String tableName) { - try { - for (String name : getTriggerNames(tableName)) { - logger.info(EELFLoggerDelegate.applicationLogger,"REMOVE trigger "+name+" from msmmap"); - executeSQLWrite("DROP TRIGGER IF EXISTS " +name); - //\TODO Fix this is an error - //msm.unregister(name); - } - } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger,"dropSQLTriggers: "+e); - } - } - - @Override - public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) { - TableInfo ti = getTableInfo(tableName); - String sql = ""; - if (rowExists(tableName, ti, map)) { - // Update - Construct the what and where strings for the DB write - StringBuilder what = new StringBuilder(); - StringBuilder where = new StringBuilder(); - String pfx = ""; - String pfx2 = ""; - for (int i = 0; i < ti.columns.size(); i++) { - String col = ti.columns.get(i); - String val = Utils.getStringValue(map.get(col)); - if (ti.iskey.get(i)) { - where.append(pfx).append(col).append("=").append(val); - pfx = " AND "; - } else { - what.append(pfx2).append(col).append("=").append(val); - pfx2 = ", "; - } - } - sql = String.format("UPDATE %s SET %s WHERE %s", tableName, what.toString(), where.toString()); - } else { - // Construct the value string and column name string for the DB write - StringBuilder fields = new StringBuilder(); - StringBuilder values = new StringBuilder(); - String pfx = ""; - for (String col : ti.columns) { - fields.append(pfx).append(col); - values.append(pfx).append(Utils.getStringValue(map.get(col))); - pfx = ", "; - } - sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); - } - try { - executeSQLWrite(sql); - } catch (SQLException e1) { - logger.error(EELFLoggerDelegate.errorLogger,"executeSQLWrite: "+e1); - } - // TODO - remove any entries from MDBC_TRANSLOG corresponding to this update - // SELECT IX, OP, KEYDATA FROM MDBC_TRANS_TBL WHERE CONNID = "+connId AND TABLENAME = tblname - } - - private boolean rowExists(String tableName, TableInfo ti, Map<String, Object> map) { - StringBuilder where = new StringBuilder(); - String pfx = ""; - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - String col = ti.columns.get(i); - String val = Utils.getStringValue(map.get(col)); - where.append(pfx).append(col).append("=").append(val); - pfx = " AND "; - } - } - String sql = String.format("SELECT * FROM %s WHERE %s", tableName, where.toString()); - ResultSet rs = executeSQLRead(sql); - try { - boolean rv = rs.next(); - rs.close(); - return rv; - } catch (SQLException e) { - return false; - } - } - - - @Override - public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) { - TableInfo ti = getTableInfo(tableName); - StringBuilder where = new StringBuilder(); - String pfx = ""; - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - String col = ti.columns.get(i); - Object val = map.get(col); - where.append(pfx).append(col).append("=").append(Utils.getStringValue(val)); - pfx = " AND "; - } - } - try { - String sql = String.format("DELETE FROM %s WHERE %s", tableName, where.toString()); - 
executeSQLWrite(sql); - } catch (SQLException e) { - e.printStackTrace(); - } - } - - /** - * This method executes a read query in the SQL database. Methods that call this method should be sure - * to call resultset.getStatement().close() when done in order to free up resources. - * @param sql the query to run - * @return a ResultSet containing the rows returned from the query - */ - @Override - public ResultSet executeSQLRead(String sql) { - logger.debug(EELFLoggerDelegate.applicationLogger,"executeSQLRead"); - logger.debug("Executing SQL read:"+ sql); - ResultSet rs = null; - try { - Statement stmt = jdbcConn.createStatement(); - rs = stmt.executeQuery(sql); - } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger,"executeSQLRead"+e); - } - return rs; - } - - @Override - public void preCommitHook() { - synchronized (stagingHandlerLock){ - //\TODO check if this can potentially block forever in certain scenarios - if(stagingHandler!=null){ - stagingHandler.waitForAllPendingUpdates(); - } - } - } - - /** - * This method executes a write query in the sql database. - * @param sql the SQL to be sent to MySQL - * @throws SQLException if an underlying JDBC method throws an exception - */ - protected void executeSQLWrite(String sql) throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:"+ sql); - - Statement stmt = jdbcConn.createStatement(); - stmt.execute(sql); - stmt.close(); - } - - /** - * Code to be run within the DB driver before a SQL statement is executed. This is where tables - * can be synchronized before a SELECT, for those databases that do not support SELECT triggers. - * @param sql the SQL statement that is about to be executed - * @return list of keys that will be updated, if they can't be determined afterwards (i.e. sql table doesn't have primary key) - */ - @Override - public void preStatementHook(final String sql) { - if (sql == null) { - return; - } - String cmd = sql.trim().toLowerCase(); - if (cmd.startsWith("select")) { - String[] parts = sql.trim().split(" "); - Set<String> set = getSQLTableSet(); - for (String part : parts) { - if (set.contains(part.toUpperCase())) { - // Found a candidate table name in the SELECT SQL -- update this table - //msm.readDirtyRowsAndUpdateDb(part); - } - } - } - } - - /** - * Code to be run within the DB driver after a SQL statement has been executed. This is where remote - * statement actions can be copied back to Cassandra/MUSIC. 
- * @param sql the SQL statement that was executed - */ - @Override - public void postStatementHook(final String sql,StagingTable transactionDigest) { - if (sql != null) { - String[] parts = sql.trim().split(" "); - String cmd = parts[0].toLowerCase(); - if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) { - if (useAsyncStagingUpdate) { - synchronized (stagingHandlerLock){ - if(stagingHandler==null||currentStaging!=transactionDigest){ - Runnable newRunnable = new StagingTableUpdateRunnable(this, transactionDigest); - currentStaging=transactionDigest; - stagingHandler=new AsyncUpdateHandler(newRunnable); - } - //else we can keep using the current staging Handler - } - stagingHandler.processNewUpdate(); - } else { - - try { - this.updateStagingTable(transactionDigest); - } catch (NoSuchFieldException | MDBCServiceException e) { - // TODO Auto-generated catch block - this.logger.error("Error updating the staging table"); - } - } - } - } - } - - private SQLOperation toOpEnum(String operation) throws NoSuchFieldException { - switch (operation.toLowerCase()) { - case "i": - return SQLOperation.INSERT; - case "d": - return SQLOperation.DELETE; - case "u": - return SQLOperation.UPDATE; - case "s": - return SQLOperation.SELECT; - default: - logger.error(EELFLoggerDelegate.errorLogger,"Invalid operation selected: ["+operation+"]"); - throw new NoSuchFieldException("Invalid operation enum"); - } - - } - /** - * Copy data that is in transaction table into music interface - * @param transactionDigests - * @throws NoSuchFieldException - */ - private void updateStagingTable(StagingTable transactionDigests) - throws NoSuchFieldException, MDBCServiceException { - // copy from DB.MDBC_TRANSLOG where connid == myconnid - // then delete from MDBC_TRANSLOG - String sql2 = "SELECT IX, SCHEMANAME, TABLENAME, OP, ROWDATA, KEYDATA FROM " + TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId; - Integer biggestIx = Integer.MIN_VALUE; - Integer smallestIx = Integer.MAX_VALUE; - try { - ResultSet rs = executeSQLRead(sql2); - Set<Integer> rows = new TreeSet<Integer>(); - while (rs.next()) { - int ix = rs.getInt("IX"); - biggestIx = Integer.max(biggestIx,ix); - smallestIx = Integer.min(smallestIx,ix); - String op = rs.getString("OP"); - SQLOperation opType = toOpEnum(op); - String schema= rs.getString("SCHEMANAME"); - String tbl = rs.getString("TABLENAME"); - String newRowStr = rs.getString("ROWDATA"); - String rowStr = rs.getString("KEYDATA"); - Range range = new Range(schema+"."+tbl); - transactionDigests.addOperation(range,opType,newRowStr,rowStr); - rows.add(ix); - } - rs.getStatement().close(); - // batch delete operations - if (rows.size() > 0) { - this.deleteStagingStatement.setInt(1,smallestIx); - this.deleteStagingStatement.setInt(2,biggestIx); - this.deleteStagingStatement.setInt(3,this.connId); - logger.debug("Staging delete: Executing with vals ["+smallestIx+","+biggestIx+","+this.connId+"]"); - this.deleteStagingStatement.execute(); - } - } catch (SQLException e) { - logger.warn("Exception in postStatementHook: "+e); - e.printStackTrace(); - } - } - - - - /** - * Update music with data from MySQL table - * - * @param tableName - name of table to update in music - */ - @Override - public void synchronizeData(String tableName) { - ResultSet rs = null; - TableInfo ti = getTableInfo(tableName); - String query = "SELECT * FROM "+tableName; - - try { - rs = executeSQLRead(query); - if(rs==null) return; - while(rs.next()) { - - JSONObject jo = new JSONObject(); - if 
(!getTableInfo(tableName).hasKey()) { - String musicKey = MDBCUtils.generateUniqueKey().toString(); - jo.put(mi.getMusicDefaultPrimaryKeyName(), musicKey); - } - - for (String col : ti.columns) { - jo.put(col, rs.getString(col)); - } - - @SuppressWarnings("unused") - Object[] row = Utils.jsonToRow(ti,tableName, jo, mi.getMusicDefaultPrimaryKeyName()); - //\FIXME this is wrong now, update of the dirty row and entity is now handled by the archival process - //msm.updateDirtyRowAndEntityTableInMusic(ti,tableName, jo); - } - } catch (Exception e) { - logger.error(EELFLoggerDelegate.errorLogger, "synchronizing data " + tableName + - " -> " + e.getMessage()); - } - finally { - try { - if(rs!=null) { - rs.close(); - } - } catch (SQLException e) { - //continue - } - } - - } - - /** - * Return a list of "reserved" names, that should not be used by MySQL client/MUSIC - * These are reserved for mdbc - */ - @Override - public List<String> getReservedTblNames() { - ArrayList<String> rsvdTables = new ArrayList<String>(); - rsvdTables.add(TRANS_TBL); - //Add others here as necessary - return rsvdTables; - } - @Override - public String getPrimaryKey(String sql, String tableName) { - // - return null; - } - - - public String applyDigest(Map<Range, StagingTable> digest){ - throw new NotImplementedException(); - } - - @SuppressWarnings("unused") - @Deprecated - private ArrayList<String> getMusicKey(String sql) { - try { - net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); - if (stmt instanceof Insert) { - Insert s = (Insert) stmt; - String tbl = s.getTable().getName(); - return getMusicKey(tbl, "INSERT", sql); - } else if (stmt instanceof Update){ - Update u = (Update) stmt; - String tbl = u.getTables().get(0).getName(); - return getMusicKey(tbl, "UPDATE", sql); - } else if (stmt instanceof Delete) { - Delete d = (Delete) stmt; - //TODO: IMPLEMENT - String tbl = d.getTable().getName(); - return getMusicKey(tbl, "DELETE", sql); - } else { - System.err.println("Not recognized sql type"); - } - - } catch (JSQLParserException e) { - - e.printStackTrace(); - } - //Something went wrong here - return new ArrayList<String>(); - } - - /** - * Returns all keys that matches the current sql statement, and not in already updated keys. 
- * - * @param tbl - * @param cmd - * @param sql - */ - @Deprecated - private ArrayList<String> getMusicKey(String tbl, String cmd, String sql) { - ArrayList<String> musicKeys = new ArrayList<String>(); - /* - if (cmd.equalsIgnoreCase("insert")) { - //create key, return key - musicKeys.add(msm.generatePrimaryKey()); - } else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) { - try { - net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); - String where; - if (stmt instanceof Update) { - where = ((Update) stmt).getWhere().toString(); - } else if (stmt instanceof Delete) { - where = ((Delete) stmt).getWhere().toString(); - } else { - System.err.println("Unknown type: " +stmt.getClass()); - where = ""; - } - ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where); - musicKeys = msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs)); - } catch (JSQLParserException e) { - - e.printStackTrace(); - } catch (SQLException e) { - //Not a valid sql query - e.printStackTrace(); - } - } - */ - return musicKeys; - } - - - @Deprecated - public void insertRowIntoSqlDbOLD(String tableName, Map<String, Object> map) { - // First construct the value string and column name string for the db write - TableInfo ti = getTableInfo(tableName); - StringBuilder fields = new StringBuilder(); - StringBuilder values = new StringBuilder(); - String pfx = ""; - for (String col : ti.columns) { - fields.append(pfx).append(col); - values.append(pfx).append(Utils.getStringValue(map.get(col))); - pfx = ", "; - } - - try { - String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); - executeSQLWrite(sql); - } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger,"Insert failed because row exists, do an update"); - StringBuilder where = new StringBuilder(); - pfx = ""; - String pfx2 = ""; - fields.setLength(0); - for (int i = 0; i < ti.columns.size(); i++) { - String col = ti.columns.get(i); - String val = Utils.getStringValue(map.get(col)); - if (ti.iskey.get(i)) { - where.append(pfx).append(col).append("=").append(val); - pfx = " AND "; - } else { - fields.append(pfx2).append(col).append("=").append(val); - pfx2 = ", "; - } - } - String sql = String.format("UPDATE %s SET %s WHERE %s", tableName, fields.toString(), where.toString()); - try { - executeSQLWrite(sql); - } catch (SQLException e1) { - logger.error(EELFLoggerDelegate.errorLogger,"executeSQLWrite"+e1); - } - } - } - - /** - * Parse the transaction digest into individual events - * @param transaction - base 64 encoded, serialized digest - * @throws MDBCServiceException - */ - public void replayTransaction(StagingTable transaction, List<Range> ranges) throws SQLException, MDBCServiceException { - boolean autocommit = jdbcConn.getAutoCommit(); - jdbcConn.setAutoCommit(false); - Statement jdbcStmt = jdbcConn.createStatement(); - ArrayList<Operation> opList = transaction.getOperationList(); - - for (Operation op: opList) { - if(Range.overlaps(ranges,op.getTable())) { - try { - replayOperationIntoDB(jdbcStmt, op); - } catch (SQLException | MDBCServiceException e) { - //rollback transaction - logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "." 
- + "Rolling back the entire digest replay."); - jdbcConn.rollback(); - throw e; - } - } - } - - clearReplayedOperations(jdbcStmt); - jdbcConn.commit(); - jdbcStmt.close(); - - jdbcConn.setAutoCommit(autocommit); + + /* + * mysql> describe tables; +-----------------+---------------------+------+-----+---------+-------+ | Field | Type | + * Null | Key | Default | Extra | +-----------------+---------------------+------+-----+---------+-------+ | + * TABLE_CATALOG | varchar(512) | NO | | | | | TABLE_SCHEMA | varchar(64) | NO | | | | | TABLE_NAME | varchar(64) | + * NO | | | | | TABLE_TYPE | varchar(64) | NO | | | | | ENGINE | varchar(64) | YES | | NULL | | | VERSION | + * bigint(21) unsigned | YES | | NULL | | | ROW_FORMAT | varchar(10) | YES | | NULL | | | TABLE_ROWS | bigint(21) + * unsigned | YES | | NULL | | | AVG_ROW_LENGTH | bigint(21) unsigned | YES | | NULL | | | DATA_LENGTH | bigint(21) + * unsigned | YES | | NULL | | | MAX_DATA_LENGTH | bigint(21) unsigned | YES | | NULL | | | INDEX_LENGTH | + * bigint(21) unsigned | YES | | NULL | | | DATA_FREE | bigint(21) unsigned | YES | | NULL | | | AUTO_INCREMENT | + * bigint(21) unsigned | YES | | NULL | | | CREATE_TIME | datetime | YES | | NULL | | | UPDATE_TIME | datetime | YES + * | | NULL | | | CHECK_TIME | datetime | YES | | NULL | | | TABLE_COLLATION | varchar(32) | YES | | NULL | | | + * CHECKSUM | bigint(21) unsigned | YES | | NULL | | | CREATE_OPTIONS | varchar(255) | YES | | NULL | | | + * TABLE_COMMENT | varchar(2048) | NO | | | | + * +-----------------+---------------------+------+-----+---------+-------+ + */ + /** + * Return a TableInfo object for the specified table. This method first looks in a cache of previously constructed + * TableInfo objects for the table. If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the + * column names, types, and indexes of the table. It creates a new TableInfo object with the results. + * + * @param tableName the table to look up + * @return a TableInfo object containing the info we need, or null if the table does not exist + */ + @Override + public TableInfo getTableInfo(String tableName) { + TableInfo ti = tables.get(tableName); + if (ti == null) { + try { + final String[] split = tableName.split("\\."); + String tbl = (split.length == 2) ? split[1] : tableName; + String localSchema = (split.length == 2) ? 
split[0] : getSchema(); + StringBuilder sql = new StringBuilder(); + sql.append( + "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA="); + if (localSchema == null) { + sql.append("DATABASE() AND TABLE_NAME='"); + } else { + sql.append("'").append(localSchema).append("' AND TABLE_NAME='"); + } + sql.append(tbl).append("';"); + ResultSet rs = executeSQLRead(sql.toString()); + if (rs != null) { + ti = new TableInfo(); + while (rs.next()) { + String name = rs.getString("COLUMN_NAME"); + String type = rs.getString("DATA_TYPE"); + String ckey = rs.getString("COLUMN_KEY"); + ti.columns.add(name); + ti.coltype.add(mapDatatypeNameToType(type)); + ti.iskey.add(ckey != null && !ckey.equals("")); + } + rs.getStatement().close(); + } else { + logger.error(EELFLoggerDelegate.errorLogger, + "Cannot retrieve table info for table " + tableName + " from MySQL."); + } + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, + "Cannot retrieve table info for table " + tableName + " from MySQL: " + e); + return null; + } + tables.put(tableName, ti); + } + return ti; } - - @Override - public void disableForeignKeyChecks() throws SQLException { - Statement disable = jdbcConn.createStatement(); - disable.execute("SET FOREIGN_KEY_CHECKS=0"); - disable.closeOnCompletion(); - } - - @Override - public void enableForeignKeyChecks() throws SQLException { + + // Map MySQL data type names to the java.sql.Types equivalent + private int mapDatatypeNameToType(String nm) { + switch (nm) { + case "tinyint": + return Types.TINYINT; + case "smallint": + return Types.SMALLINT; + case "mediumint": + case "int": + return Types.INTEGER; + case "bigint": + return Types.BIGINT; + case "decimal": + case "numeric": + return Types.DECIMAL; + case "float": + return Types.FLOAT; + case "double": + return Types.DOUBLE; + case "date": + case "datetime": + return Types.DATE; + case "time": + return Types.TIME; + case "timestamp": + return Types.TIMESTAMP; + case "char": + return Types.CHAR; + case "text": + case "varchar": + return Types.VARCHAR; + case "mediumblob": + case "longblob": + case "blob": + return Types.BLOB; + default: + logger.error(EELFLoggerDelegate.errorLogger, "unrecognized and/or unsupported data type " + nm); + return Types.VARCHAR; + } + } + + @Override + public void createSQLTriggers(String table) { + final String[] split = table.split("\\."); + String schemaName = (split.length == 2) ? split[0] : getSchema(); + String tableName = (split.length == 2) ? split[1] : table; + + if (getReservedTblNames().contains(tableName)) { + // Don't create triggers for the table the triggers write into!!! 
+ return; + } + try { + // Give the triggers a way to find this MSM + for (String name : getTriggerNames(tableName)) { + logger.info(EELFLoggerDelegate.applicationLogger, "ADD trigger " + name + " to msm_map"); + // \TODO fix this is an error + // msm.register(name); + } + // No SELECT trigger + executeSQLWrite(generateTrigger(schemaName, tableName, "INSERT")); + executeSQLWrite(generateTrigger(schemaName, tableName, "UPDATE")); + // \TODO: save key row instead of the whole row for delete + executeSQLWrite(generateTrigger(schemaName, tableName, "DELETE")); + } catch (SQLException e) { + if (e.getMessage().equals("Trigger already exists") || e.getMessage().endsWith("already exists")) { + // only warn if trigger already exists + logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e); + } else { + logger.error(EELFLoggerDelegate.errorLogger, "createSQLTriggers: " + e); + } + } + } + + /* + * CREATE TRIGGER `triggername` BEFORE UPDATE ON `table` FOR EACH ROW BEGIN INSERT INTO `log_table` ( `field1` + * `field2`, ...) VALUES ( NEW.`field1`, NEW.`field2`, ...) ; END; + * + * OLD.field refers to the old value NEW.field refers to the new value + */ + private String generateTrigger(String schema, String tableName, String op) { + boolean isdelete = op.equals("DELETE"); + boolean isinsert = op.equals("INSERT"); + boolean isupdate = op.equals("UPDATE"); + TableInfo ti = getTableInfo(tableName); + StringBuilder newJson = new StringBuilder("JSON_OBJECT("); // JSON_OBJECT(key, val, key, val) page 1766 + StringBuilder keyJson = new StringBuilder("JSON_OBJECT("); // JSON_OBJECT(key, val, key, val) page 1766 + String pfx = ""; + String kfx = ""; + for (String col : ti.columns) { + newJson.append(pfx).append("'").append(col).append("', ").append(isdelete ? "OLD." : "NEW.").append(col); + if (ti.iskey(col) || !ti.hasKey()) { + keyJson.append(kfx).append("'").append(col).append("', ").append(isinsert ? "NEW." : "OLD.") + .append(col); + kfx = ", "; + } + pfx = ", "; + } + newJson.append(")"); + keyJson.append(")"); + // \TODO check if using mysql driver, so instead check the exception + // \TODO add conditional for update, if primary key is still the same, use null in the KEYDATA col + StringBuilder sb = new StringBuilder().append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL! + .append(String.format("%s_%s", op.substring(0, 1), tableName)).append(" AFTER ").append(op) + .append(" ON ").append(tableName).append(" FOR EACH ROW INSERT INTO ").append(TRANS_TBL) + .append(" (SCHEMANAME, TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('") + .append((schema == null) ? this.getSchema() : schema).append("', '").append(tableName).append("', ") + .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'")).append(", ") + .append((keyJson.length() > "JSON_OBJECT()".length()) ? 
keyJson.toString() : "NULL").append(", ") + .append(newJson.toString()).append(", ").append("CONNECTION_ID()").append(")"); + return sb.toString(); + } + + private String[] getTriggerNames(String tableName) { + return new String[] {"I_" + tableName, // INSERT trigger + "U_" + tableName, // UPDATE trigger + "D_" + tableName // DELETE trigger + }; + } + + @Override + public void dropSQLTriggers(String tableName) { + try { + for (String name : getTriggerNames(tableName)) { + logger.info(EELFLoggerDelegate.applicationLogger, "REMOVE trigger " + name + " from msmmap"); + executeSQLWrite("DROP TRIGGER IF EXISTS " + name); + // \TODO Fix this is an error + // msm.unregister(name); + } + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "dropSQLTriggers: " + e); + } + } + + @Override + public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) { + TableInfo ti = getTableInfo(tableName); + String sql = ""; + if (rowExists(tableName, ti, map)) { + // Update - Construct the what and where strings for the DB write + StringBuilder what = new StringBuilder(); + StringBuilder where = new StringBuilder(); + String pfx = ""; + String pfx2 = ""; + for (int i = 0; i < ti.columns.size(); i++) { + String col = ti.columns.get(i); + String val = Utils.getStringValue(map.get(col)); + if (ti.iskey.get(i)) { + where.append(pfx).append(col).append("=").append(val); + pfx = " AND "; + } else { + what.append(pfx2).append(col).append("=").append(val); + pfx2 = ", "; + } + } + sql = String.format("UPDATE %s SET %s WHERE %s", tableName, what.toString(), where.toString()); + } else { + // Construct the value string and column name string for the DB write + StringBuilder fields = new StringBuilder(); + StringBuilder values = new StringBuilder(); + String pfx = ""; + for (String col : ti.columns) { + fields.append(pfx).append(col); + values.append(pfx).append(Utils.getStringValue(map.get(col))); + pfx = ", "; + } + sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); + } + try { + executeSQLWrite(sql); + } catch (SQLException e1) { + logger.error(EELFLoggerDelegate.errorLogger, "executeSQLWrite: " + e1); + } + // TODO - remove any entries from MDBC_TRANSLOG corresponding to this update + // SELECT IX, OP, KEYDATA FROM MDBC_TRANS_TBL WHERE CONNID = "+connId AND TABLENAME = tblname + } + + private boolean rowExists(String tableName, TableInfo ti, Map<String, Object> map) { + StringBuilder where = new StringBuilder(); + String pfx = ""; + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + String col = ti.columns.get(i); + String val = Utils.getStringValue(map.get(col)); + where.append(pfx).append(col).append("=").append(val); + pfx = " AND "; + } + } + String sql = String.format("SELECT * FROM %s WHERE %s", tableName, where.toString()); + ResultSet rs = executeSQLRead(sql); + try { + boolean rv = rs.next(); + rs.close(); + return rv; + } catch (SQLException e) { + return false; + } + } + + + @Override + public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) { + TableInfo ti = getTableInfo(tableName); + StringBuilder where = new StringBuilder(); + String pfx = ""; + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + String col = ti.columns.get(i); + Object val = map.get(col); + where.append(pfx).append(col).append("=").append(Utils.getStringValue(val)); + pfx = " AND "; + } + } + try { + String sql = String.format("DELETE FROM %s WHERE %s", tableName, where.toString()); + 
executeSQLWrite(sql); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + /** + * This method executes a read query in the SQL database. Methods that call this method should be sure to call + * resultset.getStatement().close() when done in order to free up resources. + * + * @param sql the query to run + * @return a ResultSet containing the rows returned from the query + */ + @Override + public ResultSet executeSQLRead(String sql) { + logger.debug(EELFLoggerDelegate.applicationLogger, "executeSQLRead"); + logger.debug("Executing SQL read:" + sql); + ResultSet rs = null; + try { + Statement stmt = jdbcConn.createStatement(); + rs = stmt.executeQuery(sql); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "executeSQLRead" + e); + } + return rs; + } + + @Override + public void preCommitHook() { + synchronized (stagingHandlerLock) { + // \TODO check if this can potentially block forever in certain scenarios + if (stagingHandler != null) { + stagingHandler.waitForAllPendingUpdates(); + } + } + } + + /** + * This method executes a write query in the sql database. + * + * @param sql the SQL to be sent to MySQL + * @throws SQLException if an underlying JDBC method throws an exception + */ + protected void executeSQLWrite(String sql) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:" + sql); + + Statement stmt = jdbcConn.createStatement(); + stmt.execute(sql); + stmt.close(); + } + + /** + * Code to be run within the DB driver before a SQL statement is executed. This is where tables can be synchronized + * before a SELECT, for those databases that do not support SELECT triggers. + * + * @param sql the SQL statement that is about to be executed + * @return list of keys that will be updated, if they can't be determined afterwards (i.e. sql table doesn't have + * primary key) + */ + @Override + public void preStatementHook(final String sql) { + if (sql == null) { + return; + } + String cmd = sql.trim().toLowerCase(); + if (cmd.startsWith("select")) { + String[] parts = sql.trim().split(" "); + } + } + + /** + * Code to be run within the DB driver after a SQL statement has been executed. This is where remote statement + * actions can be copied back to Cassandra/MUSIC. 
+ * + * @param sql the SQL statement that was executed + */ + @Override + public void postStatementHook(final String sql, StagingTable transactionDigest) { + if (sql != null) { + String[] parts = sql.trim().split(" "); + String cmd = parts[0].toLowerCase(); + if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) { + if (useAsyncStagingUpdate) { + synchronized (stagingHandlerLock) { + if (stagingHandler == null || currentStaging != transactionDigest) { + Runnable newRunnable = new StagingTableUpdateRunnable(this, transactionDigest); + currentStaging = transactionDigest; + stagingHandler = new AsyncUpdateHandler(newRunnable); + } + // else we can keep using the current staging Handler + } + stagingHandler.processNewUpdate(); + } else { + + try { + this.updateStagingTable(transactionDigest); + } catch (NoSuchFieldException | MDBCServiceException e) { + // TODO Auto-generated catch block + this.logger.error("Error updating the staging table"); + } + } + } + } + } + + private SQLOperation toOpEnum(String operation) throws NoSuchFieldException { + switch (operation.toLowerCase()) { + case "i": + return SQLOperation.INSERT; + case "d": + return SQLOperation.DELETE; + case "u": + return SQLOperation.UPDATE; + case "s": + return SQLOperation.SELECT; + default: + logger.error(EELFLoggerDelegate.errorLogger, "Invalid operation selected: [" + operation + "]"); + throw new NoSuchFieldException("Invalid operation enum"); + } + + } + + /** + * Copy data that is in transaction table into music interface + * + * @param transactionDigests + * @throws NoSuchFieldException + */ + private void updateStagingTable(StagingTable transactionDigests) throws NoSuchFieldException, MDBCServiceException { + // copy from DB.MDBC_TRANSLOG where connid == myconnid + // then delete from MDBC_TRANSLOG + String sql2 = "SELECT IX, SCHEMANAME, TABLENAME, OP, ROWDATA, KEYDATA FROM " + TRANS_TBL + + " WHERE CONNECTION_ID = " + this.connId; + Integer biggestIx = Integer.MIN_VALUE; + Integer smallestIx = Integer.MAX_VALUE; + try { + ResultSet rs = executeSQLRead(sql2); + Set<Integer> rows = new TreeSet<Integer>(); + while (rs.next()) { + int ix = rs.getInt("IX"); + biggestIx = Integer.max(biggestIx, ix); + smallestIx = Integer.min(smallestIx, ix); + String op = rs.getString("OP"); + SQLOperation opType = toOpEnum(op); + String schema = rs.getString("SCHEMANAME"); + String tbl = rs.getString("TABLENAME"); + String newRowStr = rs.getString("ROWDATA"); + String rowStr = rs.getString("KEYDATA"); + Range range = new Range(schema + "." 
+ tbl); + transactionDigests.addOperation(range, opType, newRowStr, rowStr); + rows.add(ix); + } + rs.getStatement().close(); + // batch delete operations + if (rows.size() > 0) { + this.deleteStagingStatement.setInt(1, smallestIx); + this.deleteStagingStatement.setInt(2, biggestIx); + this.deleteStagingStatement.setInt(3, this.connId); + logger.debug("Staging delete: Executing with vals [" + smallestIx + "," + biggestIx + "," + this.connId + + "]"); + this.deleteStagingStatement.execute(); + } + } catch (SQLException e) { + logger.warn("Exception in postStatementHook: " + e); + e.printStackTrace(); + } + } + + + /** + * Update music with data from MySQL table + * + * @param tableName - name of table to update in music + */ + @Override + public void synchronizeData(String tableName) { + ResultSet rs = null; + TableInfo ti = getTableInfo(tableName); + String query = "SELECT * FROM " + tableName; + + try { + rs = executeSQLRead(query); + if (rs == null) + return; + while (rs.next()) { + + JSONObject jo = new JSONObject(); + if (!getTableInfo(tableName).hasKey()) { + String musicKey = MDBCUtils.generateUniqueKey().toString(); + jo.put(mi.getMusicDefaultPrimaryKeyName(), musicKey); + } + + for (String col : ti.columns) { + jo.put(col, rs.getString(col)); + } + + @SuppressWarnings("unused") + Object[] row = Utils.jsonToRow(ti, tableName, jo, mi.getMusicDefaultPrimaryKeyName()); + // \FIXME this is wrong now, update of the dirty row and entity is now handled by the archival process + // msm.updateDirtyRowAndEntityTableInMusic(ti,tableName, jo); + } + } catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger, "synchronizing data " + tableName + " -> " + e.getMessage()); + } finally { + try { + if (rs != null) { + rs.close(); + } + } catch (SQLException e) { + // continue + } + } + + } + + /** + * Return a list of "reserved" names, that should not be used by MySQL client/MUSIC These are reserved for mdbc + * Returned names are in all UPPERCASE + */ + @Override + public List<String> getReservedTblNames() { + ArrayList<String> rsvdTables = new ArrayList<String>(); + rsvdTables.add(dbName.toUpperCase() + "." + TRANS_TBL); + rsvdTables.add(dbName.toUpperCase() + "." + CKPT_TBL); + // Add others here as necessary + return rsvdTables; + } + + @Override + public String getPrimaryKey(String sql, String tableName) { + // + return null; + } + + + public String applyDigest(Map<Range, StagingTable> digest) { + throw new NotImplementedException(); + } + + @SuppressWarnings("unused") + @Deprecated + private ArrayList<String> getMusicKey(String sql) { + try { + net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); + if (stmt instanceof Insert) { + Insert s = (Insert) stmt; + String tbl = s.getTable().getName(); + return getMusicKey(tbl, "INSERT", sql); + } else if (stmt instanceof Update) { + Update u = (Update) stmt; + String tbl = u.getTables().get(0).getName(); + return getMusicKey(tbl, "UPDATE", sql); + } else if (stmt instanceof Delete) { + Delete d = (Delete) stmt; + // TODO: IMPLEMENT + String tbl = d.getTable().getName(); + return getMusicKey(tbl, "DELETE", sql); + } else { + System.err.println("Not recognized sql type"); + } + + } catch (JSQLParserException e) { + + e.printStackTrace(); + } + // Something went wrong here + return new ArrayList<String>(); + } + + /** + * Returns all keys that matches the current sql statement, and not in already updated keys. 
+ * + * @param tbl + * @param cmd + * @param sql + */ + @Deprecated + private ArrayList<String> getMusicKey(String tbl, String cmd, String sql) { + ArrayList<String> musicKeys = new ArrayList<String>(); + /* + * if (cmd.equalsIgnoreCase("insert")) { //create key, return key musicKeys.add(msm.generatePrimaryKey()); } + * else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) { try { + * net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); String where; if (stmt instanceof + * Update) { where = ((Update) stmt).getWhere().toString(); } else if (stmt instanceof Delete) { where = + * ((Delete) stmt).getWhere().toString(); } else { System.err.println("Unknown type: " +stmt.getClass()); where + * = ""; } ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where); musicKeys = + * msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs)); } catch (JSQLParserException e) { + * + * e.printStackTrace(); } catch (SQLException e) { //Not a valid sql query e.printStackTrace(); } } + */ + return musicKeys; + } + + + @Deprecated + public void insertRowIntoSqlDbOLD(String tableName, Map<String, Object> map) { + // First construct the value string and column name string for the db write + TableInfo ti = getTableInfo(tableName); + StringBuilder fields = new StringBuilder(); + StringBuilder values = new StringBuilder(); + String pfx = ""; + for (String col : ti.columns) { + fields.append(pfx).append(col); + values.append(pfx).append(Utils.getStringValue(map.get(col))); + pfx = ", "; + } + + try { + String sql = + String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); + executeSQLWrite(sql); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Insert failed because row exists, do an update"); + StringBuilder where = new StringBuilder(); + pfx = ""; + String pfx2 = ""; + fields.setLength(0); + for (int i = 0; i < ti.columns.size(); i++) { + String col = ti.columns.get(i); + String val = Utils.getStringValue(map.get(col)); + if (ti.iskey.get(i)) { + where.append(pfx).append(col).append("=").append(val); + pfx = " AND "; + } else { + fields.append(pfx2).append(col).append("=").append(val); + pfx2 = ", "; + } + } + String sql = String.format("UPDATE %s SET %s WHERE %s", tableName, fields.toString(), where.toString()); + try { + executeSQLWrite(sql); + } catch (SQLException e1) { + logger.error(EELFLoggerDelegate.errorLogger, "executeSQLWrite" + e1); + } + } + } + + /** + * Parse the transaction digest into individual events + * + * @param transaction - base 64 encoded, serialized digest + * @throws MDBCServiceException + */ + public void replayTransaction(StagingTable transaction, Set<Range> ranges) + throws SQLException, MDBCServiceException { + boolean autocommit = jdbcConn.getAutoCommit(); + jdbcConn.setAutoCommit(false); + Statement jdbcStmt = jdbcConn.createStatement(); + ArrayList<Operation> opList = transaction.getOperationList(); + + for (Operation op : opList) { + if (Range.overlaps(ranges, op.getTable())) { + try { + replayOperationIntoDB(jdbcStmt, op); + } catch (SQLException | MDBCServiceException e) { + // rollback transaction + logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "." 
+ + "Rolling back the entire digest replay."); + jdbcConn.rollback(); + throw e; + } + } + } + + clearReplayedOperations(jdbcStmt); + jdbcConn.commit(); + jdbcStmt.close(); + + jdbcConn.setAutoCommit(autocommit); + } + + @Override + public void disableForeignKeyChecks() throws SQLException { + Statement disable = jdbcConn.createStatement(); + disable.execute("SET FOREIGN_KEY_CHECKS=0"); + disable.closeOnCompletion(); + } + + @Override + public void enableForeignKeyChecks() throws SQLException { Statement enable = jdbcConn.createStatement(); - enable.execute("SET FOREIGN_KEY_CHECKS=1"); - enable.closeOnCompletion(); - } + enable.execute("SET FOREIGN_KEY_CHECKS=1"); + enable.closeOnCompletion(); + } - @Override - public void applyTxDigest(StagingTable txDigest,List<Range> ranges) throws SQLException, MDBCServiceException { - replayTransaction(txDigest,ranges); - } + @Override + public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException { + replayTransaction(txDigest, ranges); + } - /** + /** * Replays operation into database, usually from txDigest + * * @param jdbcStmt * @param r * @param op - * @throws SQLException - * @throws MDBCServiceException + * @throws SQLException + * @throws MDBCServiceException */ private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException { logger.info("Replaying Operation: " + op.getOperationType() + "->" + op.getVal()); JSONObject jsonOp = op.getVal(); - + ArrayList<String> cols = new ArrayList<String>(); ArrayList<Object> vals = new ArrayList<Object>(); Iterator<String> colIterator = jsonOp.keys(); - while(colIterator.hasNext()) { + while (colIterator.hasNext()) { String col = colIterator.next(); - //FIXME: should not explicitly refer to cassandramixin + // FIXME: should not explicitly refer to cassandramixin if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) { - //reserved name + // reserved name continue; } cols.add(col); vals.add(jsonOp.get(col)); } - - //build and replay the queries + + // build and replay the queries StringBuilder sql = constructSQL(op, cols, vals); - if(sql == null) + if (sql == null) return; - + try { logger.info("Replaying operation: " + sql.toString()); int updated = jdbcStmt.executeUpdate(sql.toString()); - - if(updated == 0) { + + if (updated == 0) { // This applies only for replaying transactions involving Eventually Consistent tables - logger.warn("Error Replaying operation: " + sql.toString() + "; Replacing insert/replace/viceversa and replaying "); - + logger.warn("Error Replaying operation: " + sql.toString() + + "; Replacing insert/replace/viceversa and replaying "); + buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals); } } catch (SQLException sqlE) { @@ -994,24 +972,23 @@ NEW.field refers to the new value // or transactions that replay on top of existing keys logger.warn("Error Replaying operation: " + sql.toString() + ";" + "Replacing insert/replace/viceversa and replaying "); - - buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals); - + + buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals); + } } - - protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, - ArrayList<String> cols, ArrayList<Object> vals) throws SQLException, MDBCServiceException { + + protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, ArrayList<String> cols, + ArrayList<Object> vals) throws SQLException, MDBCServiceException { StringBuilder sqlInverse = constructSQLInverse(op, cols, vals); - if(sqlInverse == null) + 
if (sqlInverse == null) return; - logger.info("Replaying operation: " + sqlInverse.toString()); + logger.info("Replaying operation: " + sqlInverse.toString()); jdbcStmt.executeUpdate(sqlInverse.toString()); } - + /** - * Construct an update statement from an insert, or - * construct an insert statement from an update + * Construct an update statement from an insert, or construct an insert statement from an update * * useful when replaying logic, if the primary key value is already present/not present * @@ -1022,120 +999,167 @@ NEW.field refers to the new value * @throws MDBCServiceException */ - protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, - ArrayList<Object> vals) throws MDBCServiceException { + protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, ArrayList<Object> vals) + throws MDBCServiceException { StringBuilder sqlInverse = null; switch (op.getOperationType()) { case INSERT: - sqlInverse = constructUpdate(op.getTable() , SQLOperation.UPDATE, op.getKey(), cols, vals); + sqlInverse = constructUpdate(op.getTable(), SQLOperation.UPDATE, op.getKey(), cols, vals); break; case UPDATE: - sqlInverse = constructInsert(op.getTable() , SQLOperation.INSERT, cols, vals); + sqlInverse = constructInsert(op.getTable(), SQLOperation.INSERT, cols, vals); break; default: break; } return sqlInverse; } - protected StringBuilder constructSQL(Operation op, ArrayList<String> cols, - ArrayList<Object> vals) throws MDBCServiceException { + + protected StringBuilder constructSQL(Operation op, ArrayList<String> cols, ArrayList<Object> vals) + throws MDBCServiceException { StringBuilder sql = null; switch (op.getOperationType()) { - case INSERT: - sql = constructInsert(op.getTable(), op.getOperationType(), cols, vals); - break; - case UPDATE: - sql = constructUpdate(op.getTable(), op.getOperationType(), op.getKey(), cols, vals); - break; - case DELETE: - sql = constructDelete(op.getTable(), op.getOperationType(), op.getKey()); - break; - case SELECT: - //no update happened, do nothing - break; - default: - logger.error(op.getOperationType() + "not implemented for replay"); + case INSERT: + sql = constructInsert(op.getTable(), op.getOperationType(), cols, vals); + break; + case UPDATE: + sql = constructUpdate(op.getTable(), op.getOperationType(), op.getKey(), cols, vals); + break; + case DELETE: + sql = constructDelete(op.getTable(), op.getOperationType(), op.getKey()); + break; + case SELECT: + // no update happened, do nothing + break; + default: + logger.error(op.getOperationType() + "not implemented for replay"); } return sql; } + private StringBuilder constructDelete(String r, SQLOperation op, JSONObject key) { StringBuilder sql = new StringBuilder(); sql.append(op + " FROM "); - sql.append(r + " WHERE "); + sql.append(r + " WHERE "); sql.append(getPrimaryKeyConditional(key)); sql.append(";"); return sql; } - private StringBuilder constructInsert(String r, SQLOperation op, ArrayList<String> cols, - ArrayList<Object> vals) { + + private StringBuilder constructInsert(String r, SQLOperation op, ArrayList<String> cols, ArrayList<Object> vals) { StringBuilder sql = new StringBuilder(); String sep; sql.append(op + " INTO "); - sql.append(r + " (") ; + sql.append(r + " ("); sep = ""; - for (String col: cols) { + for (String col : cols) { sql.append(sep + col); sep = ", "; - } + } sql.append(") VALUES ("); sep = ""; - for (Object val: vals) { + for (Object val : vals) { sql.append(sep + "\"" + val + "\""); sep = ", "; } sql.append(");"); return sql; } 
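As a concrete illustration of the replay fallback implemented by replayOperationIntoDB and buildAndExecuteSQLInverse above: when the primary statement reports zero affected rows, the inverse statement is tried. For a hypothetical UPDATE digest entry on person(id PRIMARY KEY, name), the two statements produced by constructSQL and constructSQLInverse would look roughly like this (illustrative values, not captured output):

// Digest operation: UPDATE on person; ROWDATA = {"id": 1, "name": "alice"}

// constructSQL(...) routes the op to constructUpdate (defined just below):
String primary = "UPDATE person SET id=\"1\", name=\"alice\" WHERE id=\"1\";";

// constructSQLInverse(...) turns the UPDATE into an INSERT via constructInsert (above),
// used when the row being updated does not exist yet:
String inverse = "INSERT INTO person (id, name) VALUES (\"1\", \"alice\");";
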
+ private StringBuilder constructUpdate(String r, SQLOperation op, JSONObject key, ArrayList<String> cols, ArrayList<Object> vals) { StringBuilder sql = new StringBuilder(); String sep; sql.append(op + " "); sql.append(r + " SET "); - sep=""; - for (int i=0; i<cols.size(); i++) { - sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\""); + sep = ""; + for (int i = 0; i < cols.size(); i++) { + sql.append(sep + cols.get(i) + "=\"" + vals.get(i) + "\""); sep = ", "; } sql.append(" WHERE "); sql.append(getPrimaryKeyConditional(key)); sql.append(";"); - + return sql; } - - /** - * Create an SQL string for AND'ing all of the primary keys - * @param primaryKeys Json of primary keys and their values - * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3 - */ + + /** + * Create an SQL string for AND'ing all of the primary keys + * + * @param primaryKeys Json of primary keys and their values + * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3 + */ private String getPrimaryKeyConditional(JSONObject primaryKeys) { - StringBuilder keyCondStmt = new StringBuilder(); - String and = ""; - for (String key: primaryKeys.keySet()) { - // We cannot use the default primary key for the sql table and operations - if(!key.equals(mi.getMusicDefaultPrimaryKeyName())) { + StringBuilder keyCondStmt = new StringBuilder(); + String and = ""; + for (String key : primaryKeys.keySet()) { + // We cannot use the default primary key for the sql table and operations + if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) { Object val = primaryKeys.get(key); keyCondStmt.append(and + key + "=\"" + val + "\""); and = " AND "; } - } - return keyCondStmt.toString(); - } - - /** - * Cleans out the transaction table, removing the replayed operations - * @param jdbcStmt - * @throws SQLException - */ - private void clearReplayedOperations(Statement jdbcStmt) throws SQLException { - logger.info("Clearing replayed operations"); - String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId; - jdbcStmt.executeUpdate(sql); - } - - @Override - public Connection getSQLConnection(){ - return jdbcConn; - } + } + return keyCondStmt.toString(); + } + + /** + * Cleans out the transaction table, removing the replayed operations + * + * @param jdbcStmt + * @throws SQLException + */ + private void clearReplayedOperations(Statement jdbcStmt) throws SQLException { + logger.info("Clearing replayed operations"); + String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId; + jdbcStmt.executeUpdate(sql); + } + + @Override + public Connection getSQLConnection() { + return jdbcConn; + } + + @Override + public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) { + String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTINDEX=? 
where RANGENAME=?;"; + try { + PreparedStatement stmt = jdbcConn.prepareStatement(query); + stmt.setString(1, playbackPointer.getLeft().toString()); + stmt.setInt(2, playbackPointer.getRight()); + stmt.setString(3, r.getTable()); + stmt.execute(); + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Unable to update replay checkpoint location"); + } + } + + @Override + public void initTables() { + try { + Statement stmt = jdbcConn.createStatement(); + stmt.execute(CREATE_TBL_SQL); + stmt.execute(CREATE_CKPT_SQL); + stmt.close(); + + //prepare checkpoint table + String query = "INSERT INTO " + CKPT_TBL + " (RANGENAME) VALUES (?);"; + for (Range range: getSQLRangeSet()) { + if (getReservedTblNames().contains(range.getTable().toUpperCase())) { + continue; + } + PreparedStatement prepstmt = jdbcConn.prepareStatement(query); + prepstmt.setString(1, range.getTable()); + prepstmt.execute(); + prepstmt.close(); + } + + logger.info(EELFLoggerDelegate.applicationLogger, "initTables: Server side dirty table created."); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "initTables: problem creating the mdbc tables!"); + } + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java index 0f66731..4afaa71 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java @@ -32,6 +32,8 @@ import net.sf.jsqlparser.parser.CCJSqlParserUtil; import net.sf.jsqlparser.statement.delete.Delete; import net.sf.jsqlparser.statement.insert.Insert; import net.sf.jsqlparser.statement.update.Update; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.zookeeper.KeeperException.UnimplementedException; import org.json.JSONObject; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; @@ -814,7 +816,8 @@ public class PostgresMixin implements DBInterface { * * @param transaction - base 64 encoded, serialized digest */ - public void replayTransaction(StagingTable transaction, List<Range> ranges) + @Override + public void replayTransaction(StagingTable transaction, Set<Range> ranges) throws SQLException, MDBCServiceException { boolean autocommit = jdbcConn.getAutoCommit(); jdbcConn.setAutoCommit(false); @@ -856,7 +859,7 @@ public class PostgresMixin implements DBInterface { } @Override - public void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException { + public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException { replayTransaction(txDigest, ranges); } @@ -1063,4 +1066,14 @@ public class PostgresMixin implements DBInterface { return set; } + @Override + public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) { + throw new org.apache.commons.lang.NotImplementedException(); + } + + @Override + public void initTables() { + throw new org.apache.commons.lang.NotImplementedException(); + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java index 07a5fe6..9d1685c 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java @@ -62,7 +62,7 @@ public class Dag { rowsPerLatestRange = null; } - private void 
createDag(List<MusicRangeInformationRow> rows, List<Range> ranges){
+ private void createDag(List<MusicRangeInformationRow> rows, Set<Range> ranges){
this.ranges = new ArrayList<>(ranges);
Map<Range,DagNode> latestRow = new HashMap<>();
//sort to make sure rows are in chronological order
@@ -72,7 +72,7 @@ public class Dag {
DagNode node = new DagNode(row);
nodes.put(row.getPartitionIndex(),node);
for(Range range : ranges){
- List<Range> nodeRanges = row.getDBPartition().getSnapshot();
+ Set<Range> nodeRanges = row.getDBPartition().getSnapshot();
for(Range nRange : nodeRanges){
if(nRange.overlaps(range)){
if(latestRow.containsKey(range)){
@@ -88,7 +88,7 @@ public class Dag {
}
}
- public static Dag getDag(List<MusicRangeInformationRow> rows, List<Range> ranges){
+ public static Dag getDag(List<MusicRangeInformationRow> rows, Set<Range> ranges){
Dag newDag = new Dag(true);
newDag.createDag(rows,ranges);
return newDag;
@@ -116,7 +116,7 @@ public class Dag {
});
}
- public DagNode getNode(UUID rowId) throws MDBCServiceException {
+ public DagNode getNode(UUID rowId) {
if(!nodes.containsKey(rowId)){
return null;
}
@@ -141,7 +141,7 @@ public class Dag {
return nextNode;
}
- public synchronized DagNode nextToApply(List<Range> ranges){
+ public synchronized DagNode nextToApply(Set<Range> ranges){
if(!readyInit){
initApplyDatastructures();
}
@@ -283,7 +283,7 @@ public class Dag {
}
}
- public void addNewNodeWithSearch(MusicRangeInformationRow row, List<Range> ranges) throws MDBCServiceException {
+ public void addNewNodeWithSearch(MusicRangeInformationRow row, Set<Range> ranges) throws MDBCServiceException {
Map<Range,DagNode> newestNode = new HashMap<>();
for(DagNode node : nodes.values()){
for(Range range : ranges) {
@@ -304,6 +304,10 @@ public class Dag {
addNewNode(row,dependencies);
}
+ /**
+ *
+ * @return All ranges in every node of the DAG
+ */
public Set<Range> getAllRanges(){
Set<Range> ranges = new HashSet<>();
for(DagNode node : nodes.values()){
@@ -385,8 +389,8 @@ public class Dag {
return toDisable;
}
- public Pair<List<Range>,Set<DagNode>> getIncompleteRangesAndDependents() throws MDBCServiceException {
- List<Range> incomplete = new ArrayList<>();
+ public Pair<Set<Range>, Set<DagNode>> getIncompleteRangesAndDependents() throws MDBCServiceException {
+ Set<Range> incomplete = new HashSet<>();
Set<DagNode> dependents = new HashSet<>();
Map<Range,Set<DagNode>> rowsPerLatestRange = getIsLatestPerRange();
List<DagNode> toDisable = getOldestDoubleRows(rowsPerLatestRange);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
index e737b26..78c68e1 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
@@ -61,7 +61,9 @@ public class DagNode {
startIndex = new HashMap<>();
}
- public MusicRangeInformationRow getRow() { return row;}
+ public MusicRangeInformationRow getRow() {
+ return row;
+ }
public synchronized void setOwned(){
owned = true;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index 854eb5f..b848964 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -25,7 +25,6 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
@@ -36,7 +35,6 @@ import org.onap.music.mdbc.mixins.LockRequest;
import org.onap.music.mdbc.mixins.LockResult;
import org.onap.music.mdbc.mixins.MusicInterface;
import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
-import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.query.SQLOperationType;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
@@ -90,14 +88,14 @@ public class OwnershipAndCheckpoint{
}
- private List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, List<Range> ranges,
+ private List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, Set<Range> ranges,
boolean onlyIsLatest){
List<MusicRangeInformationRow> rows = new ArrayList<>();
for(MusicRangeInformationRow row : allMriRows){
if(onlyIsLatest && !row.getIsLatest()){
continue;
}
- final List<Range> rowRanges = row.getDBPartition().getSnapshot();
+ final Set<Range> rowRanges = row.getDBPartition().getSnapshot();
boolean found = false;
for(Range sRange : ranges){
for(Range rRange: rowRanges) {
@@ -120,7 +118,7 @@ public class OwnershipAndCheckpoint{
* @param onlyIsLatest - only return the "latest" rows
* @return
*/
- private List<MusicRangeInformationRow> extractRowsForRange(MusicInterface music, List<Range> ranges, boolean onlyIsLatest)
+ private List<MusicRangeInformationRow> extractRowsForRange(MusicInterface music, Set<Range> ranges, boolean onlyIsLatest)
throws MDBCServiceException {
final List<MusicRangeInformationRow> allMriRows = music.getAllMriRows();
return extractRowsForRange(allMriRows,ranges,onlyIsLatest);
@@ -136,7 +134,7 @@ public class OwnershipAndCheckpoint{
* @param ownOpId
* @throws MDBCServiceException
*/
- public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, List<Range> ranges,
+ public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges,
Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException {
if(ranges.isEmpty()){
return;
@@ -173,7 +171,7 @@ public class OwnershipAndCheckpoint{
}
}
- private void applyTxDigest(List<Range> ranges, DBInterface di, StagingTable txDigest)
+ private void applyTxDigest(Set<Range> ranges, DBInterface di, StagingTable txDigest)
throws MDBCServiceException {
try {
di.applyTxDigest(txDigest,ranges);
@@ -189,7 +187,7 @@ public class OwnershipAndCheckpoint{
* @param rangesToWarmup
* @throws MDBCServiceException
*/
- public void warmup(MusicInterface mi, DBInterface di, List<Range> rangesToWarmup) throws MDBCServiceException {
+ public void warmup(MusicInterface mi, DBInterface di, Set<Range> rangesToWarmup) throws MDBCServiceException {
if(rangesToWarmup.isEmpty()){
return;
}
@@ -229,7 +227,16 @@ public class OwnershipAndCheckpoint{
}
}
- private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, List<Range> ranges, DagNode node,
+ /**
+ * Apply tx digest for ranges, update checkpoint location (alreadyApplied)
+ * @param mi
+ * @param di
+ * @param ranges
+ * @param node
+ * @param pair
+ * @throws MDBCServiceException
+ */
+ private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, Set<Range> ranges, DagNode node,
Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException {
final StagingTable txDigest;
try {
@@ -244,10 +251,34 @@ public class OwnershipAndCheckpoint{
for (Range r : pair.getValue()) {
MusicRangeInformationRow row = node.getRow();
alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
+
+ updateCheckpointLocations(mi, di, r, row.getPartitionIndex(), pair.getKey().index);
}
}
- private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, List<Range> ranges, UUID ownOpId)
+ /**
+ * Update external checkpoint markers in sql db and music
+ * @param mi
+ * @param di
+ * @param r
+ * @param partitionIndex
+ * @param index
+ */
+ private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, int index) {
+ dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
+ mi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
+ }
+
+ /**
+ * Forceably apply changes in tx digest for ranges
+ * @param mi
+ * @param db
+ * @param extendedDag
+ * @param ranges
+ * @param ownOpId
+ * @throws MDBCServiceException
+ */
+ private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, Set<Range> ranges, UUID ownOpId)
throws MDBCServiceException {
Set<Range> rangeSet = new HashSet<Range>(ranges);
disableForeignKeys(db);
@@ -278,7 +309,7 @@ public class OwnershipAndCheckpoint{
* @return an object indicating the status of the own function result
* @throws MDBCServiceException
*/
- public OwnershipReturn own(MusicInterface mi, List<Range> ranges,
+ public OwnershipReturn own(MusicInterface mi, Set<Range> ranges,
DatabasePartition currPartition, UUID opId, SQLOperationType lockType)
throws MDBCServiceException {
if (ranges == null || ranges.isEmpty()) {
@@ -292,15 +323,15 @@ public class OwnershipAndCheckpoint{
currPartition.getSnapshot(),null);
}
//Find
- Map<UUID,LockResult> newLocks = new HashMap<>();
- List<Range> rangesToOwn = mi.getRangeDependencies(ranges);
+ Map<UUID,LockResult> locksForOwnership = new HashMap<>();
+ Set<Range> rangesToOwn = mi.getRangeDependencies(ranges);
List<MusicRangeInformationRow> rangesToOwnRows = extractRowsForRange(mi,rangesToOwn, false);
Dag toOwn = Dag.getDag(rangesToOwnRows,rangesToOwn);
Dag currentlyOwn = new Dag();
while ( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) && !timeout(opId) ) {
- takeOwnershipOfDag(mi, currPartition, opId, newLocks, toOwn, lockType);
+ takeOwnershipOfDag(mi, currPartition, opId, locksForOwnership, toOwn, lockType);
currentlyOwn=toOwn;
//TODO instead of comparing dags, compare rows
rangesToOwnRows = extractRowsForRange(mi, rangesToOwn, false);
@@ -308,29 +339,30 @@ public class OwnershipAndCheckpoint{
}
if (!currentlyOwn.isOwned() || toOwn.isDifferent(currentlyOwn)) {
//hold on to previous partition
- newLocks.remove(currPartition.getMRIIndex());
- mi.releaseLocks(newLocks);
+ locksForOwnership.remove(currPartition.getMRIIndex());
+ mi.releaseLocks(locksForOwnership);
stopOwnershipTimeoutClock(opId);
logger.error("Error when owning a range: Timeout");
throw new MDBCServiceException("Ownership timeout");
}
Set<Range> allRanges = currentlyOwn.getAllRanges();
- List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, new ArrayList<>(allRanges), true);
+ //TODO: we shouldn't need to go back to music at this point
+ List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, new HashSet<>(allRanges), true);
currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,latestRows));
- return mi.mergeLatestRowsIfNecessary(currentlyOwn,latestRows,ranges,newLocks,opId);
+ return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId);
}
-
+
/**
* Step through dag and take lock ownership of each range
* @param partition current partition owned by system
* @param opId
- * @param newLocks
+ * @param ownershipLocks
* @param toOwn
* @param lockType
* @throws MDBCServiceException
*/
private void takeOwnershipOfDag(MusicInterface mi, DatabasePartition partition, UUID opId,
- Map<UUID, LockResult> newLocks, Dag toOwn, SQLOperationType lockType) throws MDBCServiceException {
+ Map<UUID, LockResult> ownershipLocks, Dag toOwn, SQLOperationType lockType) throws MDBCServiceException {
while(toOwn.hasNextToOwn()){
DagNode node = toOwn.nextToOwn();
@@ -338,9 +370,9 @@ public class OwnershipAndCheckpoint{
UUID uuidToOwn = row.getPartitionIndex();
if (partition.isLocked() && partition.getMRIIndex().equals(uuidToOwn) ) {
toOwn.setOwn(node);
- newLocks.put(uuidToOwn, new LockResult(true, uuidToOwn, partition.getLockId(),
+ ownershipLocks.put(uuidToOwn, new LockResult(true, uuidToOwn, partition.getLockId(),
false, partition.getSnapshot()));
- } else if ( newLocks.containsKey(uuidToOwn) || !row.getIsLatest() ) {
+ } else if ( ownershipLocks.containsKey(uuidToOwn) || !row.getIsLatest() ) {
toOwn.setOwn(node);
} else {
LockRequest request = new LockRequest(uuidToOwn,
@@ -370,7 +402,7 @@ public class OwnershipAndCheckpoint{
// TODO look into updating the partition object with the latest lockId;
if(owned){
toOwn.setOwn(node);
- newLocks.put(uuidToOwn,result);
+ ownershipLocks.put(uuidToOwn,result);
}
else{
mi.relinquish(lockId,uuidToOwn.toString());
@@ -380,9 +412,54 @@ public class OwnershipAndCheckpoint{
}
}
+ public String getDebugInfo(MusicInterface mi, String rangesStr) {
+
+ Set<Range> ranges = new HashSet<>();
+ Arrays.stream(rangesStr.split(",")).forEach(a -> ranges.add(new Range(a)));
+
+ StringBuffer buffer = new StringBuffer();
+ Set<Range> rangesToOwn;
+ try {
+ rangesToOwn = mi.getRangeDependencies(ranges);
+ List<MusicRangeInformationRow> rangesToOwnRows = extractRowsForRange(mi,rangesToOwn, false);
+ Dag toOwn = Dag.getDag(rangesToOwnRows,rangesToOwn);
+ while(toOwn.hasNextToOwn()){
+ DagNode node = null;
+ try {
+ node = toOwn.nextToOwn();
+ MusicRangeInformationRow row = node.getRow();
+
+ buffer.append("\n-------------\n");
+ buffer.append(row.getDBPartition()).append(",");
+ buffer.append(row.getPrevRowIndexes()).append(",");
+ buffer.append(row.getIsLatest()).append("");
+
+
+ } catch (Exception e) {
+ buffer.append("\n------missing MRI------\n");
+ } finally {
+
+ if(node != null) {
+ toOwn.setOwn(node);
+ }
+
+ }
+
+ }
+
+
+ } catch (MDBCServiceException e) {
+ buffer.setLength(0);
+ buffer.append(" Debugging info could not be determined");
+ }
+
+ return buffer.toString();
+
+ }
+
public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
- List<Range> snapshot = partition.getSnapshot();
+ Set<Range> snapshot = partition.getSnapshot();
UUID row = partition.getMRIIndex();
for(Range r : snapshot){
alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
index a31a2a0..6d6c661 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
@@ -35,6 +35,7 @@ import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlUpdate;
import org.apache.calcite.sql.fun.SqlInOperator;
@@ -122,6 +123,9 @@ public class QueryProcessor {
case SELECT:
parseSelect((SqlSelect) sqlNode, tableOpsMap);
break;
+ case ORDER_BY:
+ parseSelect((SqlSelect)((SqlOrderBy) sqlNode).query, tableOpsMap);
+ break;
default:
logger.error("Unhandled sql query type " + sqlNode.getKind() +" for query " + query);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
index 6e6ade6..de711ef 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
@@ -19,6 +19,7 @@ */
package org.onap.music.mdbc.tables;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -49,8 +50,15 @@ public final class MusicRangeInformationRow implements Comparable<MusicRangeInfo
this.prevRowIndexes = prevPartitions;
}
+ public MusicRangeInformationRow(DatabasePartition dbPartition, boolean isLatest, Set<UUID> prevPartitions) {
+ this.dbPartition = dbPartition;
+ this.redoLog = new ArrayList<MusicTxDigestId>();
+ this.isLatest = isLatest;
+ this.prevRowIndexes = prevPartitions;
+ }
+
public UUID getPartitionIndex() {
- return dbPartition.getMRIIndex();
+ return dbPartition.getMRIIndex();
}
public boolean getIsLatest(){ return isLatest; }
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
index 5c6fae4..6f95d3c 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
@@ -51,7 +51,7 @@ public class MusicTxDigestDaemon implements Runnable {
* @param dbi interface to the database that will replay the operations
* @param ranges only these ranges will be applied from the digests
*/
- public void replayDigest(MusicInterface mi, DBInterface dbi, List<Range> ranges) throws MDBCServiceException {
+ public void replayDigest(MusicInterface mi, DBInterface dbi, Set<Range> ranges) throws MDBCServiceException {
StagingTable transaction;
String nodeName = stateManager.getMdbcServerName();
@@ -117,11 +117,11 @@ public class MusicTxDigestDaemon implements Runnable {
List<Range> missingRanges = new ArrayList<>();
if (currentPartitions.size() != 0) {
for (DatabasePartition part : currentPartitions) {
- List<Range> partitionRanges = part.getSnapshot();
+ Set<Range> partitionRanges = part.getSnapshot();
warmupRanges.removeAll(partitionRanges);
}
try {
- stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+ stateManager.getOwnAndCheck().warmup(mi, dbi, new HashSet<>(warmupRanges));
} catch (MDBCServiceException e) {
logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
continue;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
index 4ef9d30..9ff7a0f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
@@ -266,7 +266,7 @@ public class StagingTable {
digestBuilder.clear();
}
- synchronized public boolean areEventualContained(List<Range> ranges){
+ synchronized public boolean areEventualContained(Set<Range> ranges){
return eventuallyConsistentRanges.containsAll(ranges);
}
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java
index 899fff2..280c733 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java
@@ -53,7 +53,7 @@ public class StateManagerTest {
@Test
public void testGetEventualRanges() throws NoSuchFieldException, SecurityException {
- List<Range> evList = new ArrayList<>();
+ Set<Range> evList = new HashSet<>();
evList.add(new Range("eventualRange"));
FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("eventualRanges"), evList);
assertEquals(evList, stateManager.getEventualRanges());
@@ -61,7 +61,7 @@ public class StateManagerTest {
@Test
public void testSetEventualRanges() {
- List<Range> evList = new ArrayList<>();
+ Set<Range> evList = new HashSet<>();
evList.add(new Range("eventualRange"));
stateManager.setEventualRanges(evList);
assertEquals(evList, stateManager.getEventualRanges());
@@ -116,7 +116,7 @@ public class StateManagerTest {
allRanges.add(new Range("eventualRange"));
Mockito.when(dbiMock.getSQLRangeSet()).thenReturn(allRanges);
- List<Range> eventualRanges = new ArrayList<Range>();
+ Set<Range> eventualRanges = new HashSet<Range>();
eventualRanges.add(new Range("eventualRange"));
stateManager.setEventualRanges(eventualRanges);
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
index f2bbdcd..ef26cb6 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
@@ -115,7 +115,7 @@ public class MusicMixinTest {
@Test
public void own() {
Range range = new Range("TEST.TABLE1");
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(range);
DatabasePartition partition=null;
try {
@@ -139,7 +139,7 @@ public class MusicMixinTest {
}
}
- private DatabasePartition addRow(List<Range> ranges,boolean isLatest){
+ private DatabasePartition addRow(Set<Range> ranges,boolean isLatest){
final UUID uuid = MDBCUtils.generateTimebasedUniqueKey();
DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null);
MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), isLatest);
@@ -161,19 +161,19 @@ public class MusicMixinTest {
@Ignore // TODO: Move ownership tests to OwnershipAndCheckpointTest
@Test(timeout=1000)
public void own2() throws InterruptedException, MDBCServiceException {
- List<Range> range12 = new ArrayList<>( Arrays.asList(
+ Set<Range> range12 = new HashSet<>( Arrays.asList(
new Range("TEST.RANGE1"),
new Range("TEST.RANGE2")
));
- List<Range> range34 = new ArrayList<>( Arrays.asList(
+ Set<Range> range34 = new HashSet<>( Arrays.asList(
new Range("TEST.RANGE3"),
new Range("TEST.RANGE4")
));
- List<Range> range24 = new ArrayList<>( Arrays.asList(
+ Set<Range> range24 = new HashSet<>( Arrays.asList(
new Range("TEST.RANGE2"),
new Range("TEST.RANGE4")
));
- List<Range> range123 = new ArrayList<>( Arrays.asList(
+ Set<Range> range123 = new HashSet<>( Arrays.asList(
new Range("TEST.RANGE1"),
new Range("TEST.RANGE2"),
new Range("TEST.RANGE3")
@@ -225,7 +225,7 @@ public class MusicMixinTest {
MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId());
assertTrue(row.getIsLatest());
DatabasePartition dbPartition = row.getDBPartition();
- List<Range> snapshot = dbPartition.getSnapshot();
+ Set<Range> snapshot = dbPartition.getSnapshot();
assertEquals(3,snapshot.size());
MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId());
assertFalse(node5row.getIsLatest());
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
index 2134a79..a1cf2b1 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.*;
import java.sql.*;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -207,7 +208,7 @@ public class PostgresMixinTest {
assertFalse(st.isEmpty());
cleanTestTable();
checkEmptyTestTable();
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(new Range("public.testtable"));
try {
mixin.applyTxDigest(st,ranges);
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
index fa5583c..ee50dca 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
@@ -43,12 +43,12 @@ import static org.junit.Assert.*;
public class DagTest {
- private MusicRangeInformationRow createNewRow(List<Range> ranges, String lockid, boolean isLatest){
+ private MusicRangeInformationRow createNewRow(Set<Range> ranges, String lockid, boolean isLatest){
List<MusicTxDigestId> redoLog = new ArrayList<>();
return createNewRow(ranges,lockid,isLatest,redoLog);
}
- private MusicRangeInformationRow createNewRow(List<Range> ranges, String lockid, boolean isLatest,
+ private MusicRangeInformationRow createNewRow(Set<Range> ranges, String lockid, boolean isLatest,
List<MusicTxDigestId> redoLog) {
UUID id = MDBCUtils.generateTimebasedUniqueKey();
DatabasePartition dbPartition = new DatabasePartition(ranges, id, lockid);
@@ -58,14 +58,14 @@ public class DagTest {
@Test
public void getDag() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",false));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",false));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
DagNode node1 = dag.getNode(rows.get(0).getPartitionIndex());
DagNode node2 = dag.getNode(rows.get(1).getPartitionIndex());
@@ -92,15 +92,15 @@ public class DagTest {
List<Range> range2 = new ArrayList<>( Arrays.asList(
new Range("schema.range2")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
DagNode node1 = dag.getNode(rows.get(0).getPartitionIndex());
DagNode node2 = dag.getNode(rows.get(1).getPartitionIndex());
@@ -122,14 +122,14 @@ public class DagTest {
@Test
public void nextToOwn() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",false));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",false));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
int counter = 0;
while(dag.hasNextToOwn()){
@@ -148,23 +148,23 @@ public class DagTest {
@Test
public void nextToApply() throws InterruptedException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range1")
));
List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false,redo1));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo2));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false,redo2));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true,redo3));
Dag dag = Dag.getDag(rows, ranges);
int nodeCounter = 0;
HashSet<Range> rangesSet = new HashSet<>(ranges);
@@ -194,26 +194,26 @@ public class DagTest {
public void nextToApply2() throws InterruptedException, MDBCServiceException {
Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
List<MusicRangeInformationRow> rows = new ArrayList<>();
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range1")
));
List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false,redo1));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0),
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1)
));
- MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2);
+ MusicRangeInformationRow newRow = createNewRow(new HashSet<>(ranges), "", false, redo2);
alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
rows.add(newRow);
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true,redo3));
Dag dag = Dag.getDag(rows, ranges);
HashSet<Range> rangesSet = new HashSet<>(ranges);
dag.setAlreadyApplied(alreadyApplied, rangesSet);
@@ -249,22 +249,22 @@ public class DagTest {
List<Range> range2 = new ArrayList<>( Arrays.asList(
new Range("schema.range2")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
List<MusicRangeInformationRow> rows2 = new ArrayList<>(rows);
List<MusicRangeInformationRow> rows3 = new ArrayList<>(rows);
MILLISECONDS.sleep(10);
- rows3.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows3.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
- Dag dag2 = Dag.getDag(rows2, new ArrayList<>(ranges));
- Dag dag3 = Dag.getDag(rows3, new ArrayList<>(ranges));
+ Dag dag2 = Dag.getDag(rows2, new HashSet<>(ranges));
+ Dag dag3 = Dag.getDag(rows3, new HashSet<>(ranges));
assertFalse(dag.isDifferent(dag2));
assertFalse(dag2.isDifferent(dag));
assertTrue(dag.isDifferent(dag3));
@@ -282,19 +282,19 @@ public class DagTest {
List<Range> range2 = new ArrayList<>( Arrays.asList(
new Range("schema.range2")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range1),"",true));
+ rows.add(createNewRow(new HashSet<>(range1),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",true));
+ rows.add(createNewRow(new HashSet<>(range2),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
List<DagNode> oldestDoubles = dag.getOldestDoubles();
assertTrue(oldestDoubles.contains(dag.getNode(rows.get(2).getPartitionIndex())));
@@ -312,22 +312,22 @@ public class DagTest {
new Range("schema.range2"),
new Range("schema.range3")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range1),"",true));
+ rows.add(createNewRow(new HashSet<>(range1),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",true));
+ rows.add(createNewRow(new HashSet<>(range2),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
- Pair<List<Range>, Set<DagNode>> incompleteRangesAndDependents = dag.getIncompleteRangesAndDependents();
- List<Range> incomplete = incompleteRangesAndDependents.getKey();
+ Pair<Set<Range>, Set<DagNode>> incompleteRangesAndDependents = dag.getIncompleteRangesAndDependents();
+ Set<Range> incomplete = incompleteRangesAndDependents.getKey();
Set<DagNode> dependents = incompleteRangesAndDependents.getValue();
assertEquals(1,incomplete.size());
assertTrue(incomplete.contains(new Range("schema.range3")));
@@ -346,22 +346,22 @@ public class DagTest {
new Range("schema.range2"),
new Range("schema.range3")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range1),"",true));
+ rows.add(createNewRow(new HashSet<>(range1),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",true));
+ rows.add(createNewRow(new HashSet<>(range2),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
- Pair<List<Range>, Set<DagNode>> incompleteRangesAndDependents = dag.getIncompleteRangesAndDependents();
- List<Range> incomplete = incompleteRangesAndDependents.getKey();
+ Pair<Set<Range>, Set<DagNode>> incompleteRangesAndDependents = dag.getIncompleteRangesAndDependents();
+ Set<Range> incomplete = incompleteRangesAndDependents.getKey();
Set<DagNode> dependents = incompleteRangesAndDependents.getValue();
assertEquals(2,incomplete.size());
assertTrue(incomplete.contains(new Range("schema.range3")));
@@ -381,26 +381,26 @@ public class DagTest {
new Range("schema.range2"),
new Range("schema.range3")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- List<Range> allRanges = new ArrayList<>( Arrays.asList(
+ Set<Range> allRanges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range3"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range1),"",true));
+ rows.add(createNewRow(new HashSet<>(range1),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",true));
+ rows.add(createNewRow(new HashSet<>(range2),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
- MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(allRanges), "", true);
+ MusicRangeInformationRow newRow = createNewRow(new HashSet<>(allRanges), "", true);
dag.addNewNodeWithSearch(newRow,allRanges);
DagNode newNode = dag.getNode(newRow.getPartitionIndex());
DagNode node = dag.getNode(rows.get(4).getPartitionIndex());
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
index 2435762..2443d1e 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
@@ -142,6 +142,7 @@ public class OwnershipAndCheckpointTest {
this.conn = MdbcTestUtils.getConnection(DBType.MySQL);
Properties info = new Properties();
this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+MdbcTestUtils.getMariaDbPort()+"/"+DATABASE, conn, info);
+ this.mysqlMixin.initTables();
dropAndCreateTable();
}
@@ -170,7 +171,7 @@ public class OwnershipAndCheckpointTest {
}
}
- private OwnershipReturn cleanAndOwnPartition(List<Range> ranges, UUID ownOpId) throws SQLException {
+ private OwnershipReturn cleanAndOwnPartition(Set<Range> ranges, UUID ownOpId) throws SQLException {
dropAndCreateTable();
cleanAlreadyApplied(ownAndCheck);
DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
@@ -213,7 +214,7 @@ public class OwnershipAndCheckpointTest {
initDatabase(range);
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(range);
UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
OwnershipReturn own = cleanAndOwnPartition(ranges,ownOpId);
@@ -237,7 +238,7 @@ public class OwnershipAndCheckpointTest {
initDatabase(range);
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(range);
UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
OwnershipReturn own = cleanAndOwnPartition(ranges,ownOpId);
@@ -259,7 +260,7 @@ public class OwnershipAndCheckpointTest {
public void readOwn() throws Exception {
Range range = new Range("TABLE1");
MusicInterface mi = MdbcTestUtils.getMusicMixin();
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(range);
final DatabasePartition partition = TestUtils.createBasicRow(range, mi, MdbcTestUtils.getServerName());
TestUtils.unlockRow(MdbcTestUtils.getKeyspace(), MdbcTestUtils.getMriTableName(), partition); |