diff options
Diffstat (limited to 'mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java')
-rwxr-xr-x | mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java | 137 |
1 files changed, 93 insertions, 44 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index ec91ceb..b544b94 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -35,9 +35,9 @@ import java.util.Properties; import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; -import org.json.JSONTokener; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.Configuration; @@ -45,7 +45,8 @@ import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.query.SQLOperation; -import org.onap.music.mdbc.query.SQLOperationType; +import org.onap.music.mdbc.tables.MriReference; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.Operation; import org.onap.music.mdbc.tables.StagingTable; import net.sf.jsqlparser.JSQLParserException; @@ -87,7 +88,7 @@ public class MySQLMixin implements DBInterface { + "CONNECTION_ID INT, PRIMARY KEY (IX));"; private static final String CKPT_TBL = "MDBC_CHECKPOINT"; private static final String CREATE_CKPT_SQL = - "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTINDEX INT);"; + "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTID VARCHAR(36));"; private final MusicInterface mi; private final int connId; @@ -187,7 +188,7 @@ public class MySQLMixin implements DBInterface { String dbname = "mdbc"; // default name try { Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT DATABASE() AS DB"); + ResultSet rs = stmt.executeQuery("SELECT UPPER(DATABASE()) AS DB"); if (rs.next()) { dbname = rs.getString("DB"); } @@ -214,7 +215,7 @@ public class MySQLMixin implements DBInterface { public Set<String> getSQLTableSet() { Set<String> set = new TreeSet<String>(); String sql = - "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + "SELECT CONCAT(UPPER(TABLE_SCHEMA), '.', UPPER(TABLE_NAME)) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE';"; try { Statement stmt = jdbcConn.createStatement(); ResultSet rs = stmt.executeQuery(sql); @@ -234,7 +235,7 @@ public class MySQLMixin implements DBInterface { public Set<Range> getSQLRangeSet() { Set<String> set = new TreeSet<String>(); String sql = - "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + "SELECT CONCAT(UPPER(TABLE_SCHEMA), '.', UPPER(TABLE_NAME)) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE';"; try { Statement stmt = jdbcConn.createStatement(); ResultSet rs = stmt.executeQuery(sql); @@ -249,7 +250,10 @@ public class MySQLMixin implements DBInterface { logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set); Set<Range> rangeSet = new HashSet<>(); for (String table : set) { - rangeSet.add(new Range(table)); + if (!getReservedTblNames().contains(table)) { + // Don't create triggers for the table the triggers write into!!! + rangeSet.add(new Range(table)); + } } return rangeSet; } @@ -814,15 +818,31 @@ public class MySQLMixin implements DBInterface { private ArrayList<String> getMusicKey(String tbl, String cmd, String sql) { ArrayList<String> musicKeys = new ArrayList<String>(); /* - * if (cmd.equalsIgnoreCase("insert")) { //create key, return key musicKeys.add(msm.generatePrimaryKey()); } - * else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) { try { - * net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); String where; if (stmt instanceof - * Update) { where = ((Update) stmt).getWhere().toString(); } else if (stmt instanceof Delete) { where = - * ((Delete) stmt).getWhere().toString(); } else { System.err.println("Unknown type: " +stmt.getClass()); where - * = ""; } ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where); musicKeys = - * msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs)); } catch (JSQLParserException e) { - * - * e.printStackTrace(); } catch (SQLException e) { //Not a valid sql query e.printStackTrace(); } } + if (cmd.equalsIgnoreCase("insert")) { + //create key, return key + musicKeys.add(msm.generatePrimaryKey()); + } else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) { + try { + net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); + String where; + if (stmt instanceof Update) { + where = ((Update) stmt).getWhere().toString(); + } else if (stmt instanceof Delete) { + where = ((Delete) stmt).getWhere().toString(); + } else { + System.err.println("Unknown type: " +stmt.getClass()); + where = ""; + } + ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where); + musicKeys = msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs)); + } catch (JSQLParserException e) { + + e.printStackTrace(); + } catch (SQLException e) { + //Not a valid sql query + e.printStackTrace(); + } + } */ return musicKeys; } @@ -877,7 +897,7 @@ public class MySQLMixin implements DBInterface { * @param transaction - base 64 encoded, serialized digest * @throws MDBCServiceException */ - public void replayTransaction(StagingTable transaction, Set<Range> ranges) + public void replayTransaction(StagingTable transaction) throws SQLException, MDBCServiceException { boolean autocommit = jdbcConn.getAutoCommit(); jdbcConn.setAutoCommit(false); @@ -885,7 +905,6 @@ public class MySQLMixin implements DBInterface { ArrayList<Operation> opList = transaction.getOperationList(); for (Operation op : opList) { - if (Range.overlaps(ranges, op.getTable())) { try { replayOperationIntoDB(jdbcStmt, op); } catch (SQLException | MDBCServiceException e) { @@ -895,7 +914,6 @@ public class MySQLMixin implements DBInterface { jdbcConn.rollback(); throw e; } - } } clearReplayedOperations(jdbcStmt); @@ -920,8 +938,8 @@ public class MySQLMixin implements DBInterface { } @Override - public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException { - replayTransaction(txDigest, ranges); + public void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException { + replayTransaction(txDigest); } /** @@ -939,17 +957,7 @@ public class MySQLMixin implements DBInterface { ArrayList<String> cols = new ArrayList<String>(); ArrayList<Object> vals = new ArrayList<Object>(); - Iterator<String> colIterator = jsonOp.keys(); - while (colIterator.hasNext()) { - String col = colIterator.next(); - // FIXME: should not explicitly refer to cassandramixin - if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) { - // reserved name - continue; - } - cols.add(col); - vals.add(jsonOp.get(col)); - } + constructColValues(jsonOp, cols, vals); // build and replay the queries StringBuilder sql = constructSQL(op, cols, vals); @@ -965,7 +973,11 @@ public class MySQLMixin implements DBInterface { logger.warn("Error Replaying operation: " + sql.toString() + "; Replacing insert/replace/viceversa and replaying "); - buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals); + try { + buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals); + } catch (Exception e) { + logger.warn(" Error replaying inverse operation; " + sql + "Ignore the exception"); + } } } catch (SQLException sqlE) { // This applies for replaying transactions involving Eventually Consistent tables @@ -977,6 +989,20 @@ public class MySQLMixin implements DBInterface { } } + public void constructColValues(JSONObject jsonOp, ArrayList<String> cols, + ArrayList<Object> vals) { + Iterator<String> colIterator = jsonOp.keys(); + while(colIterator.hasNext()) { + String col = colIterator.next(); + //FIXME: should not explicitly refer to cassandramixin + if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) { + //reserved name + continue; + } + cols.add(col); + vals.add(jsonOp.get(col)); + } + } protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, ArrayList<String> cols, ArrayList<Object> vals) throws SQLException, MDBCServiceException { @@ -999,7 +1025,7 @@ public class MySQLMixin implements DBInterface { * @throws MDBCServiceException */ - protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, ArrayList<Object> vals) + public StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, ArrayList<Object> vals) throws MDBCServiceException { StringBuilder sqlInverse = null; switch (op.getOperationType()) { @@ -1015,7 +1041,7 @@ public class MySQLMixin implements DBInterface { return sqlInverse; } - protected StringBuilder constructSQL(Operation op, ArrayList<String> cols, ArrayList<Object> vals) + public StringBuilder constructSQL(Operation op, ArrayList<String> cols, ArrayList<Object> vals) throws MDBCServiceException { StringBuilder sql = null; switch (op.getOperationType()) { @@ -1059,7 +1085,7 @@ public class MySQLMixin implements DBInterface { sql.append(") VALUES ("); sep = ""; for (Object val : vals) { - sql.append(sep + "\"" + val + "\""); + sql.append(sep + (val!=JSONObject.NULL?"\"" + val +"\"":"null")); sep = ", "; } sql.append(");"); @@ -1074,7 +1100,7 @@ public class MySQLMixin implements DBInterface { sql.append(r + " SET "); sep = ""; for (int i = 0; i < cols.size(); i++) { - sql.append(sep + cols.get(i) + "=\"" + vals.get(i) + "\""); + sql.append(sep + cols.get(i) + (vals.get(i)!=JSONObject.NULL?"=\"" + vals.get(i) +"\"":"=null")); sep = ", "; } sql.append(" WHERE "); @@ -1095,7 +1121,7 @@ public class MySQLMixin implements DBInterface { String and = ""; for (String key : primaryKeys.keySet()) { // We cannot use the default primary key for the sql table and operations - if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) { + if(!key.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) { Object val = primaryKeys.get(key); keyCondStmt.append(and + key + "=\"" + val + "\""); and = " AND "; @@ -1122,12 +1148,12 @@ public class MySQLMixin implements DBInterface { } @Override - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) { - String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTINDEX=? where RANGENAME=?;"; + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) { + String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTID=? where RANGENAME=?;"; try { PreparedStatement stmt = jdbcConn.prepareStatement(query); - stmt.setString(1, playbackPointer.getLeft().toString()); - stmt.setInt(2, playbackPointer.getRight()); + stmt.setString(1, playbackPointer.getLeft().getIndex().toString()); + stmt.setString(2, playbackPointer.getRight().transactionId.toString()); stmt.setString(3, r.getTable()); stmt.execute(); stmt.close(); @@ -1137,6 +1163,30 @@ public class MySQLMixin implements DBInterface { } @Override + public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations() { + Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = new ConcurrentHashMap<>(); + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + CKPT_TBL + ";"); + while (rs.next()) { + Range r = new Range(rs.getString("RANGENAME")); + String mrirow = rs.getString("MRIROW"); + String txId = rs.getString("DIGESTID"); + if (mrirow!=null) { + logger.info(EELFLoggerDelegate.applicationLogger, + "Previously checkpointed: " + r.getTable() + " at (" + mrirow + ", " + txId + ")"); + alreadyApplied.put(r, Pair.of(new MriReference(mrirow), new MusicTxDigestId(mrirow, txId, -1))); + } + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Unable to get replay checkpoint location", e); + } + + return alreadyApplied; + } + + @Override public void initTables() { try { Statement stmt = jdbcConn.createStatement(); @@ -1161,5 +1211,4 @@ public class MySQLMixin implements DBInterface { logger.error(EELFLoggerDelegate.errorLogger, "initTables: problem creating th mdbc tables!"); } } - -} +}
\ No newline at end of file |