aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java16
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java7
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java36
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Range.java4
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java16
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java2
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java9
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java8
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java3
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java64
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java19
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java9
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java42
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java169
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java1930
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java17
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java20
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java4
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java127
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java4
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java10
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java6
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java2
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java6
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java14
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java3
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java118
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java9
28 files changed, 1479 insertions, 1195 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
index c8cad47..314248f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
@@ -39,7 +39,7 @@ public class DatabasePartition {
private UUID musicRangeInformationIndex;//Index that can be obtained either from
private String lockId;
- protected List<Range> ranges;
+ protected Set<Range> ranges;
private boolean ready;
@@ -49,14 +49,14 @@ public class DatabasePartition {
*/
public DatabasePartition() {
- this(new ArrayList<Range>(),null,"");
+ this(new HashSet<Range>(),null,"");
}
public DatabasePartition(UUID mriIndex) {
- this(new ArrayList<Range>(), mriIndex,"");
+ this(new HashSet<Range>(), mriIndex,"");
}
-
- public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String lockId) {
+
+ public DatabasePartition(Set<Range> knownRanges, UUID mriIndex, String lockId) {
if(mriIndex==null){
ready = false;
}
@@ -139,9 +139,9 @@ public class DatabasePartition {
* Get all the ranges that are currently owned
* @return ranges
*/
- public synchronized List<Range> getSnapshot() {
- List<Range> newRange = new ArrayList<>();
- for(Range r : ranges){
+ public synchronized Set<Range> getSnapshot() {
+ Set<Range> newRange = new HashSet<>();
+ for (Range r: ranges){
newRange.add(r.clone());
}
return newRange;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
index a02e6d0..b60062e 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Base64;
import java.util.Deque;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -98,12 +99,12 @@ public class MDBCUtils {
return prop;
}
- public static List<Range> getTables(Map<String,List<SQLOperation>> queryParsed){
+ public static Set<Range> getTables(Map<String,List<SQLOperation>> queryParsed){
return getTables(null, queryParsed);
}
- public static List<Range> getTables(String defaultDatabaseName, Map<String,List<SQLOperation>> queryParsed){
- List<Range> ranges = new ArrayList<>();
+ public static Set<Range> getTables(String defaultDatabaseName, Map<String,List<SQLOperation>> queryParsed){
+ Set<Range> ranges = new HashSet<>();
for(String table: queryParsed.keySet()){
String[] parts = table.split("\\.");
if(parts.length==2){
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 61ce6bd..2294673 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -34,10 +34,15 @@ import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Executor;
-
-import org.apache.commons.lang3.NotImplementedException;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.QueryException;
import org.onap.music.logging.EELFLoggerDelegate;
@@ -55,7 +60,6 @@ import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.query.QueryProcessor;
import org.onap.music.mdbc.query.SQLOperation;
import org.onap.music.mdbc.query.SQLOperationType;
-import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -501,21 +505,29 @@ public class MdbcConnection implements Connection {
* @param sql the SQL statement that is about to be executed
*/
public void preStatementHook(final String sql) throws MDBCServiceException, SQLException {
+
+ // some debug specific logic
+ if(sql.startsWith("DEBUG")) {
+ // if the SQL follows this convention: "DEBUG:TABLE_A,TABLE_B",
+ // DAG information pertaining to the tables will get printed
+ throw new SQLException("\nThis call was made for debugging purposes only\n" + statemanager.getOwnAndCheck().getDebugInfo(mi,sql.split(":")[1]));
+ }
+
//TODO: verify ownership of keys here
//Parse tables from the sql query
Map<String, List<SQLOperation>> tableToQueryType = QueryProcessor.parseSqlQuery(sql, table_set);
//Check ownership of keys
String defaultSchema = dbi.getSchema();
- List<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToQueryType);
+ Set<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToQueryType);
if (this.partition!=null) {
- List<Range> snapshot = this.partition.getSnapshot();
+ Set<Range> snapshot = this.partition.getSnapshot();
if(snapshot!=null){
queryTables.addAll(snapshot);
}
}
// filter out ranges that fall under Eventually consistent
// category as these tables do not need ownership
- List<Range> scQueryTables = filterEveTables(queryTables);
+ Set<Range> scQueryTables = filterEveTables(queryTables);
DatabasePartition tempPartition = own(scQueryTables, MDBCUtils.getOperationType(tableToQueryType));
if(tempPartition!=null && tempPartition != partition) {
this.partition.updateDatabasePartition(tempPartition);
@@ -525,7 +537,7 @@ public class MdbcConnection implements Connection {
}
- private List<Range> filterEveTables(List<Range> queryTables) {
+ private Set<Range> filterEveTables(Set<Range> queryTables) {
queryTables.removeAll(statemanager.getEventualRanges());
return queryTables;
}
@@ -539,6 +551,11 @@ public class MdbcConnection implements Connection {
dbi.postStatementHook(sql, transactionDigest);
}
+ public void initDatabase() throws QueryException {
+ dbi.initTables();
+ createTriggers();
+ }
+
/**
* Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
* proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized
@@ -553,6 +570,7 @@ public class MdbcConnection implements Connection {
logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
try {
dbi.createSQLTriggers(tableName);
+ mi.createPartitionIfNeeded(new Range(tableName));
table_set.add(tableName.toUpperCase());
} catch (Exception e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
@@ -573,7 +591,7 @@ public class MdbcConnection implements Connection {
* @return
* @throws MDBCServiceException
*/
- private DatabasePartition own(List<Range> ranges, SQLOperationType lockType) throws MDBCServiceException {
+ private DatabasePartition own(Set<Range> ranges, SQLOperationType lockType) throws MDBCServiceException {
if(ranges==null||ranges.isEmpty()){
return null;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
index 41aed26..82a5d16 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
@@ -22,7 +22,7 @@ package org.onap.music.mdbc;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
-
+import java.util.Set;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.mixins.MusicMixin;
@@ -83,7 +83,7 @@ public class Range implements Cloneable{
}
- public static boolean overlaps(List<Range> ranges, String table){
+ public static boolean overlaps(Set<Range> ranges, String table) {
//\TODO check if parallel stream makes sense here
return ranges.stream().map((Range r) -> r.table.toUpperCase().equals(table.toUpperCase())).anyMatch((Boolean b) -> b);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index e284103..66c8fa9 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -86,7 +86,7 @@ public class StateManager {
private String mdbcServerName;
private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
private final Lock eventualLock = new ReentrantLock();
- private List<Range> eventualRanges;
+ private Set<Range> eventualRanges;
/** lock for warmupRanges */
private final Lock warmupLock = new ReentrantLock();
/** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */
@@ -182,7 +182,7 @@ public class StateManager {
// and create triggers on any tables that need them
try {
MdbcConnection mdbcConn = (MdbcConnection) openConnection("init");
- mdbcConn.createTriggers();
+ mdbcConn.initDatabase();
closeConnection("init");
} catch (QueryException e) {
logger.error("Error syncrhonizing tables");
@@ -255,24 +255,24 @@ public class StateManager {
* Get a list of ranges that are eventually consistent
* @return
*/
- public List<Range> getEventualRanges() {
+ public Set<Range> getEventualRanges() {
eventualLock.lock();
- List<Range> returnArray;
+ Set<Range> returnSet;
try {
if(eventualRanges!=null){
- returnArray = new ArrayList<>(eventualRanges);
+ returnSet = new HashSet<>(eventualRanges);
}
else{
- returnArray= new ArrayList<>();
+ returnSet= new HashSet<>();
}
}
finally{
eventualLock.unlock();
}
- return returnArray;
+ return returnSet;
}
- public void setEventualRanges(List<Range> eventualRanges) {
+ public void setEventualRanges(Set<Range> eventualRanges) {
eventualLock.lock();
try {
this.eventualRanges = eventualRanges;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
index 0693a97..496f48d 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
@@ -42,7 +42,7 @@ public class TestUtils {
public static DatabasePartition createBasicRow(Range range, MusicInterface mixin, String mdbcServerName)
throws MDBCServiceException {
final UUID uuid = MDBCUtils.generateTimebasedUniqueKey();
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(range);
DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null);
new MusicRangeInformationRow(dbPartition, new ArrayList<>(), true);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java
index 0021bcc..bbd3f35 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java
@@ -20,6 +20,7 @@
package org.onap.music.mdbc.configurations;
import java.util.List;
+import java.util.Set;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.Range;
@@ -30,18 +31,18 @@ public class Eventual {
private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Eventual.class);
- protected List<Range> ranges;
+ protected Set<Range> ranges;
- public Eventual(List<Range> ranges) {
+ public Eventual(Set<Range> ranges) {
super();
this.ranges = ranges;
}
- public List<Range> getRanges() {
+ public Set<Range> getRanges() {
return ranges;
}
- public void setRanges(List<Range> ranges) {
+ public void setRanges(Set<Range> ranges) {
this.ranges = ranges;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
index 391ee1a..be8217b 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
@@ -49,17 +49,17 @@ public class NodeConfiguration {
this.sqlDatabaseName = sqlDatabaseName;
}
- protected List<Range> toRanges(List<String> tables){
- List<Range> newRange = new ArrayList<>();
+ protected Set<Range> toRanges(List<String> tables){
+ Set<Range> newRange = new HashSet<>();
for(String table: tables) {
newRange.add(new Range(table));
}
return newRange;
}
- protected List<Range> toRanges(String tables){
+ protected Set<Range> toRanges(String tables){
if(tables.isEmpty()){
- return new ArrayList<>();
+ return new HashSet<>();
}
String[] tablesArray=tables.split(",");
return toRanges(new ArrayList<>(Arrays.asList(tablesArray)));
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
index 0598271..1c9d07c 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
@@ -38,6 +38,7 @@ import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
public class TablesConfiguration {
@@ -113,7 +114,7 @@ public class TablesConfiguration {
final ResultSet resultSet = MusicCore.quorumGet(checkRowsInTable);
while(resultSet!=null && !resultSet.isExhausted()){
final MusicRangeInformationRow mriRowFromCassandraRow = MusicMixin.getMRIRowFromCassandraRow(resultSet.one());
- List<Range> ranges = mriRowFromCassandraRow.getDBPartition().getSnapshot();
+ Set<Range> ranges = mriRowFromCassandraRow.getDBPartition().getSnapshot();
for(Range range: partition.getTables()) {
if (Range.overlaps(ranges,range.getTable())){
throw new MDBCServiceException("MRI row already exists");
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
index fea329d..fac9f36 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
@@ -106,11 +106,13 @@ public class MdbcTestClient {
e.printStackTrace();
}
- final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');";
- final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=2;";
- final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";
+ final String insertSQL = "INSERT INTO Persons VALUES (1, 'Smith', 'Juan', 'KACB', 'ATLANTA');";
+ final String insertSQL1 = "INSERT INTO Persons2 VALUES (1, 'Smith', 'Juan', 'KACB', 'ATLANTA');";
+ final String insertSQL2 = "INSERT INTO Persons3 VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";
final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';";
- final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";
+ final String insertSQL4 = "UPDATE Persons2 SET FirstName='JOHN' WHERE LastName='Smith';";
+ final String insertSQL5 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";
+ final String insertSQL6 = "UPDATE Persons3 SET FirstName='JOHN' WHERE LastName='Smith';";
final String selectSQL1 = "SELECT * FROM Persons;";
@@ -123,13 +125,55 @@ public class MdbcTestClient {
}
try {
- //execute = insertStmt.execute(insertSQL);
- //execute = insertStmt.execute(insertSQL1);
- //execute = insertStmt.execute(insertSQL2);
- //execute = insertStmt.execute(insertSQL3);
- //execute = insertStmt.execute(insertSQL4);
+ /*
+ * insert into 1
+ * insert into 2
+ * insert into 3
+ * insert into 1,2
+ * insert into 1,3
+ */
+ execute = insertStmt.execute(insertSQL);
+ connection.commit();
+
+ connection.close();
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + "http://localhost:30000/test"+ ";serialization=protobuf");
+ connection.setAutoCommit(false);
+ insertStmt = connection.createStatement();
+
+ execute = insertStmt.execute(insertSQL1);
+ connection.commit();
+
+ connection.close();
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + "http://localhost:30000/test"+ ";serialization=protobuf");
+ connection.setAutoCommit(false);
+ insertStmt = connection.createStatement();
+
+ execute = insertStmt.execute(insertSQL2);
+ connection.commit();
+
+ connection.close();
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + "http://localhost:30000/test"+ ";serialization=protobuf");
+ connection.setAutoCommit(false);
+ insertStmt = connection.createStatement();
+
+ System.out.println("1,2");
+ execute = insertStmt.execute(insertSQL3);
+ execute = insertStmt.execute(insertSQL4);
+ connection.commit();
+
+ connection.close();
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + "http://localhost:30000/test"+ ";serialization=protobuf");
+ connection.setAutoCommit(false);
+ insertStmt = connection.createStatement();
+
+ System.out.println("1,3,2");
+
+ execute = insertStmt.execute(insertSQL5);
+ execute = insertStmt.execute(insertSQL6);
+ execute = insertStmt.execute(insertSQL4);
+ connection.commit();
- ///*
+ /*
ResultSet rs = insertStmt.executeQuery(selectSQL1);
while (rs.next()) {
System.out.printf("%d, %s, %s\n", rs.getInt("PersonID"), rs.getString("FirstName"), rs.getString("LastName"));
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index 063ea3f..745307c 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -26,7 +26,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
+import java.util.UUID;
+import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
@@ -133,15 +134,27 @@ public interface DBInterface {
* @throws SQLException if replay cannot occur correctly
* @throws MDBCServiceException
*/
- void replayTransaction(StagingTable digest, List<Range> ranges) throws SQLException, MDBCServiceException;
+ void replayTransaction(StagingTable digest, Set<Range> ranges) throws SQLException, MDBCServiceException;
void disableForeignKeyChecks() throws SQLException;
void enableForeignKeyChecks() throws SQLException;
- void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException;
+ void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException;
Connection getSQLConnection();
String getSchema();
+
+ /**
+ * Update pointer to where this server has successfully replayed transactions
+ * @param r
+ * @param playbackPointer
+ */
+ public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer);
+
+ /**
+ * Initialize the SQL database by creating any tables necessary
+ */
+ public void initTables();
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java
index 8e3f20c..5e8ba87 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java
@@ -21,6 +21,7 @@
package org.onap.music.mdbc.mixins;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import org.onap.music.mdbc.Range;
@@ -28,12 +29,12 @@ public class LockResult{
private boolean successful;
private UUID musicRangeInformationIndex;
private String lockId;
- private List<Range> ranges;
+ private Set<Range> ranges;
private boolean newLock;
/** back off time in milliseconds */
private long backOffPeriodms;
- public LockResult(boolean succesful, UUID rowId, String lockId, boolean newLock, List<Range> ranges){
+ public LockResult(boolean succesful, UUID rowId, String lockId, boolean newLock, Set<Range> ranges){
this.successful = succesful;
this.musicRangeInformationIndex = rowId;
this.lockId=lockId;
@@ -48,7 +49,7 @@ public class LockResult{
* @param ranges
*/
@Deprecated
- public LockResult(UUID rowId, String lockId, boolean newLock, List<Range> ranges){
+ public LockResult(UUID rowId, String lockId, boolean newLock, Set<Range> ranges){
this.successful = true;
this.musicRangeInformationIndex = rowId;
this.lockId=lockId;
@@ -74,7 +75,7 @@ public class LockResult{
return musicRangeInformationIndex;
}
- public List<Range> getRanges() {
+ public Set<Range> getRanges() {
return ranges;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 4ae4413..637cb15 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -22,7 +22,7 @@ package org.onap.music.mdbc.mixins;
import com.datastax.driver.core.ResultSet;
import java.nio.ByteBuffer;
import java.util.*;
-
+import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.MusicLockingException;
@@ -46,24 +46,24 @@ import org.onap.music.mdbc.tables.*;
public interface MusicInterface {
class OwnershipReturn{
private final UUID ownershipId;
- private final String ownerId;
+ private final String lockId;
private final UUID rangeId;
- private final List<Range> ranges;
+ private final Set<Range> ranges;
private final Dag dag;
- public OwnershipReturn(UUID ownershipId, String ownerId, UUID rangeId, List<Range> ranges, Dag dag){
+ public OwnershipReturn(UUID ownershipId, String ownerId, UUID rangeId, Set<Range> ranges, Dag dag){
this.ownershipId=ownershipId;
- this.ownerId=ownerId;
+ this.lockId=ownerId;
this.rangeId=rangeId;
this.ranges=ranges;
this.dag=dag;
}
public String getOwnerId(){
- return ownerId;
+ return lockId;
}
public UUID getRangeId(){
return rangeId;
}
- public List<Range> getRanges(){ return ranges; }
+ public Set<Range> getRanges(){ return ranges; }
public Dag getDag(){return dag;}
public UUID getOwnershipId() { return ownershipId; }
}
@@ -189,7 +189,7 @@ public interface MusicInterface {
* @param progressKeeper data structure that is used to handle to detect failures, and know what to do
* @throws MDBCServiceException
*/
- void commitLog(DatabasePartition partition, List<Range> eventualRanges, StagingTable transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
+ void commitLog(DatabasePartition partition, Set<Range> eventualRanges, StagingTable transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
/**
@@ -208,7 +208,7 @@ public interface MusicInterface {
*/
RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException;
- List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException;
+ public Set<Range> getRangeDependencies(Set<Range> range) throws MDBCServiceException;
/**
* This function is used to create a new locked row in the MRI table
@@ -329,16 +329,28 @@ public interface MusicInterface {
*
* Does not merge rows if a single previous row is sufficient to match new partition needed
*
- * @param extendedDag
- * @param latestRows
- * @param ranges
- * @param locks
+ * @param currentlyOwned
+ * @param locksForOwnership
* @param ownershipId
* @return
* @throws MDBCServiceException
*/
- OwnershipReturn mergeLatestRowsIfNecessary(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
- Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException;
+ OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, UUID ownershipId)
+ throws MDBCServiceException;
+
+ /**
+ * Create ranges in MRI table, if not already present
+ * @param range to add into mri table
+ */
+ public void createPartitionIfNeeded(Range rangeToCreate) throws MDBCServiceException;
+
+ /**
+ * Update pointer to where this server has successfully replayed transactions
+ * This is an eventual operation for minimal performance hits
+ * @param r
+ * @param playbackPointer
+ */
+ public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index 1bdb022..5581573 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -24,6 +24,7 @@ import java.io.Reader;
import java.nio.ByteBuffer;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -124,7 +125,9 @@ public class MusicMixin implements MusicInterface {
private String musicEventualTxDigestTableName = "musicevetxdigest";
public static final String musicRangeInformationTableName = "musicrangeinformation";
private String musicRangeDependencyTableName = "musicrangedependency";
- private String musicNodeInfoTableName = "nodeinfo";
+ private String musicNodeInfoTableName = "musicnodeinfo";
+ /** Table mapping mdbc nodes to their current checkpoint status */
+ private String musicMdbcCheckpointsTableName = "musicmdbccheckpoints";
private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class);
@@ -339,9 +342,10 @@ public class MusicMixin implements MusicInterface {
createMusicNodeInfoTable();
createMusicRangeInformationTable(this.music_ns,this.musicRangeInformationTableName);
createMusicRangeDependencyTable(this.music_ns,this.musicRangeDependencyTableName);
+ createMusicMdbcCheckpointTable();
}
catch(MDBCServiceException e){
- logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC");
+ logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC: " + e.getErrorMessage());
}
}
@@ -1194,7 +1198,7 @@ public class MusicMixin implements MusicInterface {
return pendingRows;
}
- private List<Range> lockRow(LockRequest request,Map.Entry<UUID, List<Range>> pending,Map<UUID, String> currentLockRef,
+ private List<Range> lockRow(LockRequest request,Map.Entry<UUID, Set<Range>> pending,Map<UUID, String> currentLockRef,
String fullyQualifiedKey, String lockId, List<Range> pendingToLock,
Map<UUID, LockResult> alreadyHeldLocks)
throws MDBCServiceException{
@@ -1290,7 +1294,7 @@ public class MusicMixin implements MusicInterface {
* This officially commits the transaction globally
*/
@Override
- public void commitLog(DatabasePartition partition,List<Range> eventualRanges, StagingTable transactionDigest,
+ public void commitLog(DatabasePartition partition,Set<Range> eventualRanges, StagingTable transactionDigest,
String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException {
// first deal with commit for eventually consistent tables
@@ -1301,7 +1305,7 @@ public class MusicMixin implements MusicInterface {
return;
}
- List<Range> snapshot = partition.getSnapshot();
+ Set<Range> snapshot = partition.getSnapshot();
if(snapshot==null || snapshot.isEmpty()){
logger.warn("Trying to commit log with empty ranges");
return;
@@ -1360,7 +1364,7 @@ public class MusicMixin implements MusicInterface {
if (progressKeeper != null) {
progressKeeper.setRecordId(txId, digestId);
}
- List<Range> ranges = partition.getSnapshot();
+ Set<Range> ranges = partition.getSnapshot();
for(Range r : ranges) {
Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
if(!alreadyApplied.containsKey(r)){
@@ -1377,7 +1381,7 @@ public class MusicMixin implements MusicInterface {
}
}
- private void filterAndAddEventualTxDigest(List<Range> eventualRanges,
+ private void filterAndAddEventualTxDigest(Set<Range> eventualRanges,
StagingTable transactionDigest, String txId,
TxCommitProgress progressKeeper) throws MDBCServiceException {
@@ -1481,7 +1485,7 @@ public class MusicMixin implements MusicInterface {
final UUID id = t.getUUID(1);
digestIds.add(new MusicTxDigestId(partitionIndex,id,index++));
}
- List<Range> partitions = new ArrayList<>();
+ Set<Range> partitions = new HashSet<>();
Set<String> tables = newRow.getSet("keys",String.class);
for (String table:tables){
partitions.add(new Range(table));
@@ -1795,6 +1799,27 @@ public class MusicMixin implements MusicInterface {
throw(e);
}
}
+
+ private void createMusicMdbcCheckpointTable() throws MDBCServiceException {
+ createMusicMdbcCheckpointTable(this.music_ns, this.musicMdbcCheckpointsTableName);
+ }
+
+ public static void createMusicMdbcCheckpointTable(String namespace, String checkpointTable) throws MDBCServiceException {
+ String priKey = "txid";
+ StringBuilder fields = new StringBuilder();
+ fields.append("txid uuid, ");
+ fields.append("compressed boolean, ");
+ fields.append("transactiondigest blob ");//notice lack of ','
+ String cql =
+ String.format("CREATE TABLE IF NOT EXISTS %s.%s (mdbcnode UUID, mridigest UUID, digestindex int, PRIMARY KEY (mdbcnode));",
+ namespace, checkpointTable);
+ try {
+ executeMusicWriteQuery(namespace,checkpointTable,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create redo records table");
+ throw(e);
+ }
+ }
/**
* Writes the transaction history to the txDigest
@@ -2048,7 +2073,7 @@ public class MusicMixin implements MusicInterface {
* @throws MDBCServiceException
*/
@Override
- public List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException{
+ public Set<Range> getRangeDependencies(Set<Range> range) throws MDBCServiceException{
Set<Range> extendedRange = new HashSet<>();
for(Range r: range){
extendedRange.add(r);
@@ -2057,7 +2082,7 @@ public class MusicMixin implements MusicInterface {
extendedRange.addAll(dependencies.dependentRanges());
}
}
- return new ArrayList<>(extendedRange);
+ return extendedRange;
}
@Override
@@ -2089,25 +2114,28 @@ public class MusicMixin implements MusicInterface {
/**
* fixes the DAG in case the previous owner failed while trying to own the row
* @param latestDag
- * @param rows
- * @param ranges
* @param locks
* @throws MDBCServiceException
*/
- private void recoverFromFailureAndUpdateDag(Dag latestDag,List<MusicRangeInformationRow> rows,List<Range> ranges,
- Map<UUID,LockResult> locks) throws MDBCServiceException{
- Pair<List<Range>,Set<DagNode>> rangesAndDependents = latestDag.getIncompleteRangesAndDependents();
+ private void recoverFromFailureAndUpdateDag(Dag latestDag, Map<UUID,LockResult> locks) throws MDBCServiceException {
+ Pair<Set<Range>, Set<DagNode>> rangesAndDependents = latestDag.getIncompleteRangesAndDependents();
if(rangesAndDependents.getKey()==null || rangesAndDependents.getKey().size()==0 ||
rangesAndDependents.getValue()==null || rangesAndDependents.getValue().size() == 0){
return;
}
- MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey(), rows);
+
+ Set<UUID> prevPartitions = new HashSet<>();
+ for (DagNode dagnode: rangesAndDependents.getRight()) {
+ prevPartitions.add(dagnode.getId());
+ }
+
+ MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey(), prevPartitions);
locks.put(r.getPartitionIndex(),new LockResult(r.getPartitionIndex(),r.getDBPartition().getLockId(),true,rangesAndDependents.getKey()));
latestDag.addNewNode(r,new ArrayList<>(rangesAndDependents.getValue()));
}
- private List<MusicRangeInformationRow> setReadOnlyAnyDoubleRow(Dag latestDag,List<MusicRangeInformationRow> rows, Map<UUID,LockResult> locks)
+ private List<MusicRangeInformationRow> setReadOnlyAnyDoubleRow(Dag latestDag,Map<UUID,LockResult> locks)
throws MDBCServiceException{
List<MusicRangeInformationRow> returnInfo = new ArrayList<>();
List<DagNode> toDisable = latestDag.getOldestDoubles();
@@ -2119,15 +2147,6 @@ public class MusicMixin implements MusicInterface {
return returnInfo;
}
- private MusicRangeInformationRow createAndAssignLock(List<Range> ranges, List<MusicRangeInformationRow> latestRows) throws MDBCServiceException {
- UUID newUUID = MDBCUtils.generateTimebasedUniqueKey();
- DatabasePartition newPartition = new DatabasePartition(ranges,newUUID,null);
- MusicRangeInformationRow newRow = new MusicRangeInformationRow(newPartition,new ArrayList<>(),
- true, extractPreviousPartitions(latestRows));
- createLockedMRIRow(newRow);
- return newRow;
- }
-
/**
* Create a set of previous partitions to their uuids
* @param latestRows
@@ -2142,37 +2161,45 @@ public class MusicMixin implements MusicInterface {
}
@Override
- public OwnershipReturn mergeLatestRowsIfNecessary(Dag extendedDag, List<MusicRangeInformationRow> latestRows,
- List<Range> ranges, Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException {
- recoverFromFailureAndUpdateDag(extendedDag,latestRows,ranges,locks);
-
- if (latestRows.size()==1 && latestRows.get(0).getDBPartition().contains(ranges)) {
- //reuse current row if possible
- MusicRangeInformationRow row = latestRows.get(0);
- LockResult lockresult = locks.get(row.getPartitionIndex());
- if (lockresult!=null) {
- return new OwnershipReturn(ownershipId, lockresult.getLockId(), row.getPartitionIndex(), ranges, extendedDag);
+ public OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, UUID ownershipId) throws MDBCServiceException {
+ recoverFromFailureAndUpdateDag(currentlyOwned,locksForOwnership);
+
+ if (locksForOwnership.keySet().size()==1) {
+ //reuse if overlapping single partition, no merge necessary
+ for (UUID uuid: locksForOwnership.keySet()) {
+ return new OwnershipReturn(ownershipId, locksForOwnership.get(uuid).getLockId(), uuid,
+ currentlyOwned.getNode(uuid).getRangeSet(), currentlyOwned);
}
}
- List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks);
- releaseLocks(changed, locks);
- MusicRangeInformationRow row = createAndAssignLock(ranges, latestRows);
- latestRows.add(row);
- locks.put(row.getPartitionIndex(),new LockResult(row.getPartitionIndex(),row.getDBPartition().getLockId(),true,ranges));
- extendedDag.addNewNodeWithSearch(row,ranges);
- Pair<List<Range>, Set<DagNode>> missing = extendedDag.getIncompleteRangesAndDependents();
- if(missing.getKey().size()!=0 && missing.getValue().size()!=0) {
- MusicRangeInformationRow newRow = createAndAssignLock(missing.getKey(), latestRows);
- latestRows.add(newRow);
- locks.put(newRow.getPartitionIndex(), new LockResult(newRow.getPartitionIndex(), row.getDBPartition().getLockId(),
- true, missing.getKey()));
- extendedDag.addNewNode(newRow, new ArrayList<>(missing.getValue()));
- }
- changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks);
- releaseLocks(changed,locks);
- releaseAllLocksExcept(row.getPartitionIndex(),locks);
- LockResult ownRow = locks.get(row.getPartitionIndex());
- return new OwnershipReturn(ownershipId, ownRow.getLockId(), ownRow.getIndex(),ranges,extendedDag);
+
+ //merge is necessary
+ List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(currentlyOwned, locksForOwnership);
+ releaseLocks(changed, locksForOwnership);
+
+ Set<Range> ranges = extractRangesToOwn(currentlyOwned, locksForOwnership.keySet());
+
+ MusicRangeInformationRow createdRow = createAndAssignLock(ranges, locksForOwnership.keySet());
+ currentlyOwned.addNewNodeWithSearch(createdRow, ranges);
+ changed = setReadOnlyAnyDoubleRow(currentlyOwned, locksForOwnership);
+ releaseLocks(locksForOwnership);
+ return new OwnershipReturn(ownershipId, createdRow.getDBPartition().getLockId(), createdRow.getPartitionIndex(),
+ createdRow.getDBPartition().getSnapshot(), currentlyOwned);
+ }
+
+ private MusicRangeInformationRow createAndAssignLock(Set<Range> ranges, Set<UUID> prevPartitions) throws MDBCServiceException {
+ UUID newUUID = MDBCUtils.generateTimebasedUniqueKey();
+ DatabasePartition newPartition = new DatabasePartition(ranges,newUUID,null);
+ MusicRangeInformationRow row = new MusicRangeInformationRow(newPartition, true, prevPartitions);
+ createLockedMRIRow(row);
+ return row;
+ }
+
+ private Set<Range> extractRangesToOwn(Dag currentlyOwned, Set<UUID> UUIDs) {
+ HashSet<Range> ranges = new HashSet<>();
+ for (UUID uuid: UUIDs) {
+ ranges.addAll(currentlyOwned.getNode(uuid).getRow().getDBPartition().getSnapshot());
+ }
+ return ranges;
}
/**
@@ -2466,9 +2493,41 @@ public class MusicMixin implements MusicInterface {
}
}
+ @Deprecated //used only in testing, should use other method instead
public StateManager getStateManager() {
return stateManager;
}
+
+ @Override
+ public void createPartitionIfNeeded(Range rangeToCreate) throws MDBCServiceException {
+ List<MusicRangeInformationRow> allRows = getAllMriRows();
+ for (MusicRangeInformationRow row: allRows) {
+ if (row.getDBPartition().getSnapshot().contains(rangeToCreate)) {
+ //range already in MRI row, do not re-create
+ return;
+ }
+ }
+
+ MusicRangeInformationRow mriRow =
+ createAndAssignLock(new HashSet<Range>(Arrays.asList(rangeToCreate)), new HashSet<UUID>());
+ //TODO: should make sure we didn't create 2 new rows simultaneously, while we still own the lock
+ unlockKeyInMusic(musicRangeInformationTableName, mriRow.getPartitionIndex().toString(),
+ mriRow.getDBPartition().getLockId());
+ }
+
+ @Override
+ public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) {
+ String cql = String.format("INSERT INTO %s.%s (mdbcnode, mridigest, digestindex) VALUES ("
+ + this.myId + ", " + playbackPointer.getLeft() + ", " + playbackPointer.getRight() + ");",
+ music_ns, this.musicMdbcCheckpointsTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ try {
+ MusicCore.nonKeyRelatedPut(pQueryObject,"eventual");
+ } catch (MusicServiceException e) {
+ logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location", e);
+ }
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 1faf281..ec91ceb 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -34,10 +34,10 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
-
+import java.util.UUID;
+import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
import org.json.JSONTokener;
-
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.Configuration;
@@ -48,7 +48,6 @@ import org.onap.music.mdbc.query.SQLOperation;
import org.onap.music.mdbc.query.SQLOperationType;
import org.onap.music.mdbc.tables.Operation;
import org.onap.music.mdbc.tables.StagingTable;
-
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.delete.Delete;
@@ -58,166 +57,184 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException;
/**
* This class provides the methods that MDBC needs in order to mirror data to/from a
- * <a href="https://dev.mysql.com/">MySQL</a> or <a href="http://mariadb.org/">MariaDB</a> database instance.
- * This class uses the <code>JSON_OBJECT()</code> database function, which means it requires the following
- * minimum versions of either database:
+ * <a href="https://dev.mysql.com/">MySQL</a> or <a href="http://mariadb.org/">MariaDB</a> database instance. This class
+ * uses the <code>JSON_OBJECT()</code> database function, which means it requires the following minimum versions of
+ * either database:
* <table summary="">
- * <tr><th>DATABASE</th><th>VERSION</th></tr>
- * <tr><td>MySQL</td><td>5.7.8</td></tr>
- * <tr><td>MariaDB</td><td>10.2.3 (Note: 10.2.3 is currently (July 2017) a <i>beta</i> release)</td></tr>
+ * <tr>
+ * <th>DATABASE</th>
+ * <th>VERSION</th>
+ * </tr>
+ * <tr>
+ * <td>MySQL</td>
+ * <td>5.7.8</td>
+ * </tr>
+ * <tr>
+ * <td>MariaDB</td>
+ * <td>10.2.3 (Note: 10.2.3 is currently (July 2017) a <i>beta</i> release)</td>
+ * </tr>
* </table>
*
* @author Robert P. Eby
*/
public class MySQLMixin implements DBInterface {
- private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MySQLMixin.class);
-
- public static final String MIXIN_NAME = "mysql";
- public static final String TRANS_TBL = "MDBC_TRANSLOG";
- private static final String CREATE_TBL_SQL =
- "CREATE TABLE IF NOT EXISTS "+TRANS_TBL+
- " (IX INT AUTO_INCREMENT, OP CHAR(1), SCHEMANAME VARCHAR(255), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA BLOB, " +
- "CONNECTION_ID INT, PRIMARY KEY (IX));";
-
- private final MusicInterface mi;
- private final int connId;
- private final String dbName;
- private final Connection jdbcConn;
- private final Map<String, TableInfo> tables;
- private PreparedStatement deleteStagingStatement;
- private boolean server_tbl_created = false;
- private boolean useAsyncStagingUpdate = false;
- private Object stagingHandlerLock = new Object();
- private AsyncUpdateHandler stagingHandler = null;
- private StagingTable currentStaging=null;
-
- public MySQLMixin() {
- this.mi = null;
- this.connId = 0;
- this.dbName = null;
- this.jdbcConn = null;
- this.tables = null;
- this.deleteStagingStatement = null;
- }
- public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException {
- this.mi = mi;
- this.connId = generateConnID(conn);
- this.dbName = getDBName(conn);
- this.jdbcConn = conn;
- this.tables = new HashMap<String, TableInfo>();
- useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
- Configuration.ASYNC_STAGING_TABLE_UPDATE));
- this.deleteStagingStatement = getStagingDeletePreparedStatement();
- }
-
- class StagingTableUpdateRunnable implements Runnable{
-
- private MySQLMixin mixin;
- private StagingTable staging;
-
- StagingTableUpdateRunnable(MySQLMixin mixin, StagingTable staging){
- this.mixin=mixin;
- this.staging=staging;
- }
-
- @Override
- public void run() {
- try {
- this.mixin.updateStagingTable(staging);
- } catch (NoSuchFieldException|MDBCServiceException e) {
- this.mixin.logger.error("Error when updating the staging table");
- }
- }
- }
-
- private PreparedStatement getStagingDeletePreparedStatement() throws SQLException {
- return jdbcConn.prepareStatement("DELETE FROM "+TRANS_TBL+" WHERE (IX BETWEEN ? AND ? ) AND " +
- "CONNECTION_ID = ?;");
- }
-
- // This is used to generate a unique connId for this connection to the DB.
- private int generateConnID(Connection conn) {
- int rv = (int) System.currentTimeMillis(); // random-ish
- try {
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery("SELECT CONNECTION_ID() AS IX");
- if (rs.next()) {
- rv = rs.getInt("IX");
- }
- stmt.close();
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"generateConnID: problem generating a connection ID!");
- }
- return rv;
- }
-
- /**
- * Get the name of this DBnterface mixin object.
- * @return the name
- */
- @Override
- public String getMixinName() {
- return MIXIN_NAME;
- }
-
- @Override
- public void close() {
- // nothing yet
- }
-
- /**
- * Determines the db name associated with the connection
- * This is the private/internal method that actually determines the name
- * @param conn
- * @return
- */
- private String getDBName(Connection conn) {
- String dbname = "mdbc"; //default name
- try {
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery("SELECT DATABASE() AS DB");
- if (rs.next()) {
- dbname = rs.getString("DB");
- }
- stmt.close();
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
- }
- return dbname;
- }
-
- @Override
- public String getDatabaseName() {
- return this.dbName;
- }
-
- @Override
- public String getSchema() {return this.dbName;}
-
-
- @Override
- public Set<String> getSQLTableSet() {
- Set<String> set = new TreeSet<String>();
- String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
- try {
- Statement stmt = jdbcConn.createStatement();
- ResultSet rs = stmt.executeQuery(sql);
- while (rs.next()) {
- String s = rs.getString("TABLE_NAME");
- set.add(s);
- }
- stmt.close();
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"getSQLTableSet: "+e);
- }
- logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set);
- return set;
- }
-
- @Override
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MySQLMixin.class);
+
+ public static final String MIXIN_NAME = "mysql";
+ public static final String TRANS_TBL = "MDBC_TRANSLOG";
+ private static final String CREATE_TBL_SQL = "CREATE TABLE IF NOT EXISTS " + TRANS_TBL
+ + " (IX INT AUTO_INCREMENT, OP CHAR(1), SCHEMANAME VARCHAR(255), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA BLOB, "
+ + "CONNECTION_ID INT, PRIMARY KEY (IX));";
+ private static final String CKPT_TBL = "MDBC_CHECKPOINT";
+ private static final String CREATE_CKPT_SQL =
+ "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTINDEX INT);";
+
+ private final MusicInterface mi;
+ private final int connId;
+ private final String dbName;
+ private final Connection jdbcConn;
+ private final Map<String, TableInfo> tables;
+ private PreparedStatement deleteStagingStatement;
+ private boolean useAsyncStagingUpdate = false;
+ private Object stagingHandlerLock = new Object();
+ private AsyncUpdateHandler stagingHandler = null;
+ private StagingTable currentStaging = null;
+
+ public MySQLMixin() {
+ this.mi = null;
+ this.connId = 0;
+ this.dbName = null;
+ this.jdbcConn = null;
+ this.tables = null;
+ this.deleteStagingStatement = null;
+ }
+
+ public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException {
+ this.mi = mi;
+ this.connId = generateConnID(conn);
+ this.dbName = getDBName(conn);
+ this.jdbcConn = conn;
+ this.tables = new HashMap<String, TableInfo>();
+ useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
+ Configuration.ASYNC_STAGING_TABLE_UPDATE));
+ this.deleteStagingStatement = getStagingDeletePreparedStatement();
+ }
+
+ class StagingTableUpdateRunnable implements Runnable {
+
+ private MySQLMixin mixin;
+ private StagingTable staging;
+
+ StagingTableUpdateRunnable(MySQLMixin mixin, StagingTable staging) {
+ this.mixin = mixin;
+ this.staging = staging;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.mixin.updateStagingTable(staging);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ this.mixin.logger.error("Error when updating the staging table");
+ }
+ }
+ }
+
+ private PreparedStatement getStagingDeletePreparedStatement() throws SQLException {
+ return jdbcConn.prepareStatement(
+ "DELETE FROM " + TRANS_TBL + " WHERE (IX BETWEEN ? AND ? ) AND " + "CONNECTION_ID = ?;");
+ }
+
+ // This is used to generate a unique connId for this connection to the DB.
+ private int generateConnID(Connection conn) {
+ int rv = (int) System.currentTimeMillis(); // random-ish
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT CONNECTION_ID() AS IX");
+ if (rs.next()) {
+ rv = rs.getInt("IX");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "generateConnID: problem generating a connection ID!");
+ }
+ return rv;
+ }
+
+ /**
+ * Get the name of this DBnterface mixin object.
+ *
+ * @return the name
+ */
+ @Override
+ public String getMixinName() {
+ return MIXIN_NAME;
+ }
+
+ @Override
+ public void close() {
+ // nothing yet
+ }
+
+ /**
+ * Determines the db name associated with the connection This is the private/internal method that actually
+ * determines the name
+ *
+ * @param conn
+ * @return
+ */
+ private String getDBName(Connection conn) {
+ String dbname = "mdbc"; // default name
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT DATABASE() AS DB");
+ if (rs.next()) {
+ dbname = rs.getString("DB");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
+ }
+ return dbname;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return this.dbName;
+ }
+
+ @Override
+ public String getSchema() {
+ return this.dbName;
+ }
+
+
+ @Deprecated
+ @Override
+ public Set<String> getSQLTableSet() {
+ Set<String> set = new TreeSet<String>();
+ String sql =
+ "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ while (rs.next()) {
+ String s = rs.getString("TABLE_NAME");
+ set.add(s);
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
+ }
+ logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
+ return set;
+ }
+
+ @Override
public Set<Range> getSQLRangeSet() {
Set<String> set = new TreeSet<String>();
- String sql = "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+ String sql =
+ "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
try {
Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
@@ -227,766 +244,727 @@ public class MySQLMixin implements DBInterface {
}
stmt.close();
} catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"getSQLTableSet: "+e);
+ logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
}
- logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set);
+ logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
Set<Range> rangeSet = new HashSet<>();
- for (String table: set) {
+ for (String table : set) {
rangeSet.add(new Range(table));
}
return rangeSet;
}
-
-/*
-mysql> describe tables;
-+-----------------+---------------------+------+-----+---------+-------+
-| Field | Type | Null | Key | Default | Extra |
-+-----------------+---------------------+------+-----+---------+-------+
-| TABLE_CATALOG | varchar(512) | NO | | | |
-| TABLE_SCHEMA | varchar(64) | NO | | | |
-| TABLE_NAME | varchar(64) | NO | | | |
-| TABLE_TYPE | varchar(64) | NO | | | |
-| ENGINE | varchar(64) | YES | | NULL | |
-| VERSION | bigint(21) unsigned | YES | | NULL | |
-| ROW_FORMAT | varchar(10) | YES | | NULL | |
-| TABLE_ROWS | bigint(21) unsigned | YES | | NULL | |
-| AVG_ROW_LENGTH | bigint(21) unsigned | YES | | NULL | |
-| DATA_LENGTH | bigint(21) unsigned | YES | | NULL | |
-| MAX_DATA_LENGTH | bigint(21) unsigned | YES | | NULL | |
-| INDEX_LENGTH | bigint(21) unsigned | YES | | NULL | |
-| DATA_FREE | bigint(21) unsigned | YES | | NULL | |
-| AUTO_INCREMENT | bigint(21) unsigned | YES | | NULL | |
-| CREATE_TIME | datetime | YES | | NULL | |
-| UPDATE_TIME | datetime | YES | | NULL | |
-| CHECK_TIME | datetime | YES | | NULL | |
-| TABLE_COLLATION | varchar(32) | YES | | NULL | |
-| CHECKSUM | bigint(21) unsigned | YES | | NULL | |
-| CREATE_OPTIONS | varchar(255) | YES | | NULL | |
-| TABLE_COMMENT | varchar(2048) | NO | | | |
-+-----------------+---------------------+------+-----+---------+-------+
- */
- /**
- * Return a TableInfo object for the specified table.
- * This method first looks in a cache of previously constructed TableInfo objects for the table.
- * If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the column names, types, and indexes of the table.
- * It creates a new TableInfo object with the results.
- * @param tableName the table to look up
- * @return a TableInfo object containing the info we need, or null if the table does not exist
- */
- @Override
- public TableInfo getTableInfo(String tableName) {
- TableInfo ti = tables.get(tableName);
- if (ti == null) {
- try {
- final String[] split = tableName.split("\\.");
- String tbl = (split.length==2)?split[1]:tableName;
- String localSchema = (split.length==2)?split[0]:getSchema();
- StringBuilder sql=new StringBuilder();
- sql.append("SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=");
- if(localSchema==null) {
- sql.append("DATABASE() AND TABLE_NAME='");
- }
- else {
- sql.append("'").append(localSchema).append("' AND TABLE_NAME='");
- }
- sql.append(tbl).append("';");
- ResultSet rs = executeSQLRead(sql.toString());
- if (rs != null) {
- ti = new TableInfo();
- while (rs.next()) {
- String name = rs.getString("COLUMN_NAME");
- String type = rs.getString("DATA_TYPE");
- String ckey = rs.getString("COLUMN_KEY");
- ti.columns.add(name);
- ti.coltype.add(mapDatatypeNameToType(type));
- ti.iskey.add(ckey != null && !ckey.equals(""));
- }
- rs.getStatement().close();
- } else {
- logger.error(EELFLoggerDelegate.errorLogger,"Cannot retrieve table info for table "+tableName+" from MySQL.");
- }
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"Cannot retrieve table info for table "+tableName+" from MySQL: "+e);
- return null;
- }
- tables.put(tableName, ti);
- }
- return ti;
- }
- // Map MySQL data type names to the java.sql.Types equivalent
- private int mapDatatypeNameToType(String nm) {
- switch (nm) {
- case "tinyint": return Types.TINYINT;
- case "smallint": return Types.SMALLINT;
- case "mediumint":
- case "int": return Types.INTEGER;
- case "bigint": return Types.BIGINT;
- case "decimal":
- case "numeric": return Types.DECIMAL;
- case "float": return Types.FLOAT;
- case "double": return Types.DOUBLE;
- case "date":
- case "datetime": return Types.DATE;
- case "time": return Types.TIME;
- case "timestamp": return Types.TIMESTAMP;
- case "char": return Types.CHAR;
- case "text":
- case "varchar": return Types.VARCHAR;
- case "mediumblob":
- case "longblob":
- case "blob": return Types.BLOB;
- default:
- logger.error(EELFLoggerDelegate.errorLogger,"unrecognized and/or unsupported data type "+nm);
- return Types.VARCHAR;
- }
- }
- @Override
- public void createSQLTriggers(String table) {
- final String[] split = table.split("\\.");
- String schemaName = (split.length==2)?split[0]:getSchema();
- String tableName = (split.length==2)?split[1]:table;
-
- if (tableName.equals(TRANS_TBL))
- // Don't create triggers for the table the triggers write into!!!
- return;
- try {
- if (!server_tbl_created) {
- try {
- Statement stmt = jdbcConn.createStatement();
- stmt.execute(CREATE_TBL_SQL);
- stmt.close();
-
- logger.info(EELFLoggerDelegate.applicationLogger,"createSQLTriggers: Server side dirty table created.");
- server_tbl_created = true;
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"createSQLTriggers: problem creating the "+TRANS_TBL+" table!");
- }
- }
-
- // Give the triggers a way to find this MSM
- for (String name : getTriggerNames(tableName)) {
- logger.info(EELFLoggerDelegate.applicationLogger,"ADD trigger "+name+" to msm_map");
- //\TODO fix this is an error
- //msm.register(name);
- }
- // No SELECT trigger
- executeSQLWrite(generateTrigger(schemaName,tableName, "INSERT"));
- executeSQLWrite(generateTrigger(schemaName,tableName, "UPDATE"));
- //\TODO: save key row instead of the whole row for delete
- executeSQLWrite(generateTrigger(schemaName,tableName, "DELETE"));
- } catch (SQLException e) {
- if (e.getMessage().equals("Trigger already exists") || e.getMessage().endsWith("already exists")){
- //only warn if trigger already exists
- logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e);
- } else {
- logger.error(EELFLoggerDelegate.errorLogger,"createSQLTriggers: "+e);
- }
- }
- }
-/*
-CREATE TRIGGER `triggername` BEFORE UPDATE ON `table`
-FOR EACH ROW BEGIN
-INSERT INTO `log_table` ( `field1` `field2`, ...) VALUES ( NEW.`field1`, NEW.`field2`, ...) ;
-END;
-
-OLD.field refers to the old value
-NEW.field refers to the new value
-*/
- private String generateTrigger(String schema, String tableName, String op) {
- boolean isdelete = op.equals("DELETE");
- boolean isinsert = op.equals("INSERT");
- boolean isupdate = op.equals("UPDATE");
- TableInfo ti = getTableInfo(tableName);
- StringBuilder newJson = new StringBuilder("JSON_OBJECT("); // JSON_OBJECT(key, val, key, val) page 1766
- StringBuilder keyJson = new StringBuilder("JSON_OBJECT("); // JSON_OBJECT(key, val, key, val) page 1766
- String pfx = "";
- String kfx = "";
- for (String col : ti.columns) {
- newJson.append(pfx)
- .append("'").append(col).append("', ")
- .append(isdelete ? "OLD." : "NEW.")
- .append(col);
- if (ti.iskey(col) || !ti.hasKey()) {
- keyJson.append(kfx)
- .append("'").append(col).append("', ")
- .append(isinsert ? "NEW." : "OLD.")
- .append(col);
- kfx = ", ";
- }
- pfx = ", ";
- }
- newJson.append(")");
- keyJson.append(")");
- //\TODO check if using mysql driver, so instead check the exception
- //\TODO add conditional for update, if primary key is still the same, use null in the KEYDATA col
- StringBuilder sb = new StringBuilder()
- .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL!
- .append(String.format("%s_%s", op.substring(0, 1), tableName))
- .append(" AFTER ")
- .append(op)
- .append(" ON ")
- .append(tableName)
- .append(" FOR EACH ROW INSERT INTO ")
- .append(TRANS_TBL)
- .append(" (SCHEMANAME, TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('")
- .append( (schema==null)?this.getSchema():schema )
- .append("', '")
- .append(tableName)
- .append("', ")
- .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'"))
- .append(", ")
- .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL")
- .append(", ")
- .append(newJson.toString())
- .append(", ")
- .append("CONNECTION_ID()")
- .append(")");
- return sb.toString();
- }
- private String[] getTriggerNames(String tableName) {
- return new String[] {
- "I_" + tableName, // INSERT trigger
- "U_" + tableName, // UPDATE trigger
- "D_" + tableName // DELETE trigger
- };
- }
-
- @Override
- public void dropSQLTriggers(String tableName) {
- try {
- for (String name : getTriggerNames(tableName)) {
- logger.info(EELFLoggerDelegate.applicationLogger,"REMOVE trigger "+name+" from msmmap");
- executeSQLWrite("DROP TRIGGER IF EXISTS " +name);
- //\TODO Fix this is an error
- //msm.unregister(name);
- }
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"dropSQLTriggers: "+e);
- }
- }
-
- @Override
- public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) {
- TableInfo ti = getTableInfo(tableName);
- String sql = "";
- if (rowExists(tableName, ti, map)) {
- // Update - Construct the what and where strings for the DB write
- StringBuilder what = new StringBuilder();
- StringBuilder where = new StringBuilder();
- String pfx = "";
- String pfx2 = "";
- for (int i = 0; i < ti.columns.size(); i++) {
- String col = ti.columns.get(i);
- String val = Utils.getStringValue(map.get(col));
- if (ti.iskey.get(i)) {
- where.append(pfx).append(col).append("=").append(val);
- pfx = " AND ";
- } else {
- what.append(pfx2).append(col).append("=").append(val);
- pfx2 = ", ";
- }
- }
- sql = String.format("UPDATE %s SET %s WHERE %s", tableName, what.toString(), where.toString());
- } else {
- // Construct the value string and column name string for the DB write
- StringBuilder fields = new StringBuilder();
- StringBuilder values = new StringBuilder();
- String pfx = "";
- for (String col : ti.columns) {
- fields.append(pfx).append(col);
- values.append(pfx).append(Utils.getStringValue(map.get(col)));
- pfx = ", ";
- }
- sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
- }
- try {
- executeSQLWrite(sql);
- } catch (SQLException e1) {
- logger.error(EELFLoggerDelegate.errorLogger,"executeSQLWrite: "+e1);
- }
- // TODO - remove any entries from MDBC_TRANSLOG corresponding to this update
- // SELECT IX, OP, KEYDATA FROM MDBC_TRANS_TBL WHERE CONNID = "+connId AND TABLENAME = tblname
- }
-
- private boolean rowExists(String tableName, TableInfo ti, Map<String, Object> map) {
- StringBuilder where = new StringBuilder();
- String pfx = "";
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- String col = ti.columns.get(i);
- String val = Utils.getStringValue(map.get(col));
- where.append(pfx).append(col).append("=").append(val);
- pfx = " AND ";
- }
- }
- String sql = String.format("SELECT * FROM %s WHERE %s", tableName, where.toString());
- ResultSet rs = executeSQLRead(sql);
- try {
- boolean rv = rs.next();
- rs.close();
- return rv;
- } catch (SQLException e) {
- return false;
- }
- }
-
-
- @Override
- public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) {
- TableInfo ti = getTableInfo(tableName);
- StringBuilder where = new StringBuilder();
- String pfx = "";
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- String col = ti.columns.get(i);
- Object val = map.get(col);
- where.append(pfx).append(col).append("=").append(Utils.getStringValue(val));
- pfx = " AND ";
- }
- }
- try {
- String sql = String.format("DELETE FROM %s WHERE %s", tableName, where.toString());
- executeSQLWrite(sql);
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * This method executes a read query in the SQL database. Methods that call this method should be sure
- * to call resultset.getStatement().close() when done in order to free up resources.
- * @param sql the query to run
- * @return a ResultSet containing the rows returned from the query
- */
- @Override
- public ResultSet executeSQLRead(String sql) {
- logger.debug(EELFLoggerDelegate.applicationLogger,"executeSQLRead");
- logger.debug("Executing SQL read:"+ sql);
- ResultSet rs = null;
- try {
- Statement stmt = jdbcConn.createStatement();
- rs = stmt.executeQuery(sql);
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"executeSQLRead"+e);
- }
- return rs;
- }
-
- @Override
- public void preCommitHook() {
- synchronized (stagingHandlerLock){
- //\TODO check if this can potentially block forever in certain scenarios
- if(stagingHandler!=null){
- stagingHandler.waitForAllPendingUpdates();
- }
- }
- }
-
- /**
- * This method executes a write query in the sql database.
- * @param sql the SQL to be sent to MySQL
- * @throws SQLException if an underlying JDBC method throws an exception
- */
- protected void executeSQLWrite(String sql) throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:"+ sql);
-
- Statement stmt = jdbcConn.createStatement();
- stmt.execute(sql);
- stmt.close();
- }
-
- /**
- * Code to be run within the DB driver before a SQL statement is executed. This is where tables
- * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
- * @param sql the SQL statement that is about to be executed
- * @return list of keys that will be updated, if they can't be determined afterwards (i.e. sql table doesn't have primary key)
- */
- @Override
- public void preStatementHook(final String sql) {
- if (sql == null) {
- return;
- }
- String cmd = sql.trim().toLowerCase();
- if (cmd.startsWith("select")) {
- String[] parts = sql.trim().split(" ");
- Set<String> set = getSQLTableSet();
- for (String part : parts) {
- if (set.contains(part.toUpperCase())) {
- // Found a candidate table name in the SELECT SQL -- update this table
- //msm.readDirtyRowsAndUpdateDb(part);
- }
- }
- }
- }
-
- /**
- * Code to be run within the DB driver after a SQL statement has been executed. This is where remote
- * statement actions can be copied back to Cassandra/MUSIC.
- * @param sql the SQL statement that was executed
- */
- @Override
- public void postStatementHook(final String sql,StagingTable transactionDigest) {
- if (sql != null) {
- String[] parts = sql.trim().split(" ");
- String cmd = parts[0].toLowerCase();
- if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
- if (useAsyncStagingUpdate) {
- synchronized (stagingHandlerLock){
- if(stagingHandler==null||currentStaging!=transactionDigest){
- Runnable newRunnable = new StagingTableUpdateRunnable(this, transactionDigest);
- currentStaging=transactionDigest;
- stagingHandler=new AsyncUpdateHandler(newRunnable);
- }
- //else we can keep using the current staging Handler
- }
- stagingHandler.processNewUpdate();
- } else {
-
- try {
- this.updateStagingTable(transactionDigest);
- } catch (NoSuchFieldException | MDBCServiceException e) {
- // TODO Auto-generated catch block
- this.logger.error("Error updating the staging table");
- }
- }
- }
- }
- }
-
- private SQLOperation toOpEnum(String operation) throws NoSuchFieldException {
- switch (operation.toLowerCase()) {
- case "i":
- return SQLOperation.INSERT;
- case "d":
- return SQLOperation.DELETE;
- case "u":
- return SQLOperation.UPDATE;
- case "s":
- return SQLOperation.SELECT;
- default:
- logger.error(EELFLoggerDelegate.errorLogger,"Invalid operation selected: ["+operation+"]");
- throw new NoSuchFieldException("Invalid operation enum");
- }
-
- }
- /**
- * Copy data that is in transaction table into music interface
- * @param transactionDigests
- * @throws NoSuchFieldException
- */
- private void updateStagingTable(StagingTable transactionDigests)
- throws NoSuchFieldException, MDBCServiceException {
- // copy from DB.MDBC_TRANSLOG where connid == myconnid
- // then delete from MDBC_TRANSLOG
- String sql2 = "SELECT IX, SCHEMANAME, TABLENAME, OP, ROWDATA, KEYDATA FROM " + TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId;
- Integer biggestIx = Integer.MIN_VALUE;
- Integer smallestIx = Integer.MAX_VALUE;
- try {
- ResultSet rs = executeSQLRead(sql2);
- Set<Integer> rows = new TreeSet<Integer>();
- while (rs.next()) {
- int ix = rs.getInt("IX");
- biggestIx = Integer.max(biggestIx,ix);
- smallestIx = Integer.min(smallestIx,ix);
- String op = rs.getString("OP");
- SQLOperation opType = toOpEnum(op);
- String schema= rs.getString("SCHEMANAME");
- String tbl = rs.getString("TABLENAME");
- String newRowStr = rs.getString("ROWDATA");
- String rowStr = rs.getString("KEYDATA");
- Range range = new Range(schema+"."+tbl);
- transactionDigests.addOperation(range,opType,newRowStr,rowStr);
- rows.add(ix);
- }
- rs.getStatement().close();
- // batch delete operations
- if (rows.size() > 0) {
- this.deleteStagingStatement.setInt(1,smallestIx);
- this.deleteStagingStatement.setInt(2,biggestIx);
- this.deleteStagingStatement.setInt(3,this.connId);
- logger.debug("Staging delete: Executing with vals ["+smallestIx+","+biggestIx+","+this.connId+"]");
- this.deleteStagingStatement.execute();
- }
- } catch (SQLException e) {
- logger.warn("Exception in postStatementHook: "+e);
- e.printStackTrace();
- }
- }
-
-
-
- /**
- * Update music with data from MySQL table
- *
- * @param tableName - name of table to update in music
- */
- @Override
- public void synchronizeData(String tableName) {
- ResultSet rs = null;
- TableInfo ti = getTableInfo(tableName);
- String query = "SELECT * FROM "+tableName;
-
- try {
- rs = executeSQLRead(query);
- if(rs==null) return;
- while(rs.next()) {
-
- JSONObject jo = new JSONObject();
- if (!getTableInfo(tableName).hasKey()) {
- String musicKey = MDBCUtils.generateUniqueKey().toString();
- jo.put(mi.getMusicDefaultPrimaryKeyName(), musicKey);
- }
-
- for (String col : ti.columns) {
- jo.put(col, rs.getString(col));
- }
-
- @SuppressWarnings("unused")
- Object[] row = Utils.jsonToRow(ti,tableName, jo, mi.getMusicDefaultPrimaryKeyName());
- //\FIXME this is wrong now, update of the dirty row and entity is now handled by the archival process
- //msm.updateDirtyRowAndEntityTableInMusic(ti,tableName, jo);
- }
- } catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger, "synchronizing data " + tableName +
- " -> " + e.getMessage());
- }
- finally {
- try {
- if(rs!=null) {
- rs.close();
- }
- } catch (SQLException e) {
- //continue
- }
- }
-
- }
-
- /**
- * Return a list of "reserved" names, that should not be used by MySQL client/MUSIC
- * These are reserved for mdbc
- */
- @Override
- public List<String> getReservedTblNames() {
- ArrayList<String> rsvdTables = new ArrayList<String>();
- rsvdTables.add(TRANS_TBL);
- //Add others here as necessary
- return rsvdTables;
- }
- @Override
- public String getPrimaryKey(String sql, String tableName) {
- //
- return null;
- }
-
-
- public String applyDigest(Map<Range, StagingTable> digest){
- throw new NotImplementedException();
- }
-
- @SuppressWarnings("unused")
- @Deprecated
- private ArrayList<String> getMusicKey(String sql) {
- try {
- net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql);
- if (stmt instanceof Insert) {
- Insert s = (Insert) stmt;
- String tbl = s.getTable().getName();
- return getMusicKey(tbl, "INSERT", sql);
- } else if (stmt instanceof Update){
- Update u = (Update) stmt;
- String tbl = u.getTables().get(0).getName();
- return getMusicKey(tbl, "UPDATE", sql);
- } else if (stmt instanceof Delete) {
- Delete d = (Delete) stmt;
- //TODO: IMPLEMENT
- String tbl = d.getTable().getName();
- return getMusicKey(tbl, "DELETE", sql);
- } else {
- System.err.println("Not recognized sql type");
- }
-
- } catch (JSQLParserException e) {
-
- e.printStackTrace();
- }
- //Something went wrong here
- return new ArrayList<String>();
- }
-
- /**
- * Returns all keys that matches the current sql statement, and not in already updated keys.
- *
- * @param tbl
- * @param cmd
- * @param sql
- */
- @Deprecated
- private ArrayList<String> getMusicKey(String tbl, String cmd, String sql) {
- ArrayList<String> musicKeys = new ArrayList<String>();
- /*
- if (cmd.equalsIgnoreCase("insert")) {
- //create key, return key
- musicKeys.add(msm.generatePrimaryKey());
- } else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) {
- try {
- net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql);
- String where;
- if (stmt instanceof Update) {
- where = ((Update) stmt).getWhere().toString();
- } else if (stmt instanceof Delete) {
- where = ((Delete) stmt).getWhere().toString();
- } else {
- System.err.println("Unknown type: " +stmt.getClass());
- where = "";
- }
- ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where);
- musicKeys = msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs));
- } catch (JSQLParserException e) {
-
- e.printStackTrace();
- } catch (SQLException e) {
- //Not a valid sql query
- e.printStackTrace();
- }
- }
- */
- return musicKeys;
- }
-
-
- @Deprecated
- public void insertRowIntoSqlDbOLD(String tableName, Map<String, Object> map) {
- // First construct the value string and column name string for the db write
- TableInfo ti = getTableInfo(tableName);
- StringBuilder fields = new StringBuilder();
- StringBuilder values = new StringBuilder();
- String pfx = "";
- for (String col : ti.columns) {
- fields.append(pfx).append(col);
- values.append(pfx).append(Utils.getStringValue(map.get(col)));
- pfx = ", ";
- }
-
- try {
- String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
- executeSQLWrite(sql);
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"Insert failed because row exists, do an update");
- StringBuilder where = new StringBuilder();
- pfx = "";
- String pfx2 = "";
- fields.setLength(0);
- for (int i = 0; i < ti.columns.size(); i++) {
- String col = ti.columns.get(i);
- String val = Utils.getStringValue(map.get(col));
- if (ti.iskey.get(i)) {
- where.append(pfx).append(col).append("=").append(val);
- pfx = " AND ";
- } else {
- fields.append(pfx2).append(col).append("=").append(val);
- pfx2 = ", ";
- }
- }
- String sql = String.format("UPDATE %s SET %s WHERE %s", tableName, fields.toString(), where.toString());
- try {
- executeSQLWrite(sql);
- } catch (SQLException e1) {
- logger.error(EELFLoggerDelegate.errorLogger,"executeSQLWrite"+e1);
- }
- }
- }
-
- /**
- * Parse the transaction digest into individual events
- * @param transaction - base 64 encoded, serialized digest
- * @throws MDBCServiceException
- */
- public void replayTransaction(StagingTable transaction, List<Range> ranges) throws SQLException, MDBCServiceException {
- boolean autocommit = jdbcConn.getAutoCommit();
- jdbcConn.setAutoCommit(false);
- Statement jdbcStmt = jdbcConn.createStatement();
- ArrayList<Operation> opList = transaction.getOperationList();
-
- for (Operation op: opList) {
- if(Range.overlaps(ranges,op.getTable())) {
- try {
- replayOperationIntoDB(jdbcStmt, op);
- } catch (SQLException | MDBCServiceException e) {
- //rollback transaction
- logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "."
- + "Rolling back the entire digest replay.");
- jdbcConn.rollback();
- throw e;
- }
- }
- }
-
- clearReplayedOperations(jdbcStmt);
- jdbcConn.commit();
- jdbcStmt.close();
-
- jdbcConn.setAutoCommit(autocommit);
+
+ /*
+ * mysql> describe tables; +-----------------+---------------------+------+-----+---------+-------+ | Field | Type |
+ * Null | Key | Default | Extra | +-----------------+---------------------+------+-----+---------+-------+ |
+ * TABLE_CATALOG | varchar(512) | NO | | | | | TABLE_SCHEMA | varchar(64) | NO | | | | | TABLE_NAME | varchar(64) |
+ * NO | | | | | TABLE_TYPE | varchar(64) | NO | | | | | ENGINE | varchar(64) | YES | | NULL | | | VERSION |
+ * bigint(21) unsigned | YES | | NULL | | | ROW_FORMAT | varchar(10) | YES | | NULL | | | TABLE_ROWS | bigint(21)
+ * unsigned | YES | | NULL | | | AVG_ROW_LENGTH | bigint(21) unsigned | YES | | NULL | | | DATA_LENGTH | bigint(21)
+ * unsigned | YES | | NULL | | | MAX_DATA_LENGTH | bigint(21) unsigned | YES | | NULL | | | INDEX_LENGTH |
+ * bigint(21) unsigned | YES | | NULL | | | DATA_FREE | bigint(21) unsigned | YES | | NULL | | | AUTO_INCREMENT |
+ * bigint(21) unsigned | YES | | NULL | | | CREATE_TIME | datetime | YES | | NULL | | | UPDATE_TIME | datetime | YES
+ * | | NULL | | | CHECK_TIME | datetime | YES | | NULL | | | TABLE_COLLATION | varchar(32) | YES | | NULL | | |
+ * CHECKSUM | bigint(21) unsigned | YES | | NULL | | | CREATE_OPTIONS | varchar(255) | YES | | NULL | | |
+ * TABLE_COMMENT | varchar(2048) | NO | | | |
+ * +-----------------+---------------------+------+-----+---------+-------+
+ */
+ /**
+ * Return a TableInfo object for the specified table. This method first looks in a cache of previously constructed
+ * TableInfo objects for the table. If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the
+ * column names, types, and indexes of the table. It creates a new TableInfo object with the results.
+ *
+ * @param tableName the table to look up
+ * @return a TableInfo object containing the info we need, or null if the table does not exist
+ */
+ @Override
+ public TableInfo getTableInfo(String tableName) {
+ TableInfo ti = tables.get(tableName);
+ if (ti == null) {
+ try {
+ final String[] split = tableName.split("\\.");
+ String tbl = (split.length == 2) ? split[1] : tableName;
+ String localSchema = (split.length == 2) ? split[0] : getSchema();
+ StringBuilder sql = new StringBuilder();
+ sql.append(
+ "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=");
+ if (localSchema == null) {
+ sql.append("DATABASE() AND TABLE_NAME='");
+ } else {
+ sql.append("'").append(localSchema).append("' AND TABLE_NAME='");
+ }
+ sql.append(tbl).append("';");
+ ResultSet rs = executeSQLRead(sql.toString());
+ if (rs != null) {
+ ti = new TableInfo();
+ while (rs.next()) {
+ String name = rs.getString("COLUMN_NAME");
+ String type = rs.getString("DATA_TYPE");
+ String ckey = rs.getString("COLUMN_KEY");
+ ti.columns.add(name);
+ ti.coltype.add(mapDatatypeNameToType(type));
+ ti.iskey.add(ckey != null && !ckey.equals(""));
+ }
+ rs.getStatement().close();
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from MySQL.");
+ }
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from MySQL: " + e);
+ return null;
+ }
+ tables.put(tableName, ti);
+ }
+ return ti;
}
-
- @Override
- public void disableForeignKeyChecks() throws SQLException {
- Statement disable = jdbcConn.createStatement();
- disable.execute("SET FOREIGN_KEY_CHECKS=0");
- disable.closeOnCompletion();
- }
-
- @Override
- public void enableForeignKeyChecks() throws SQLException {
+
+ // Map MySQL data type names to the java.sql.Types equivalent
+ private int mapDatatypeNameToType(String nm) {
+ switch (nm) {
+ case "tinyint":
+ return Types.TINYINT;
+ case "smallint":
+ return Types.SMALLINT;
+ case "mediumint":
+ case "int":
+ return Types.INTEGER;
+ case "bigint":
+ return Types.BIGINT;
+ case "decimal":
+ case "numeric":
+ return Types.DECIMAL;
+ case "float":
+ return Types.FLOAT;
+ case "double":
+ return Types.DOUBLE;
+ case "date":
+ case "datetime":
+ return Types.DATE;
+ case "time":
+ return Types.TIME;
+ case "timestamp":
+ return Types.TIMESTAMP;
+ case "char":
+ return Types.CHAR;
+ case "text":
+ case "varchar":
+ return Types.VARCHAR;
+ case "mediumblob":
+ case "longblob":
+ case "blob":
+ return Types.BLOB;
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "unrecognized and/or unsupported data type " + nm);
+ return Types.VARCHAR;
+ }
+ }
+
+ @Override
+ public void createSQLTriggers(String table) {
+ final String[] split = table.split("\\.");
+ String schemaName = (split.length == 2) ? split[0] : getSchema();
+ String tableName = (split.length == 2) ? split[1] : table;
+
+ if (getReservedTblNames().contains(tableName)) {
+ // Don't create triggers for the table the triggers write into!!!
+ return;
+ }
+ try {
+ // Give the triggers a way to find this MSM
+ for (String name : getTriggerNames(tableName)) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "ADD trigger " + name + " to msm_map");
+ // \TODO fix this is an error
+ // msm.register(name);
+ }
+ // No SELECT trigger
+ executeSQLWrite(generateTrigger(schemaName, tableName, "INSERT"));
+ executeSQLWrite(generateTrigger(schemaName, tableName, "UPDATE"));
+ // \TODO: save key row instead of the whole row for delete
+ executeSQLWrite(generateTrigger(schemaName, tableName, "DELETE"));
+ } catch (SQLException e) {
+ if (e.getMessage().equals("Trigger already exists") || e.getMessage().endsWith("already exists")) {
+ // only warn if trigger already exists
+ logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e);
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger, "createSQLTriggers: " + e);
+ }
+ }
+ }
+
+ /*
+ * CREATE TRIGGER `triggername` BEFORE UPDATE ON `table` FOR EACH ROW BEGIN INSERT INTO `log_table` ( `field1`
+ * `field2`, ...) VALUES ( NEW.`field1`, NEW.`field2`, ...) ; END;
+ *
+ * OLD.field refers to the old value NEW.field refers to the new value
+ */
+ private String generateTrigger(String schema, String tableName, String op) {
+ boolean isdelete = op.equals("DELETE");
+ boolean isinsert = op.equals("INSERT");
+ boolean isupdate = op.equals("UPDATE");
+ TableInfo ti = getTableInfo(tableName);
+ StringBuilder newJson = new StringBuilder("JSON_OBJECT("); // JSON_OBJECT(key, val, key, val) page 1766
+ StringBuilder keyJson = new StringBuilder("JSON_OBJECT("); // JSON_OBJECT(key, val, key, val) page 1766
+ String pfx = "";
+ String kfx = "";
+ for (String col : ti.columns) {
+ newJson.append(pfx).append("'").append(col).append("', ").append(isdelete ? "OLD." : "NEW.").append(col);
+ if (ti.iskey(col) || !ti.hasKey()) {
+ keyJson.append(kfx).append("'").append(col).append("', ").append(isinsert ? "NEW." : "OLD.")
+ .append(col);
+ kfx = ", ";
+ }
+ pfx = ", ";
+ }
+ newJson.append(")");
+ keyJson.append(")");
+ // \TODO check if using mysql driver, so instead check the exception
+ // \TODO add conditional for update, if primary key is still the same, use null in the KEYDATA col
+ StringBuilder sb = new StringBuilder().append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL!
+ .append(String.format("%s_%s", op.substring(0, 1), tableName)).append(" AFTER ").append(op)
+ .append(" ON ").append(tableName).append(" FOR EACH ROW INSERT INTO ").append(TRANS_TBL)
+ .append(" (SCHEMANAME, TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('")
+ .append((schema == null) ? this.getSchema() : schema).append("', '").append(tableName).append("', ")
+ .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'")).append(", ")
+ .append((keyJson.length() > "JSON_OBJECT()".length()) ? keyJson.toString() : "NULL").append(", ")
+ .append(newJson.toString()).append(", ").append("CONNECTION_ID()").append(")");
+ return sb.toString();
+ }
+
+ private String[] getTriggerNames(String tableName) {
+ return new String[] {"I_" + tableName, // INSERT trigger
+ "U_" + tableName, // UPDATE trigger
+ "D_" + tableName // DELETE trigger
+ };
+ }
+
+ @Override
+ public void dropSQLTriggers(String tableName) {
+ try {
+ for (String name : getTriggerNames(tableName)) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "REMOVE trigger " + name + " from msmmap");
+ executeSQLWrite("DROP TRIGGER IF EXISTS " + name);
+ // \TODO Fix this is an error
+ // msm.unregister(name);
+ }
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "dropSQLTriggers: " + e);
+ }
+ }
+
+ @Override
+ public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) {
+ TableInfo ti = getTableInfo(tableName);
+ String sql = "";
+ if (rowExists(tableName, ti, map)) {
+ // Update - Construct the what and where strings for the DB write
+ StringBuilder what = new StringBuilder();
+ StringBuilder where = new StringBuilder();
+ String pfx = "";
+ String pfx2 = "";
+ for (int i = 0; i < ti.columns.size(); i++) {
+ String col = ti.columns.get(i);
+ String val = Utils.getStringValue(map.get(col));
+ if (ti.iskey.get(i)) {
+ where.append(pfx).append(col).append("=").append(val);
+ pfx = " AND ";
+ } else {
+ what.append(pfx2).append(col).append("=").append(val);
+ pfx2 = ", ";
+ }
+ }
+ sql = String.format("UPDATE %s SET %s WHERE %s", tableName, what.toString(), where.toString());
+ } else {
+ // Construct the value string and column name string for the DB write
+ StringBuilder fields = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+ String pfx = "";
+ for (String col : ti.columns) {
+ fields.append(pfx).append(col);
+ values.append(pfx).append(Utils.getStringValue(map.get(col)));
+ pfx = ", ";
+ }
+ sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
+ }
+ try {
+ executeSQLWrite(sql);
+ } catch (SQLException e1) {
+ logger.error(EELFLoggerDelegate.errorLogger, "executeSQLWrite: " + e1);
+ }
+ // TODO - remove any entries from MDBC_TRANSLOG corresponding to this update
+ // SELECT IX, OP, KEYDATA FROM MDBC_TRANS_TBL WHERE CONNID = "+connId AND TABLENAME = tblname
+ }
+
+ private boolean rowExists(String tableName, TableInfo ti, Map<String, Object> map) {
+ StringBuilder where = new StringBuilder();
+ String pfx = "";
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ String col = ti.columns.get(i);
+ String val = Utils.getStringValue(map.get(col));
+ where.append(pfx).append(col).append("=").append(val);
+ pfx = " AND ";
+ }
+ }
+ String sql = String.format("SELECT * FROM %s WHERE %s", tableName, where.toString());
+ ResultSet rs = executeSQLRead(sql);
+ try {
+ boolean rv = rs.next();
+ rs.close();
+ return rv;
+ } catch (SQLException e) {
+ return false;
+ }
+ }
+
+
+ @Override
+ public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) {
+ TableInfo ti = getTableInfo(tableName);
+ StringBuilder where = new StringBuilder();
+ String pfx = "";
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ String col = ti.columns.get(i);
+ Object val = map.get(col);
+ where.append(pfx).append(col).append("=").append(Utils.getStringValue(val));
+ pfx = " AND ";
+ }
+ }
+ try {
+ String sql = String.format("DELETE FROM %s WHERE %s", tableName, where.toString());
+ executeSQLWrite(sql);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * This method executes a read query in the SQL database. Methods that call this method should be sure to call
+ * resultset.getStatement().close() when done in order to free up resources.
+ *
+ * @param sql the query to run
+ * @return a ResultSet containing the rows returned from the query
+ */
+ @Override
+ public ResultSet executeSQLRead(String sql) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "executeSQLRead");
+ logger.debug("Executing SQL read:" + sql);
+ ResultSet rs = null;
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ rs = stmt.executeQuery(sql);
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "executeSQLRead" + e);
+ }
+ return rs;
+ }
+
+ @Override
+ public void preCommitHook() {
+ synchronized (stagingHandlerLock) {
+ // \TODO check if this can potentially block forever in certain scenarios
+ if (stagingHandler != null) {
+ stagingHandler.waitForAllPendingUpdates();
+ }
+ }
+ }
+
+ /**
+ * This method executes a write query in the sql database.
+ *
+ * @param sql the SQL to be sent to MySQL
+ * @throws SQLException if an underlying JDBC method throws an exception
+ */
+ protected void executeSQLWrite(String sql) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:" + sql);
+
+ Statement stmt = jdbcConn.createStatement();
+ stmt.execute(sql);
+ stmt.close();
+ }
+
+ /**
+ * Code to be run within the DB driver before a SQL statement is executed. This is where tables can be synchronized
+ * before a SELECT, for those databases that do not support SELECT triggers.
+ *
+ * @param sql the SQL statement that is about to be executed
+ * @return list of keys that will be updated, if they can't be determined afterwards (i.e. sql table doesn't have
+ * primary key)
+ */
+ @Override
+ public void preStatementHook(final String sql) {
+ if (sql == null) {
+ return;
+ }
+ String cmd = sql.trim().toLowerCase();
+ if (cmd.startsWith("select")) {
+ String[] parts = sql.trim().split(" ");
+ }
+ }
+
+ /**
+ * Code to be run within the DB driver after a SQL statement has been executed. This is where remote statement
+ * actions can be copied back to Cassandra/MUSIC.
+ *
+ * @param sql the SQL statement that was executed
+ */
+ @Override
+ public void postStatementHook(final String sql, StagingTable transactionDigest) {
+ if (sql != null) {
+ String[] parts = sql.trim().split(" ");
+ String cmd = parts[0].toLowerCase();
+ if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
+ if (useAsyncStagingUpdate) {
+ synchronized (stagingHandlerLock) {
+ if (stagingHandler == null || currentStaging != transactionDigest) {
+ Runnable newRunnable = new StagingTableUpdateRunnable(this, transactionDigest);
+ currentStaging = transactionDigest;
+ stagingHandler = new AsyncUpdateHandler(newRunnable);
+ }
+ // else we can keep using the current staging Handler
+ }
+ stagingHandler.processNewUpdate();
+ } else {
+
+ try {
+ this.updateStagingTable(transactionDigest);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ // TODO Auto-generated catch block
+ this.logger.error("Error updating the staging table");
+ }
+ }
+ }
+ }
+ }
+
+ private SQLOperation toOpEnum(String operation) throws NoSuchFieldException {
+ switch (operation.toLowerCase()) {
+ case "i":
+ return SQLOperation.INSERT;
+ case "d":
+ return SQLOperation.DELETE;
+ case "u":
+ return SQLOperation.UPDATE;
+ case "s":
+ return SQLOperation.SELECT;
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "Invalid operation selected: [" + operation + "]");
+ throw new NoSuchFieldException("Invalid operation enum");
+ }
+
+ }
+
+ /**
+ * Copy data that is in transaction table into music interface
+ *
+ * @param transactionDigests
+ * @throws NoSuchFieldException
+ */
+ private void updateStagingTable(StagingTable transactionDigests) throws NoSuchFieldException, MDBCServiceException {
+ // copy from DB.MDBC_TRANSLOG where connid == myconnid
+ // then delete from MDBC_TRANSLOG
+ String sql2 = "SELECT IX, SCHEMANAME, TABLENAME, OP, ROWDATA, KEYDATA FROM " + TRANS_TBL
+ + " WHERE CONNECTION_ID = " + this.connId;
+ Integer biggestIx = Integer.MIN_VALUE;
+ Integer smallestIx = Integer.MAX_VALUE;
+ try {
+ ResultSet rs = executeSQLRead(sql2);
+ Set<Integer> rows = new TreeSet<Integer>();
+ while (rs.next()) {
+ int ix = rs.getInt("IX");
+ biggestIx = Integer.max(biggestIx, ix);
+ smallestIx = Integer.min(smallestIx, ix);
+ String op = rs.getString("OP");
+ SQLOperation opType = toOpEnum(op);
+ String schema = rs.getString("SCHEMANAME");
+ String tbl = rs.getString("TABLENAME");
+ String newRowStr = rs.getString("ROWDATA");
+ String rowStr = rs.getString("KEYDATA");
+ Range range = new Range(schema + "." + tbl);
+ transactionDigests.addOperation(range, opType, newRowStr, rowStr);
+ rows.add(ix);
+ }
+ rs.getStatement().close();
+ // batch delete operations
+ if (rows.size() > 0) {
+ this.deleteStagingStatement.setInt(1, smallestIx);
+ this.deleteStagingStatement.setInt(2, biggestIx);
+ this.deleteStagingStatement.setInt(3, this.connId);
+ logger.debug("Staging delete: Executing with vals [" + smallestIx + "," + biggestIx + "," + this.connId
+ + "]");
+ this.deleteStagingStatement.execute();
+ }
+ } catch (SQLException e) {
+ logger.warn("Exception in postStatementHook: " + e);
+ e.printStackTrace();
+ }
+ }
+
+
+ /**
+ * Update music with data from MySQL table
+ *
+ * @param tableName - name of table to update in music
+ */
+ @Override
+ public void synchronizeData(String tableName) {
+ ResultSet rs = null;
+ TableInfo ti = getTableInfo(tableName);
+ String query = "SELECT * FROM " + tableName;
+
+ try {
+ rs = executeSQLRead(query);
+ if (rs == null)
+ return;
+ while (rs.next()) {
+
+ JSONObject jo = new JSONObject();
+ if (!getTableInfo(tableName).hasKey()) {
+ String musicKey = MDBCUtils.generateUniqueKey().toString();
+ jo.put(mi.getMusicDefaultPrimaryKeyName(), musicKey);
+ }
+
+ for (String col : ti.columns) {
+ jo.put(col, rs.getString(col));
+ }
+
+ @SuppressWarnings("unused")
+ Object[] row = Utils.jsonToRow(ti, tableName, jo, mi.getMusicDefaultPrimaryKeyName());
+ // \FIXME this is wrong now, update of the dirty row and entity is now handled by the archival process
+ // msm.updateDirtyRowAndEntityTableInMusic(ti,tableName, jo);
+ }
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "synchronizing data " + tableName + " -> " + e.getMessage());
+ } finally {
+ try {
+ if (rs != null) {
+ rs.close();
+ }
+ } catch (SQLException e) {
+ // continue
+ }
+ }
+
+ }
+
+ /**
+ * Return a list of "reserved" names, that should not be used by MySQL client/MUSIC These are reserved for mdbc
+ * Returned names are in all UPPERCASE
+ */
+ @Override
+ public List<String> getReservedTblNames() {
+ ArrayList<String> rsvdTables = new ArrayList<String>();
+ rsvdTables.add(dbName.toUpperCase() + "." + TRANS_TBL);
+ rsvdTables.add(dbName.toUpperCase() + "." + CKPT_TBL);
+ // Add others here as necessary
+ return rsvdTables;
+ }
+
+ @Override
+ public String getPrimaryKey(String sql, String tableName) {
+ //
+ return null;
+ }
+
+
+ public String applyDigest(Map<Range, StagingTable> digest) {
+ throw new NotImplementedException();
+ }
+
+ @SuppressWarnings("unused")
+ @Deprecated
+ private ArrayList<String> getMusicKey(String sql) {
+ try {
+ net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql);
+ if (stmt instanceof Insert) {
+ Insert s = (Insert) stmt;
+ String tbl = s.getTable().getName();
+ return getMusicKey(tbl, "INSERT", sql);
+ } else if (stmt instanceof Update) {
+ Update u = (Update) stmt;
+ String tbl = u.getTables().get(0).getName();
+ return getMusicKey(tbl, "UPDATE", sql);
+ } else if (stmt instanceof Delete) {
+ Delete d = (Delete) stmt;
+ // TODO: IMPLEMENT
+ String tbl = d.getTable().getName();
+ return getMusicKey(tbl, "DELETE", sql);
+ } else {
+ System.err.println("Not recognized sql type");
+ }
+
+ } catch (JSQLParserException e) {
+
+ e.printStackTrace();
+ }
+ // Something went wrong here
+ return new ArrayList<String>();
+ }
+
+ /**
+ * Returns all keys that matches the current sql statement, and not in already updated keys.
+ *
+ * @param tbl
+ * @param cmd
+ * @param sql
+ */
+ @Deprecated
+ private ArrayList<String> getMusicKey(String tbl, String cmd, String sql) {
+ ArrayList<String> musicKeys = new ArrayList<String>();
+ /*
+ * if (cmd.equalsIgnoreCase("insert")) { //create key, return key musicKeys.add(msm.generatePrimaryKey()); }
+ * else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) { try {
+ * net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); String where; if (stmt instanceof
+ * Update) { where = ((Update) stmt).getWhere().toString(); } else if (stmt instanceof Delete) { where =
+ * ((Delete) stmt).getWhere().toString(); } else { System.err.println("Unknown type: " +stmt.getClass()); where
+ * = ""; } ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where); musicKeys =
+ * msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs)); } catch (JSQLParserException e) {
+ *
+ * e.printStackTrace(); } catch (SQLException e) { //Not a valid sql query e.printStackTrace(); } }
+ */
+ return musicKeys;
+ }
+
+
+ @Deprecated
+ public void insertRowIntoSqlDbOLD(String tableName, Map<String, Object> map) {
+ // First construct the value string and column name string for the db write
+ TableInfo ti = getTableInfo(tableName);
+ StringBuilder fields = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+ String pfx = "";
+ for (String col : ti.columns) {
+ fields.append(pfx).append(col);
+ values.append(pfx).append(Utils.getStringValue(map.get(col)));
+ pfx = ", ";
+ }
+
+ try {
+ String sql =
+ String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
+ executeSQLWrite(sql);
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Insert failed because row exists, do an update");
+ StringBuilder where = new StringBuilder();
+ pfx = "";
+ String pfx2 = "";
+ fields.setLength(0);
+ for (int i = 0; i < ti.columns.size(); i++) {
+ String col = ti.columns.get(i);
+ String val = Utils.getStringValue(map.get(col));
+ if (ti.iskey.get(i)) {
+ where.append(pfx).append(col).append("=").append(val);
+ pfx = " AND ";
+ } else {
+ fields.append(pfx2).append(col).append("=").append(val);
+ pfx2 = ", ";
+ }
+ }
+ String sql = String.format("UPDATE %s SET %s WHERE %s", tableName, fields.toString(), where.toString());
+ try {
+ executeSQLWrite(sql);
+ } catch (SQLException e1) {
+ logger.error(EELFLoggerDelegate.errorLogger, "executeSQLWrite" + e1);
+ }
+ }
+ }
+
+ /**
+ * Parse the transaction digest into individual events
+ *
+ * @param transaction - base 64 encoded, serialized digest
+ * @throws MDBCServiceException
+ */
+ public void replayTransaction(StagingTable transaction, Set<Range> ranges)
+ throws SQLException, MDBCServiceException {
+ boolean autocommit = jdbcConn.getAutoCommit();
+ jdbcConn.setAutoCommit(false);
+ Statement jdbcStmt = jdbcConn.createStatement();
+ ArrayList<Operation> opList = transaction.getOperationList();
+
+ for (Operation op : opList) {
+ if (Range.overlaps(ranges, op.getTable())) {
+ try {
+ replayOperationIntoDB(jdbcStmt, op);
+ } catch (SQLException | MDBCServiceException e) {
+ // rollback transaction
+ logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "."
+ + "Rolling back the entire digest replay.");
+ jdbcConn.rollback();
+ throw e;
+ }
+ }
+ }
+
+ clearReplayedOperations(jdbcStmt);
+ jdbcConn.commit();
+ jdbcStmt.close();
+
+ jdbcConn.setAutoCommit(autocommit);
+ }
+
+ @Override
+ public void disableForeignKeyChecks() throws SQLException {
+ Statement disable = jdbcConn.createStatement();
+ disable.execute("SET FOREIGN_KEY_CHECKS=0");
+ disable.closeOnCompletion();
+ }
+
+ @Override
+ public void enableForeignKeyChecks() throws SQLException {
Statement enable = jdbcConn.createStatement();
- enable.execute("SET FOREIGN_KEY_CHECKS=1");
- enable.closeOnCompletion();
- }
+ enable.execute("SET FOREIGN_KEY_CHECKS=1");
+ enable.closeOnCompletion();
+ }
- @Override
- public void applyTxDigest(StagingTable txDigest,List<Range> ranges) throws SQLException, MDBCServiceException {
- replayTransaction(txDigest,ranges);
- }
+ @Override
+ public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException {
+ replayTransaction(txDigest, ranges);
+ }
- /**
+ /**
* Replays operation into database, usually from txDigest
+ *
* @param jdbcStmt
* @param r
* @param op
- * @throws SQLException
- * @throws MDBCServiceException
+ * @throws SQLException
+ * @throws MDBCServiceException
*/
private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException {
logger.info("Replaying Operation: " + op.getOperationType() + "->" + op.getVal());
JSONObject jsonOp = op.getVal();
-
+
ArrayList<String> cols = new ArrayList<String>();
ArrayList<Object> vals = new ArrayList<Object>();
Iterator<String> colIterator = jsonOp.keys();
- while(colIterator.hasNext()) {
+ while (colIterator.hasNext()) {
String col = colIterator.next();
- //FIXME: should not explicitly refer to cassandramixin
+ // FIXME: should not explicitly refer to cassandramixin
if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
- //reserved name
+ // reserved name
continue;
}
cols.add(col);
vals.add(jsonOp.get(col));
}
-
- //build and replay the queries
+
+ // build and replay the queries
StringBuilder sql = constructSQL(op, cols, vals);
- if(sql == null)
+ if (sql == null)
return;
-
+
try {
logger.info("Replaying operation: " + sql.toString());
int updated = jdbcStmt.executeUpdate(sql.toString());
-
- if(updated == 0) {
+
+ if (updated == 0) {
// This applies only for replaying transactions involving Eventually Consistent tables
- logger.warn("Error Replaying operation: " + sql.toString() + "; Replacing insert/replace/viceversa and replaying ");
-
+ logger.warn("Error Replaying operation: " + sql.toString()
+ + "; Replacing insert/replace/viceversa and replaying ");
+
buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);
}
} catch (SQLException sqlE) {
@@ -994,24 +972,23 @@ NEW.field refers to the new value
// or transactions that replay on top of existing keys
logger.warn("Error Replaying operation: " + sql.toString() + ";"
+ "Replacing insert/replace/viceversa and replaying ");
-
- buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);
-
+
+ buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);
+
}
}
-
- protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op,
- ArrayList<String> cols, ArrayList<Object> vals) throws SQLException, MDBCServiceException {
+
+ protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, ArrayList<String> cols,
+ ArrayList<Object> vals) throws SQLException, MDBCServiceException {
StringBuilder sqlInverse = constructSQLInverse(op, cols, vals);
- if(sqlInverse == null)
+ if (sqlInverse == null)
return;
- logger.info("Replaying operation: " + sqlInverse.toString());
+ logger.info("Replaying operation: " + sqlInverse.toString());
jdbcStmt.executeUpdate(sqlInverse.toString());
}
-
+
/**
- * Construct an update statement from an insert, or
- * construct an insert statement from an update
+ * Construct an update statement from an insert, or construct an insert statement from an update
*
* useful when replaying logic, if the primary key value is already present/not present
*
@@ -1022,120 +999,167 @@ NEW.field refers to the new value
* @throws MDBCServiceException
*/
- protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols,
- ArrayList<Object> vals) throws MDBCServiceException {
+ protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, ArrayList<Object> vals)
+ throws MDBCServiceException {
StringBuilder sqlInverse = null;
switch (op.getOperationType()) {
case INSERT:
- sqlInverse = constructUpdate(op.getTable() , SQLOperation.UPDATE, op.getKey(), cols, vals);
+ sqlInverse = constructUpdate(op.getTable(), SQLOperation.UPDATE, op.getKey(), cols, vals);
break;
case UPDATE:
- sqlInverse = constructInsert(op.getTable() , SQLOperation.INSERT, cols, vals);
+ sqlInverse = constructInsert(op.getTable(), SQLOperation.INSERT, cols, vals);
break;
default:
break;
}
return sqlInverse;
}
- protected StringBuilder constructSQL(Operation op, ArrayList<String> cols,
- ArrayList<Object> vals) throws MDBCServiceException {
+
+ protected StringBuilder constructSQL(Operation op, ArrayList<String> cols, ArrayList<Object> vals)
+ throws MDBCServiceException {
StringBuilder sql = null;
switch (op.getOperationType()) {
- case INSERT:
- sql = constructInsert(op.getTable(), op.getOperationType(), cols, vals);
- break;
- case UPDATE:
- sql = constructUpdate(op.getTable(), op.getOperationType(), op.getKey(), cols, vals);
- break;
- case DELETE:
- sql = constructDelete(op.getTable(), op.getOperationType(), op.getKey());
- break;
- case SELECT:
- //no update happened, do nothing
- break;
- default:
- logger.error(op.getOperationType() + "not implemented for replay");
+ case INSERT:
+ sql = constructInsert(op.getTable(), op.getOperationType(), cols, vals);
+ break;
+ case UPDATE:
+ sql = constructUpdate(op.getTable(), op.getOperationType(), op.getKey(), cols, vals);
+ break;
+ case DELETE:
+ sql = constructDelete(op.getTable(), op.getOperationType(), op.getKey());
+ break;
+ case SELECT:
+ // no update happened, do nothing
+ break;
+ default:
+ logger.error(op.getOperationType() + "not implemented for replay");
}
return sql;
}
+
private StringBuilder constructDelete(String r, SQLOperation op, JSONObject key) {
StringBuilder sql = new StringBuilder();
sql.append(op + " FROM ");
- sql.append(r + " WHERE ");
+ sql.append(r + " WHERE ");
sql.append(getPrimaryKeyConditional(key));
sql.append(";");
return sql;
}
- private StringBuilder constructInsert(String r, SQLOperation op, ArrayList<String> cols,
- ArrayList<Object> vals) {
+
+ private StringBuilder constructInsert(String r, SQLOperation op, ArrayList<String> cols, ArrayList<Object> vals) {
StringBuilder sql = new StringBuilder();
String sep;
sql.append(op + " INTO ");
- sql.append(r + " (") ;
+ sql.append(r + " (");
sep = "";
- for (String col: cols) {
+ for (String col : cols) {
sql.append(sep + col);
sep = ", ";
- }
+ }
sql.append(") VALUES (");
sep = "";
- for (Object val: vals) {
+ for (Object val : vals) {
sql.append(sep + "\"" + val + "\"");
sep = ", ";
}
sql.append(");");
return sql;
}
+
private StringBuilder constructUpdate(String r, SQLOperation op, JSONObject key, ArrayList<String> cols,
ArrayList<Object> vals) {
StringBuilder sql = new StringBuilder();
String sep;
sql.append(op + " ");
sql.append(r + " SET ");
- sep="";
- for (int i=0; i<cols.size(); i++) {
- sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\"");
+ sep = "";
+ for (int i = 0; i < cols.size(); i++) {
+ sql.append(sep + cols.get(i) + "=\"" + vals.get(i) + "\"");
sep = ", ";
}
sql.append(" WHERE ");
sql.append(getPrimaryKeyConditional(key));
sql.append(";");
-
+
return sql;
}
-
- /**
- * Create an SQL string for AND'ing all of the primary keys
- * @param primaryKeys Json of primary keys and their values
- * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3
- */
+
+ /**
+ * Create an SQL string for AND'ing all of the primary keys
+ *
+ * @param primaryKeys Json of primary keys and their values
+ * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3
+ */
private String getPrimaryKeyConditional(JSONObject primaryKeys) {
- StringBuilder keyCondStmt = new StringBuilder();
- String and = "";
- for (String key: primaryKeys.keySet()) {
- // We cannot use the default primary key for the sql table and operations
- if(!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
+ StringBuilder keyCondStmt = new StringBuilder();
+ String and = "";
+ for (String key : primaryKeys.keySet()) {
+ // We cannot use the default primary key for the sql table and operations
+ if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
Object val = primaryKeys.get(key);
keyCondStmt.append(and + key + "=\"" + val + "\"");
and = " AND ";
}
- }
- return keyCondStmt.toString();
- }
-
- /**
- * Cleans out the transaction table, removing the replayed operations
- * @param jdbcStmt
- * @throws SQLException
- */
- private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
- logger.info("Clearing replayed operations");
- String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId;
- jdbcStmt.executeUpdate(sql);
- }
-
- @Override
- public Connection getSQLConnection(){
- return jdbcConn;
- }
+ }
+ return keyCondStmt.toString();
+ }
+
+ /**
+ * Cleans out the transaction table, removing the replayed operations
+ *
+ * @param jdbcStmt
+ * @throws SQLException
+ */
+ private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
+ logger.info("Clearing replayed operations");
+ String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId;
+ jdbcStmt.executeUpdate(sql);
+ }
+
+ @Override
+ public Connection getSQLConnection() {
+ return jdbcConn;
+ }
+
+ @Override
+ public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) {
+ String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTINDEX=? where RANGENAME=?;";
+ try {
+ PreparedStatement stmt = jdbcConn.prepareStatement(query);
+ stmt.setString(1, playbackPointer.getLeft().toString());
+ stmt.setInt(2, playbackPointer.getRight());
+ stmt.setString(3, r.getTable());
+ stmt.execute();
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Unable to update replay checkpoint location");
+ }
+ }
+
+ @Override
+ public void initTables() {
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ stmt.execute(CREATE_TBL_SQL);
+ stmt.execute(CREATE_CKPT_SQL);
+ stmt.close();
+
+ //prepare checkpoint table
+ String query = "INSERT INTO " + CKPT_TBL + " (RANGENAME) VALUES (?);";
+ for (Range range: getSQLRangeSet()) {
+ if (getReservedTblNames().contains(range.getTable().toUpperCase())) {
+ continue;
+ }
+ PreparedStatement prepstmt = jdbcConn.prepareStatement(query);
+ prepstmt.setString(1, range.getTable());
+ prepstmt.execute();
+ prepstmt.close();
+ }
+
+ logger.info(EELFLoggerDelegate.applicationLogger, "initTables: Server side dirty table created.");
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "initTables: problem creating th mdbc tables!");
+ }
+ }
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
index 0f66731..4afaa71 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
@@ -32,6 +32,8 @@ import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.delete.Delete;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.update.Update;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.zookeeper.KeeperException.UnimplementedException;
import org.json.JSONObject;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
@@ -814,7 +816,8 @@ public class PostgresMixin implements DBInterface {
*
* @param transaction - base 64 encoded, serialized digest
*/
- public void replayTransaction(StagingTable transaction, List<Range> ranges)
+ @Override
+ public void replayTransaction(StagingTable transaction, Set<Range> ranges)
throws SQLException, MDBCServiceException {
boolean autocommit = jdbcConn.getAutoCommit();
jdbcConn.setAutoCommit(false);
@@ -856,7 +859,7 @@ public class PostgresMixin implements DBInterface {
}
@Override
- public void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException {
+ public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException {
replayTransaction(txDigest, ranges);
}
@@ -1063,4 +1066,14 @@ public class PostgresMixin implements DBInterface {
return set;
}
+ @Override
+ public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) {
+ throw new org.apache.commons.lang.NotImplementedException();
+ }
+
+ @Override
+ public void initTables() {
+ throw new org.apache.commons.lang.NotImplementedException();
+ }
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
index 07a5fe6..9d1685c 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
@@ -62,7 +62,7 @@ public class Dag {
rowsPerLatestRange = null;
}
- private void createDag(List<MusicRangeInformationRow> rows, List<Range> ranges){
+ private void createDag(List<MusicRangeInformationRow> rows, Set<Range> ranges){
this.ranges = new ArrayList<>(ranges);
Map<Range,DagNode> latestRow = new HashMap<>();
//sort to make sure rows are in chronological order
@@ -72,7 +72,7 @@ public class Dag {
DagNode node = new DagNode(row);
nodes.put(row.getPartitionIndex(),node);
for(Range range : ranges){
- List<Range> nodeRanges = row.getDBPartition().getSnapshot();
+ Set<Range> nodeRanges = row.getDBPartition().getSnapshot();
for(Range nRange : nodeRanges){
if(nRange.overlaps(range)){
if(latestRow.containsKey(range)){
@@ -88,7 +88,7 @@ public class Dag {
}
}
- public static Dag getDag(List<MusicRangeInformationRow> rows, List<Range> ranges){
+ public static Dag getDag(List<MusicRangeInformationRow> rows, Set<Range> ranges){
Dag newDag = new Dag(true);
newDag.createDag(rows,ranges);
return newDag;
@@ -116,7 +116,7 @@ public class Dag {
});
}
- public DagNode getNode(UUID rowId) throws MDBCServiceException {
+ public DagNode getNode(UUID rowId) {
if(!nodes.containsKey(rowId)){
return null;
}
@@ -141,7 +141,7 @@ public class Dag {
return nextNode;
}
- public synchronized DagNode nextToApply(List<Range> ranges){
+ public synchronized DagNode nextToApply(Set<Range> ranges){
if(!readyInit){
initApplyDatastructures();
}
@@ -283,7 +283,7 @@ public class Dag {
}
}
- public void addNewNodeWithSearch(MusicRangeInformationRow row, List<Range> ranges) throws MDBCServiceException {
+ public void addNewNodeWithSearch(MusicRangeInformationRow row, Set<Range> ranges) throws MDBCServiceException {
Map<Range,DagNode> newestNode = new HashMap<>();
for(DagNode node : nodes.values()){
for(Range range : ranges) {
@@ -304,6 +304,10 @@ public class Dag {
addNewNode(row,dependencies);
}
+ /**
+ *
+ * @return All ranges in every node of the DAG
+ */
public Set<Range> getAllRanges(){
Set<Range> ranges = new HashSet<>();
for(DagNode node : nodes.values()){
@@ -385,8 +389,8 @@ public class Dag {
return toDisable;
}
- public Pair<List<Range>,Set<DagNode>> getIncompleteRangesAndDependents() throws MDBCServiceException {
- List<Range> incomplete = new ArrayList<>();
+ public Pair<Set<Range>, Set<DagNode>> getIncompleteRangesAndDependents() throws MDBCServiceException {
+ Set<Range> incomplete = new HashSet<>();
Set<DagNode> dependents = new HashSet<>();
Map<Range,Set<DagNode>> rowsPerLatestRange = getIsLatestPerRange();
List<DagNode> toDisable = getOldestDoubleRows(rowsPerLatestRange);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
index e737b26..78c68e1 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
@@ -61,7 +61,9 @@ public class DagNode {
startIndex = new HashMap<>();
}
- public MusicRangeInformationRow getRow() { return row;}
+ public MusicRangeInformationRow getRow() {
+ return row;
+ }
public synchronized void setOwned(){
owned = true;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index 854eb5f..b848964 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -25,7 +25,6 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
@@ -36,7 +35,6 @@ import org.onap.music.mdbc.mixins.LockRequest;
import org.onap.music.mdbc.mixins.LockResult;
import org.onap.music.mdbc.mixins.MusicInterface;
import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
-import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.query.SQLOperationType;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
@@ -90,14 +88,14 @@ public class OwnershipAndCheckpoint{
}
- private List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, List<Range> ranges,
+ private List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, Set<Range> ranges,
boolean onlyIsLatest){
List<MusicRangeInformationRow> rows = new ArrayList<>();
for(MusicRangeInformationRow row : allMriRows){
if(onlyIsLatest && !row.getIsLatest()){
continue;
}
- final List<Range> rowRanges = row.getDBPartition().getSnapshot();
+ final Set<Range> rowRanges = row.getDBPartition().getSnapshot();
boolean found = false;
for(Range sRange : ranges){
for(Range rRange: rowRanges) {
@@ -120,7 +118,7 @@ public class OwnershipAndCheckpoint{
* @param onlyIsLatest - only return the "latest" rows
* @return
*/
- private List<MusicRangeInformationRow> extractRowsForRange(MusicInterface music, List<Range> ranges, boolean onlyIsLatest)
+ private List<MusicRangeInformationRow> extractRowsForRange(MusicInterface music, Set<Range> ranges, boolean onlyIsLatest)
throws MDBCServiceException {
final List<MusicRangeInformationRow> allMriRows = music.getAllMriRows();
return extractRowsForRange(allMriRows,ranges,onlyIsLatest);
@@ -136,7 +134,7 @@ public class OwnershipAndCheckpoint{
* @param ownOpId
* @throws MDBCServiceException
*/
- public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, List<Range> ranges,
+ public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges,
Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException {
if(ranges.isEmpty()){
return;
@@ -173,7 +171,7 @@ public class OwnershipAndCheckpoint{
}
}
- private void applyTxDigest(List<Range> ranges, DBInterface di, StagingTable txDigest)
+ private void applyTxDigest(Set<Range> ranges, DBInterface di, StagingTable txDigest)
throws MDBCServiceException {
try {
di.applyTxDigest(txDigest,ranges);
@@ -189,7 +187,7 @@ public class OwnershipAndCheckpoint{
* @param rangesToWarmup
* @throws MDBCServiceException
*/
- public void warmup(MusicInterface mi, DBInterface di, List<Range> rangesToWarmup) throws MDBCServiceException {
+ public void warmup(MusicInterface mi, DBInterface di, Set<Range> rangesToWarmup) throws MDBCServiceException {
if(rangesToWarmup.isEmpty()){
return;
}
@@ -229,7 +227,16 @@ public class OwnershipAndCheckpoint{
}
}
- private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, List<Range> ranges, DagNode node,
+ /**
+ * Apply tx digest for ranges, update checkpoint location (alreadyApplied)
+ * @param mi
+ * @param di
+ * @param ranges
+ * @param node
+ * @param pair
+ * @throws MDBCServiceException
+ */
+ private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, Set<Range> ranges, DagNode node,
Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException {
final StagingTable txDigest;
try {
@@ -244,10 +251,34 @@ public class OwnershipAndCheckpoint{
for (Range r : pair.getValue()) {
MusicRangeInformationRow row = node.getRow();
alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
+
+ updateCheckpointLocations(mi, di, r, row.getPartitionIndex(), pair.getKey().index);
}
}
- private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, List<Range> ranges, UUID ownOpId)
+ /**
+ * Update external checkpoint markers in sql db and music
+ * @param mi
+ * @param di
+ * @param r
+ * @param partitionIndex
+ * @param index
+ */
+ private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, int index) {
+ dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
+ mi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
+ }
+
+ /**
+ * Forceably apply changes in tx digest for ranges
+ * @param mi
+ * @param db
+ * @param extendedDag
+ * @param ranges
+ * @param ownOpId
+ * @throws MDBCServiceException
+ */
+ private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, Set<Range> ranges, UUID ownOpId)
throws MDBCServiceException {
Set<Range> rangeSet = new HashSet<Range>(ranges);
disableForeignKeys(db);
@@ -278,7 +309,7 @@ public class OwnershipAndCheckpoint{
* @return an object indicating the status of the own function result
* @throws MDBCServiceException
*/
- public OwnershipReturn own(MusicInterface mi, List<Range> ranges,
+ public OwnershipReturn own(MusicInterface mi, Set<Range> ranges,
DatabasePartition currPartition, UUID opId, SQLOperationType lockType) throws MDBCServiceException {
if (ranges == null || ranges.isEmpty()) {
@@ -292,15 +323,15 @@ public class OwnershipAndCheckpoint{
currPartition.getSnapshot(),null);
}
//Find
- Map<UUID,LockResult> newLocks = new HashMap<>();
- List<Range> rangesToOwn = mi.getRangeDependencies(ranges);
+ Map<UUID,LockResult> locksForOwnership = new HashMap<>();
+ Set<Range> rangesToOwn = mi.getRangeDependencies(ranges);
List<MusicRangeInformationRow> rangesToOwnRows = extractRowsForRange(mi,rangesToOwn, false);
Dag toOwn = Dag.getDag(rangesToOwnRows,rangesToOwn);
Dag currentlyOwn = new Dag();
while ( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) &&
!timeout(opId)
) {
- takeOwnershipOfDag(mi, currPartition, opId, newLocks, toOwn, lockType);
+ takeOwnershipOfDag(mi, currPartition, opId, locksForOwnership, toOwn, lockType);
currentlyOwn=toOwn;
//TODO instead of comparing dags, compare rows
rangesToOwnRows = extractRowsForRange(mi, rangesToOwn, false);
@@ -308,29 +339,30 @@ public class OwnershipAndCheckpoint{
}
if (!currentlyOwn.isOwned() || toOwn.isDifferent(currentlyOwn)) {
//hold on to previous partition
- newLocks.remove(currPartition.getMRIIndex());
- mi.releaseLocks(newLocks);
+ locksForOwnership.remove(currPartition.getMRIIndex());
+ mi.releaseLocks(locksForOwnership);
stopOwnershipTimeoutClock(opId);
logger.error("Error when owning a range: Timeout");
throw new MDBCServiceException("Ownership timeout");
}
Set<Range> allRanges = currentlyOwn.getAllRanges();
- List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, new ArrayList<>(allRanges), true);
+ //TODO: we shouldn't need to go back to music at this point
+ List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, new HashSet<>(allRanges), true);
currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,latestRows));
- return mi.mergeLatestRowsIfNecessary(currentlyOwn,latestRows,ranges,newLocks,opId);
+ return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId);
}
-
+
/**
* Step through dag and take lock ownership of each range
* @param partition current partition owned by system
* @param opId
- * @param newLocks
+ * @param ownershipLocks
* @param toOwn
* @param lockType
* @throws MDBCServiceException
*/
private void takeOwnershipOfDag(MusicInterface mi, DatabasePartition partition, UUID opId,
- Map<UUID, LockResult> newLocks, Dag toOwn, SQLOperationType lockType) throws MDBCServiceException {
+ Map<UUID, LockResult> ownershipLocks, Dag toOwn, SQLOperationType lockType) throws MDBCServiceException {
while(toOwn.hasNextToOwn()){
DagNode node = toOwn.nextToOwn();
@@ -338,9 +370,9 @@ public class OwnershipAndCheckpoint{
UUID uuidToOwn = row.getPartitionIndex();
if (partition.isLocked() && partition.getMRIIndex().equals(uuidToOwn) ) {
toOwn.setOwn(node);
- newLocks.put(uuidToOwn, new LockResult(true, uuidToOwn, partition.getLockId(),
+ ownershipLocks.put(uuidToOwn, new LockResult(true, uuidToOwn, partition.getLockId(),
false, partition.getSnapshot()));
- } else if ( newLocks.containsKey(uuidToOwn) || !row.getIsLatest() ) {
+ } else if ( ownershipLocks.containsKey(uuidToOwn) || !row.getIsLatest() ) {
toOwn.setOwn(node);
} else {
LockRequest request = new LockRequest(uuidToOwn,
@@ -370,7 +402,7 @@ public class OwnershipAndCheckpoint{
// TODO look into updating the partition object with the latest lockId;
if(owned){
toOwn.setOwn(node);
- newLocks.put(uuidToOwn,result);
+ ownershipLocks.put(uuidToOwn,result);
}
else{
mi.relinquish(lockId,uuidToOwn.toString());
@@ -380,9 +412,54 @@ public class OwnershipAndCheckpoint{
}
}
+ public String getDebugInfo(MusicInterface mi, String rangesStr) {
+
+ Set<Range> ranges = new HashSet<>();
+ Arrays.stream(rangesStr.split(",")).forEach(a -> ranges.add(new Range(a)));
+
+ StringBuffer buffer = new StringBuffer();
+ Set<Range> rangesToOwn;
+ try {
+ rangesToOwn = mi.getRangeDependencies(ranges);
+ List<MusicRangeInformationRow> rangesToOwnRows = extractRowsForRange(mi,rangesToOwn, false);
+ Dag toOwn = Dag.getDag(rangesToOwnRows,rangesToOwn);
+ while(toOwn.hasNextToOwn()){
+ DagNode node = null;
+ try {
+ node = toOwn.nextToOwn();
+ MusicRangeInformationRow row = node.getRow();
+
+ buffer.append("\n-------------\n");
+ buffer.append(row.getDBPartition()).append(",");
+ buffer.append(row.getPrevRowIndexes()).append(",");
+ buffer.append(row.getIsLatest()).append("");
+
+
+ } catch (Exception e) {
+ buffer.append("\n------missing MRI------\n");
+ } finally {
+
+ if(node != null) {
+ toOwn.setOwn(node);
+ }
+
+ }
+
+ }
+
+
+ } catch (MDBCServiceException e) {
+ buffer.setLength(0);
+ buffer.append(" Debugging info could not be determined");
+ }
+
+ return buffer.toString();
+
+ }
+
public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
- List<Range> snapshot = partition.getSnapshot();
+ Set<Range> snapshot = partition.getSnapshot();
UUID row = partition.getMRIIndex();
for(Range r : snapshot){
alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
index a31a2a0..6d6c661 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
@@ -35,6 +35,7 @@ import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlUpdate;
import org.apache.calcite.sql.fun.SqlInOperator;
@@ -122,6 +123,9 @@ public class QueryProcessor {
case SELECT:
parseSelect((SqlSelect) sqlNode, tableOpsMap);
break;
+ case ORDER_BY:
+ parseSelect((SqlSelect)((SqlOrderBy) sqlNode).query, tableOpsMap);
+ break;
default:
logger.error("Unhandled sql query type " + sqlNode.getKind() +" for query " + query);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
index 6e6ade6..de711ef 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
@@ -19,6 +19,7 @@
*/
package org.onap.music.mdbc.tables;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -49,8 +50,15 @@ public final class MusicRangeInformationRow implements Comparable<MusicRangeInfo
this.prevRowIndexes = prevPartitions;
}
+ public MusicRangeInformationRow(DatabasePartition dbPartition, boolean isLatest, Set<UUID> prevPartitions) {
+ this.dbPartition = dbPartition;
+ this.redoLog = new ArrayList<MusicTxDigestId>();
+ this.isLatest = isLatest;
+ this.prevRowIndexes = prevPartitions;
+ }
+
public UUID getPartitionIndex() {
- return dbPartition.getMRIIndex();
+ return dbPartition.getMRIIndex();
}
public boolean getIsLatest(){ return isLatest; }
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
index 5c6fae4..6f95d3c 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
@@ -51,7 +51,7 @@ public class MusicTxDigestDaemon implements Runnable {
* @param dbi interface to the database that will replay the operations
* @param ranges only these ranges will be applied from the digests
*/
- public void replayDigest(MusicInterface mi, DBInterface dbi, List<Range> ranges) throws MDBCServiceException {
+ public void replayDigest(MusicInterface mi, DBInterface dbi, Set<Range> ranges) throws MDBCServiceException {
StagingTable transaction;
String nodeName = stateManager.getMdbcServerName();
@@ -117,11 +117,11 @@ public class MusicTxDigestDaemon implements Runnable {
List<Range> missingRanges = new ArrayList<>();
if (currentPartitions.size() != 0) {
for (DatabasePartition part : currentPartitions) {
- List<Range> partitionRanges = part.getSnapshot();
+ Set<Range> partitionRanges = part.getSnapshot();
warmupRanges.removeAll(partitionRanges);
}
try {
- stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+ stateManager.getOwnAndCheck().warmup(mi, dbi, new HashSet<>(warmupRanges));
} catch (MDBCServiceException e) {
logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
continue;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
index 4ef9d30..9ff7a0f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
@@ -266,7 +266,7 @@ public class StagingTable {
digestBuilder.clear();
}
- synchronized public boolean areEventualContained(List<Range> ranges){
+ synchronized public boolean areEventualContained(Set<Range> ranges){
return eventuallyConsistentRanges.containsAll(ranges);
}
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java
index 899fff2..280c733 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java
@@ -53,7 +53,7 @@ public class StateManagerTest {
@Test
public void testGetEventualRanges() throws NoSuchFieldException, SecurityException {
- List<Range> evList = new ArrayList<>();
+ Set<Range> evList = new HashSet<>();
evList.add(new Range("eventualRange"));
FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("eventualRanges"), evList);
assertEquals(evList, stateManager.getEventualRanges());
@@ -61,7 +61,7 @@ public class StateManagerTest {
@Test
public void testSetEventualRanges() {
- List<Range> evList = new ArrayList<>();
+ Set<Range> evList = new HashSet<>();
evList.add(new Range("eventualRange"));
stateManager.setEventualRanges(evList);
assertEquals(evList, stateManager.getEventualRanges());
@@ -116,7 +116,7 @@ public class StateManagerTest {
allRanges.add(new Range("eventualRange"));
Mockito.when(dbiMock.getSQLRangeSet()).thenReturn(allRanges);
- List<Range> eventualRanges = new ArrayList<Range>();
+ Set<Range> eventualRanges = new HashSet<Range>();
eventualRanges.add(new Range("eventualRange"));
stateManager.setEventualRanges(eventualRanges);
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
index f2bbdcd..ef26cb6 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
@@ -115,7 +115,7 @@ public class MusicMixinTest {
@Test
public void own() {
Range range = new Range("TEST.TABLE1");
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(range);
DatabasePartition partition=null;
try {
@@ -139,7 +139,7 @@ public class MusicMixinTest {
}
}
- private DatabasePartition addRow(List<Range> ranges,boolean isLatest){
+ private DatabasePartition addRow(Set<Range> ranges,boolean isLatest){
final UUID uuid = MDBCUtils.generateTimebasedUniqueKey();
DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null);
MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), isLatest);
@@ -161,19 +161,19 @@ public class MusicMixinTest {
@Ignore // TODO: Move ownership tests to OwnershipAndCheckpointTest
@Test(timeout=1000)
public void own2() throws InterruptedException, MDBCServiceException {
- List<Range> range12 = new ArrayList<>( Arrays.asList(
+ Set<Range> range12 = new HashSet<>( Arrays.asList(
new Range("TEST.RANGE1"),
new Range("TEST.RANGE2")
));
- List<Range> range34 = new ArrayList<>( Arrays.asList(
+ Set<Range> range34 = new HashSet<>( Arrays.asList(
new Range("TEST.RANGE3"),
new Range("TEST.RANGE4")
));
- List<Range> range24 = new ArrayList<>( Arrays.asList(
+ Set<Range> range24 = new HashSet<>( Arrays.asList(
new Range("TEST.RANGE2"),
new Range("TEST.RANGE4")
));
- List<Range> range123 = new ArrayList<>( Arrays.asList(
+ Set<Range> range123 = new HashSet<>( Arrays.asList(
new Range("TEST.RANGE1"),
new Range("TEST.RANGE2"),
new Range("TEST.RANGE3")
@@ -225,7 +225,7 @@ public class MusicMixinTest {
MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId());
assertTrue(row.getIsLatest());
DatabasePartition dbPartition = row.getDBPartition();
- List<Range> snapshot = dbPartition.getSnapshot();
+ Set<Range> snapshot = dbPartition.getSnapshot();
assertEquals(3,snapshot.size());
MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId());
assertFalse(node5row.getIsLatest());
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
index 2134a79..a1cf2b1 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.*;
import java.sql.*;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -207,7 +208,7 @@ public class PostgresMixinTest {
assertFalse(st.isEmpty());
cleanTestTable();
checkEmptyTestTable();
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(new Range("public.testtable"));
try {
mixin.applyTxDigest(st,ranges);
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
index fa5583c..ee50dca 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
@@ -43,12 +43,12 @@ import static org.junit.Assert.*;
public class DagTest {
- private MusicRangeInformationRow createNewRow(List<Range> ranges, String lockid, boolean isLatest){
+ private MusicRangeInformationRow createNewRow(Set<Range> ranges, String lockid, boolean isLatest){
List<MusicTxDigestId> redoLog = new ArrayList<>();
return createNewRow(ranges,lockid,isLatest,redoLog);
}
- private MusicRangeInformationRow createNewRow(List<Range> ranges, String lockid, boolean isLatest,
+ private MusicRangeInformationRow createNewRow(Set<Range> ranges, String lockid, boolean isLatest,
List<MusicTxDigestId> redoLog) {
UUID id = MDBCUtils.generateTimebasedUniqueKey();
DatabasePartition dbPartition = new DatabasePartition(ranges, id, lockid);
@@ -58,14 +58,14 @@ public class DagTest {
@Test
public void getDag() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",false));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",false));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
DagNode node1 = dag.getNode(rows.get(0).getPartitionIndex());
DagNode node2 = dag.getNode(rows.get(1).getPartitionIndex());
@@ -92,15 +92,15 @@ public class DagTest {
List<Range> range2 = new ArrayList<>( Arrays.asList(
new Range("schema.range2")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
DagNode node1 = dag.getNode(rows.get(0).getPartitionIndex());
DagNode node2 = dag.getNode(rows.get(1).getPartitionIndex());
@@ -122,14 +122,14 @@ public class DagTest {
@Test
public void nextToOwn() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",false));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",false));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
int counter = 0;
while(dag.hasNextToOwn()){
@@ -148,23 +148,23 @@ public class DagTest {
@Test
public void nextToApply() throws InterruptedException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range1")
));
List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false,redo1));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo2));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false,redo2));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true,redo3));
Dag dag = Dag.getDag(rows, ranges);
int nodeCounter = 0;
HashSet<Range> rangesSet = new HashSet<>(ranges);
@@ -194,26 +194,26 @@ public class DagTest {
public void nextToApply2() throws InterruptedException, MDBCServiceException {
Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
List<MusicRangeInformationRow> rows = new ArrayList<>();
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range1")
));
List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1));
+ rows.add(createNewRow(new HashSet<>(ranges),"",false,redo1));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0),
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1)
));
- MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2);
+ MusicRangeInformationRow newRow = createNewRow(new HashSet<>(ranges), "", false, redo2);
alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
rows.add(newRow);
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
- rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true,redo3));
Dag dag = Dag.getDag(rows, ranges);
HashSet<Range> rangesSet = new HashSet<>(ranges);
dag.setAlreadyApplied(alreadyApplied, rangesSet);
@@ -249,22 +249,22 @@ public class DagTest {
List<Range> range2 = new ArrayList<>( Arrays.asList(
new Range("schema.range2")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
List<MusicRangeInformationRow> rows2 = new ArrayList<>(rows);
List<MusicRangeInformationRow> rows3 = new ArrayList<>(rows);
MILLISECONDS.sleep(10);
- rows3.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows3.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
- Dag dag2 = Dag.getDag(rows2, new ArrayList<>(ranges));
- Dag dag3 = Dag.getDag(rows3, new ArrayList<>(ranges));
+ Dag dag2 = Dag.getDag(rows2, new HashSet<>(ranges));
+ Dag dag3 = Dag.getDag(rows3, new HashSet<>(ranges));
assertFalse(dag.isDifferent(dag2));
assertFalse(dag2.isDifferent(dag));
assertTrue(dag.isDifferent(dag3));
@@ -282,19 +282,19 @@ public class DagTest {
List<Range> range2 = new ArrayList<>( Arrays.asList(
new Range("schema.range2")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range1),"",true));
+ rows.add(createNewRow(new HashSet<>(range1),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",true));
+ rows.add(createNewRow(new HashSet<>(range2),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
List<DagNode> oldestDoubles = dag.getOldestDoubles();
assertTrue(oldestDoubles.contains(dag.getNode(rows.get(2).getPartitionIndex())));
@@ -312,22 +312,22 @@ public class DagTest {
new Range("schema.range2"),
new Range("schema.range3")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range1),"",true));
+ rows.add(createNewRow(new HashSet<>(range1),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",true));
+ rows.add(createNewRow(new HashSet<>(range2),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
- Pair<List<Range>, Set<DagNode>> incompleteRangesAndDependents = dag.getIncompleteRangesAndDependents();
- List<Range> incomplete = incompleteRangesAndDependents.getKey();
+ Pair<Set<Range>, Set<DagNode>> incompleteRangesAndDependents = dag.getIncompleteRangesAndDependents();
+ Set<Range> incomplete = incompleteRangesAndDependents.getKey();
Set<DagNode> dependents = incompleteRangesAndDependents.getValue();
assertEquals(1,incomplete.size());
assertTrue(incomplete.contains(new Range("schema.range3")));
@@ -346,22 +346,22 @@ public class DagTest {
new Range("schema.range2"),
new Range("schema.range3")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range1),"",true));
+ rows.add(createNewRow(new HashSet<>(range1),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",true));
+ rows.add(createNewRow(new HashSet<>(range2),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
- Pair<List<Range>, Set<DagNode>> incompleteRangesAndDependents = dag.getIncompleteRangesAndDependents();
- List<Range> incomplete = incompleteRangesAndDependents.getKey();
+ Pair<Set<Range>, Set<DagNode>> incompleteRangesAndDependents = dag.getIncompleteRangesAndDependents();
+ Set<Range> incomplete = incompleteRangesAndDependents.getKey();
Set<DagNode> dependents = incompleteRangesAndDependents.getValue();
assertEquals(2,incomplete.size());
assertTrue(incomplete.contains(new Range("schema.range3")));
@@ -381,26 +381,26 @@ public class DagTest {
new Range("schema.range2"),
new Range("schema.range3")
));
- List<Range> ranges = new ArrayList<>( Arrays.asList(
+ Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range1")
));
- List<Range> allRanges = new ArrayList<>( Arrays.asList(
+ Set<Range> allRanges = new HashSet<>( Arrays.asList(
new Range("schema.range2"),
new Range("schema.range3"),
new Range("schema.range1")
));
- rows.add(createNewRow(new ArrayList<>(range1),"",false));
+ rows.add(createNewRow(new HashSet<>(range1),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",false));
+ rows.add(createNewRow(new HashSet<>(range2),"",false));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range1),"",true));
+ rows.add(createNewRow(new HashSet<>(range1),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(range2),"",true));
+ rows.add(createNewRow(new HashSet<>(range2),"",true));
MILLISECONDS.sleep(10);
- rows.add(createNewRow(new ArrayList<>(ranges),"",true));
+ rows.add(createNewRow(new HashSet<>(ranges),"",true));
Dag dag = Dag.getDag(rows, ranges);
- MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(allRanges), "", true);
+ MusicRangeInformationRow newRow = createNewRow(new HashSet<>(allRanges), "", true);
dag.addNewNodeWithSearch(newRow,allRanges);
DagNode newNode = dag.getNode(newRow.getPartitionIndex());
DagNode node = dag.getNode(rows.get(4).getPartitionIndex());
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
index 2435762..2443d1e 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
@@ -142,6 +142,7 @@ public class OwnershipAndCheckpointTest {
this.conn = MdbcTestUtils.getConnection(DBType.MySQL);
Properties info = new Properties();
this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+MdbcTestUtils.getMariaDbPort()+"/"+DATABASE, conn, info);
+ this.mysqlMixin.initTables();
dropAndCreateTable();
}
@@ -170,7 +171,7 @@ public class OwnershipAndCheckpointTest {
}
}
- private OwnershipReturn cleanAndOwnPartition(List<Range> ranges, UUID ownOpId) throws SQLException {
+ private OwnershipReturn cleanAndOwnPartition(Set<Range> ranges, UUID ownOpId) throws SQLException {
dropAndCreateTable();
cleanAlreadyApplied(ownAndCheck);
DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
@@ -213,7 +214,7 @@ public class OwnershipAndCheckpointTest {
initDatabase(range);
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(range);
UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
OwnershipReturn own = cleanAndOwnPartition(ranges,ownOpId);
@@ -237,7 +238,7 @@ public class OwnershipAndCheckpointTest {
initDatabase(range);
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(range);
UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
OwnershipReturn own = cleanAndOwnPartition(ranges,ownOpId);
@@ -259,7 +260,7 @@ public class OwnershipAndCheckpointTest {
public void readOwn() throws Exception {
Range range = new Range("TABLE1");
MusicInterface mi = MdbcTestUtils.getMusicMixin();
- List<Range> ranges = new ArrayList<>();
+ Set<Range> ranges = new HashSet<>();
ranges.add(range);
final DatabasePartition partition = TestUtils.createBasicRow(range, mi, MdbcTestUtils.getServerName());
TestUtils.unlockRow(MdbcTestUtils.getKeyspace(), MdbcTestUtils.getMriTableName(), partition);