aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
diff options
context:
space:
mode:
Diffstat (limited to 'mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java137
1 files changed, 93 insertions, 44 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index ec91ceb..b544b94 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -35,9 +35,9 @@ import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
-import org.json.JSONTokener;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.Configuration;
@@ -45,7 +45,8 @@ import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.query.SQLOperation;
-import org.onap.music.mdbc.query.SQLOperationType;
+import org.onap.music.mdbc.tables.MriReference;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.Operation;
import org.onap.music.mdbc.tables.StagingTable;
import net.sf.jsqlparser.JSQLParserException;
@@ -87,7 +88,7 @@ public class MySQLMixin implements DBInterface {
+ "CONNECTION_ID INT, PRIMARY KEY (IX));";
private static final String CKPT_TBL = "MDBC_CHECKPOINT";
private static final String CREATE_CKPT_SQL =
- "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTINDEX INT);";
+ "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTID VARCHAR(36));";
private final MusicInterface mi;
private final int connId;
@@ -187,7 +188,7 @@ public class MySQLMixin implements DBInterface {
String dbname = "mdbc"; // default name
try {
Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery("SELECT DATABASE() AS DB");
+ ResultSet rs = stmt.executeQuery("SELECT UPPER(DATABASE()) AS DB");
if (rs.next()) {
dbname = rs.getString("DB");
}
@@ -214,7 +215,7 @@ public class MySQLMixin implements DBInterface {
public Set<String> getSQLTableSet() {
Set<String> set = new TreeSet<String>();
String sql =
- "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+ "SELECT CONCAT(UPPER(TABLE_SCHEMA), '.', UPPER(TABLE_NAME)) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE';";
try {
Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
@@ -234,7 +235,7 @@ public class MySQLMixin implements DBInterface {
public Set<Range> getSQLRangeSet() {
Set<String> set = new TreeSet<String>();
String sql =
- "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+ "SELECT CONCAT(UPPER(TABLE_SCHEMA), '.', UPPER(TABLE_NAME)) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE';";
try {
Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
@@ -249,7 +250,10 @@ public class MySQLMixin implements DBInterface {
logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
Set<Range> rangeSet = new HashSet<>();
for (String table : set) {
- rangeSet.add(new Range(table));
+ if (!getReservedTblNames().contains(table)) {
+ // Don't create triggers for the table the triggers write into!!!
+ rangeSet.add(new Range(table));
+ }
}
return rangeSet;
}
@@ -814,15 +818,31 @@ public class MySQLMixin implements DBInterface {
private ArrayList<String> getMusicKey(String tbl, String cmd, String sql) {
ArrayList<String> musicKeys = new ArrayList<String>();
/*
- * if (cmd.equalsIgnoreCase("insert")) { //create key, return key musicKeys.add(msm.generatePrimaryKey()); }
- * else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) { try {
- * net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); String where; if (stmt instanceof
- * Update) { where = ((Update) stmt).getWhere().toString(); } else if (stmt instanceof Delete) { where =
- * ((Delete) stmt).getWhere().toString(); } else { System.err.println("Unknown type: " +stmt.getClass()); where
- * = ""; } ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where); musicKeys =
- * msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs)); } catch (JSQLParserException e) {
- *
- * e.printStackTrace(); } catch (SQLException e) { //Not a valid sql query e.printStackTrace(); } }
+ if (cmd.equalsIgnoreCase("insert")) {
+ //create key, return key
+ musicKeys.add(msm.generatePrimaryKey());
+ } else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) {
+ try {
+ net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql);
+ String where;
+ if (stmt instanceof Update) {
+ where = ((Update) stmt).getWhere().toString();
+ } else if (stmt instanceof Delete) {
+ where = ((Delete) stmt).getWhere().toString();
+ } else {
+ System.err.println("Unknown type: " +stmt.getClass());
+ where = "";
+ }
+ ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where);
+ musicKeys = msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs));
+ } catch (JSQLParserException e) {
+
+ e.printStackTrace();
+ } catch (SQLException e) {
+ //Not a valid sql query
+ e.printStackTrace();
+ }
+ }
*/
return musicKeys;
}
@@ -877,7 +897,7 @@ public class MySQLMixin implements DBInterface {
* @param transaction - base 64 encoded, serialized digest
* @throws MDBCServiceException
*/
- public void replayTransaction(StagingTable transaction, Set<Range> ranges)
+ public void replayTransaction(StagingTable transaction)
throws SQLException, MDBCServiceException {
boolean autocommit = jdbcConn.getAutoCommit();
jdbcConn.setAutoCommit(false);
@@ -885,7 +905,6 @@ public class MySQLMixin implements DBInterface {
ArrayList<Operation> opList = transaction.getOperationList();
for (Operation op : opList) {
- if (Range.overlaps(ranges, op.getTable())) {
try {
replayOperationIntoDB(jdbcStmt, op);
} catch (SQLException | MDBCServiceException e) {
@@ -895,7 +914,6 @@ public class MySQLMixin implements DBInterface {
jdbcConn.rollback();
throw e;
}
- }
}
clearReplayedOperations(jdbcStmt);
@@ -920,8 +938,8 @@ public class MySQLMixin implements DBInterface {
}
@Override
- public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException {
- replayTransaction(txDigest, ranges);
+ public void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException {
+ replayTransaction(txDigest);
}
/**
@@ -939,17 +957,7 @@ public class MySQLMixin implements DBInterface {
ArrayList<String> cols = new ArrayList<String>();
ArrayList<Object> vals = new ArrayList<Object>();
- Iterator<String> colIterator = jsonOp.keys();
- while (colIterator.hasNext()) {
- String col = colIterator.next();
- // FIXME: should not explicitly refer to cassandramixin
- if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
- // reserved name
- continue;
- }
- cols.add(col);
- vals.add(jsonOp.get(col));
- }
+ constructColValues(jsonOp, cols, vals);
// build and replay the queries
StringBuilder sql = constructSQL(op, cols, vals);
@@ -965,7 +973,11 @@ public class MySQLMixin implements DBInterface {
logger.warn("Error Replaying operation: " + sql.toString()
+ "; Replacing insert/replace/viceversa and replaying ");
- buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);
+ try {
+ buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);
+ } catch (Exception e) {
+ logger.warn(" Error replaying inverse operation; " + sql + "Ignore the exception");
+ }
}
} catch (SQLException sqlE) {
// This applies for replaying transactions involving Eventually Consistent tables
@@ -977,6 +989,20 @@ public class MySQLMixin implements DBInterface {
}
}
+ public void constructColValues(JSONObject jsonOp, ArrayList<String> cols,
+ ArrayList<Object> vals) {
+ Iterator<String> colIterator = jsonOp.keys();
+ while(colIterator.hasNext()) {
+ String col = colIterator.next();
+ //FIXME: should not explicitly refer to cassandramixin
+ if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
+ //reserved name
+ continue;
+ }
+ cols.add(col);
+ vals.add(jsonOp.get(col));
+ }
+ }
protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, ArrayList<String> cols,
ArrayList<Object> vals) throws SQLException, MDBCServiceException {
@@ -999,7 +1025,7 @@ public class MySQLMixin implements DBInterface {
* @throws MDBCServiceException
*/
- protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, ArrayList<Object> vals)
+ public StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, ArrayList<Object> vals)
throws MDBCServiceException {
StringBuilder sqlInverse = null;
switch (op.getOperationType()) {
@@ -1015,7 +1041,7 @@ public class MySQLMixin implements DBInterface {
return sqlInverse;
}
- protected StringBuilder constructSQL(Operation op, ArrayList<String> cols, ArrayList<Object> vals)
+ public StringBuilder constructSQL(Operation op, ArrayList<String> cols, ArrayList<Object> vals)
throws MDBCServiceException {
StringBuilder sql = null;
switch (op.getOperationType()) {
@@ -1059,7 +1085,7 @@ public class MySQLMixin implements DBInterface {
sql.append(") VALUES (");
sep = "";
for (Object val : vals) {
- sql.append(sep + "\"" + val + "\"");
+ sql.append(sep + (val!=JSONObject.NULL?"\"" + val +"\"":"null"));
sep = ", ";
}
sql.append(");");
@@ -1074,7 +1100,7 @@ public class MySQLMixin implements DBInterface {
sql.append(r + " SET ");
sep = "";
for (int i = 0; i < cols.size(); i++) {
- sql.append(sep + cols.get(i) + "=\"" + vals.get(i) + "\"");
+ sql.append(sep + cols.get(i) + (vals.get(i)!=JSONObject.NULL?"=\"" + vals.get(i) +"\"":"=null"));
sep = ", ";
}
sql.append(" WHERE ");
@@ -1095,7 +1121,7 @@ public class MySQLMixin implements DBInterface {
String and = "";
for (String key : primaryKeys.keySet()) {
// We cannot use the default primary key for the sql table and operations
- if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
+ if(!key.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
Object val = primaryKeys.get(key);
keyCondStmt.append(and + key + "=\"" + val + "\"");
and = " AND ";
@@ -1122,12 +1148,12 @@ public class MySQLMixin implements DBInterface {
}
@Override
- public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) {
- String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTINDEX=? where RANGENAME=?;";
+ public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) {
+ String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTID=? where RANGENAME=?;";
try {
PreparedStatement stmt = jdbcConn.prepareStatement(query);
- stmt.setString(1, playbackPointer.getLeft().toString());
- stmt.setInt(2, playbackPointer.getRight());
+ stmt.setString(1, playbackPointer.getLeft().getIndex().toString());
+ stmt.setString(2, playbackPointer.getRight().transactionId.toString());
stmt.setString(3, r.getTable());
stmt.execute();
stmt.close();
@@ -1137,6 +1163,30 @@ public class MySQLMixin implements DBInterface {
}
@Override
+ public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations() {
+ Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = new ConcurrentHashMap<>();
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT * FROM " + CKPT_TBL + ";");
+ while (rs.next()) {
+ Range r = new Range(rs.getString("RANGENAME"));
+ String mrirow = rs.getString("MRIROW");
+ String txId = rs.getString("DIGESTID");
+ if (mrirow!=null) {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Previously checkpointed: " + r.getTable() + " at (" + mrirow + ", " + txId + ")");
+ alreadyApplied.put(r, Pair.of(new MriReference(mrirow), new MusicTxDigestId(mrirow, txId, -1)));
+ }
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Unable to get replay checkpoint location", e);
+ }
+
+ return alreadyApplied;
+ }
+
+ @Override
public void initTables() {
try {
Statement stmt = jdbcConn.createStatement();
@@ -1161,5 +1211,4 @@ public class MySQLMixin implements DBInterface {
logger.error(EELFLoggerDelegate.errorLogger, "initTables: problem creating th mdbc tables!");
}
}
-
-}
+} \ No newline at end of file