diff options
author | statta <statta@research.att.com> | 2019-03-21 17:57:57 -0400 |
---|---|---|
committer | Tschaen, Brendan <ctschaen@att.com> | 2019-03-22 11:07:17 -0400 |
commit | 66e0e407c48d2288ce0d7d6f8129487a29de8c02 (patch) | |
tree | 94555aadfdbc71c8574bbc509634ab0cb4f7e386 | |
parent | 6f363a89863a91ac104e5c77f0096636a8db5343 (diff) |
Insert/update interchange and PS issue
Issue-ID: MUSIC-276
Change-Id: I9c9b990cb362adae81e621db527a0f890e00c7eb
Signed-off-by: statta <statta@research.att.com>
4 files changed, 177 insertions, 109 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java index 1913811..102072c 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java @@ -25,7 +25,6 @@ import java.math.BigDecimal; import java.net.URL; import java.sql.Array; import java.sql.Blob; -import java.sql.CallableStatement; import java.sql.Clob; import java.sql.Connection; import java.sql.Date; @@ -43,9 +42,7 @@ import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; import java.util.Calendar; - import org.apache.commons.lang3.StringUtils; - import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; @@ -673,117 +670,117 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat @Override public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { - ((CallableStatement)stmt).setTimestamp(parameterIndex, x, cal); + ((PreparedStatement)stmt).setTimestamp(parameterIndex, x, cal); } @Override public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { - ((CallableStatement)stmt).setNull(parameterIndex, sqlType, typeName); + ((PreparedStatement)stmt).setNull(parameterIndex, sqlType, typeName); } @Override public void setURL(int parameterIndex, URL x) throws SQLException { - ((CallableStatement)stmt).setURL(parameterIndex, x); + ((PreparedStatement)stmt).setURL(parameterIndex, x); } @Override public ParameterMetaData getParameterMetaData() throws SQLException { - return ((CallableStatement)stmt).getParameterMetaData(); + return ((PreparedStatement)stmt).getParameterMetaData(); } @Override public void setRowId(int parameterIndex, RowId x) throws SQLException { - ((CallableStatement)stmt).setRowId(parameterIndex, x); + ((PreparedStatement)stmt).setRowId(parameterIndex, x); } @Override public void setNString(int parameterIndex, String value) throws SQLException { - ((CallableStatement)stmt).setNString(parameterIndex, value); + ((PreparedStatement)stmt).setNString(parameterIndex, value); } @Override public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException { - ((CallableStatement)stmt).setNCharacterStream(parameterIndex, value, length); + ((PreparedStatement)stmt).setNCharacterStream(parameterIndex, value, length); } @Override public void setNClob(int parameterIndex, NClob value) throws SQLException { - ((CallableStatement)stmt).setNClob(parameterIndex, value); + ((PreparedStatement)stmt).setNClob(parameterIndex, value); } @Override public void setClob(int parameterIndex, Reader reader, long length) throws SQLException { - ((CallableStatement)stmt).setClob(parameterIndex, reader, length); + ((PreparedStatement)stmt).setClob(parameterIndex, reader, length); } @Override public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException { - ((CallableStatement)stmt).setBlob(parameterIndex, inputStream, length); + ((PreparedStatement)stmt).setBlob(parameterIndex, inputStream, length); } @Override public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException { - ((CallableStatement)stmt).setNClob(parameterIndex, reader, length); + ((PreparedStatement)stmt).setNClob(parameterIndex, reader, length); } @Override public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException { - ((CallableStatement)stmt).setSQLXML(parameterIndex, xmlObject); + ((PreparedStatement)stmt).setSQLXML(parameterIndex, xmlObject); } @Override public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException { - ((CallableStatement)stmt).setObject(parameterIndex, x, targetSqlType, scaleOrLength); + ((PreparedStatement)stmt).setObject(parameterIndex, x, targetSqlType, scaleOrLength); } @Override public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException { - ((CallableStatement)stmt).setAsciiStream(parameterIndex, x, length); + ((PreparedStatement)stmt).setAsciiStream(parameterIndex, x, length); } @Override public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { - ((CallableStatement)stmt).setBinaryStream(parameterIndex, x, length); + ((PreparedStatement)stmt).setBinaryStream(parameterIndex, x, length); } @Override public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException { - ((CallableStatement)stmt).setCharacterStream(parameterIndex, reader, length); + ((PreparedStatement)stmt).setCharacterStream(parameterIndex, reader, length); } @Override public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException { - ((CallableStatement)stmt).setAsciiStream(parameterIndex, x); + ((PreparedStatement)stmt).setAsciiStream(parameterIndex, x); } @Override public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { - ((CallableStatement)stmt).setBinaryStream(parameterIndex, x); + ((PreparedStatement)stmt).setBinaryStream(parameterIndex, x); } @Override public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException { - ((CallableStatement)stmt).setCharacterStream(parameterIndex, reader); + ((PreparedStatement)stmt).setCharacterStream(parameterIndex, reader); } @Override public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException { - ((CallableStatement)stmt).setNCharacterStream(parameterIndex, value); + ((PreparedStatement)stmt).setNCharacterStream(parameterIndex, value); } @Override public void setClob(int parameterIndex, Reader reader) throws SQLException { - ((CallableStatement)stmt).setClob(parameterIndex, reader); + ((PreparedStatement)stmt).setClob(parameterIndex, reader); } @Override public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException { - ((CallableStatement)stmt).setBlob(parameterIndex, inputStream); + ((PreparedStatement)stmt).setBlob(parameterIndex, inputStream); } @Override public void setNClob(int parameterIndex, Reader reader) throws SQLException { - ((CallableStatement)stmt).setNClob(parameterIndex, reader); + ((PreparedStatement)stmt).setNClob(parameterIndex, reader); } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index 85645f3..5ef2bc7 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -25,7 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; - +import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.tables.StagingTable; @@ -115,12 +115,13 @@ public interface DBInterface { * Replay a given TxDigest into the local DB * @param digest * @throws SQLException if replay cannot occur correctly + * @throws MDBCServiceException */ - void replayTransaction(StagingTable digest, List<Range> ranges) throws SQLException; + void replayTransaction(StagingTable digest, List<Range> ranges) throws SQLException, MDBCServiceException; void disableForeignKeyChecks() throws SQLException; void enableForeignKeyChecks() throws SQLException; - void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException; + void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index ffb6c87..c6cc512 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -1300,10 +1300,15 @@ public class MusicMixin implements MusicInterface { } protected void changeIsLatestToMRI(MusicRangeInformationRow row, boolean isLatest, LockResult lock) throws MDBCServiceException{ + + if(lock == null) + return; PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, row.getPartitionIndex(), musicTxDigestTableName, isLatest); ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(), - appendQuery, lock.getOwnerId(), null); + appendQuery, + lock.getOwnerId() + , null); if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing change isLatest operation with return type: "+returnType.getMessage()); throw new MDBCServiceException("Error when executing change isLatest operation with return type: "+returnType.getMessage()); @@ -2057,8 +2062,11 @@ public class MusicMixin implements MusicInterface { } private void releaseLocks(List<MusicRangeInformationRow> changed, Map<UUID,LockResult> newLocks) throws MDBCServiceException{ + for(MusicRangeInformationRow r : changed) { LockResult lock = newLocks.get(r.getPartitionIndex()); + if(lock == null) + continue; unlockKeyInMusic(musicRangeInformationTableName, r.getPartitionIndex().toString(), lock.getOwnerId()); newLocks.remove(r.getPartitionIndex()); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 420f9d4..928ffa1 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -792,8 +792,9 @@ NEW.field refers to the new value /** * Parse the transaction digest into individual events * @param transaction - base 64 encoded, serialized digest + * @throws MDBCServiceException */ - public void replayTransaction(StagingTable transaction, List<Range> ranges) throws SQLException { + public void replayTransaction(StagingTable transaction, List<Range> ranges) throws SQLException, MDBCServiceException { boolean autocommit = jdbcConn.getAutoCommit(); jdbcConn.setAutoCommit(false); Statement jdbcStmt = jdbcConn.createStatement(); @@ -803,7 +804,7 @@ NEW.field refers to the new value if(Range.overlaps(ranges,op.getTable())) { try { replayOperationIntoDB(jdbcStmt, op); - } catch (SQLException e) { + } catch (SQLException | MDBCServiceException e) { //rollback transaction logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "." + "Rolling back the entire digest replay."); @@ -835,87 +836,148 @@ NEW.field refers to the new value } @Override - public void applyTxDigest(StagingTable txDigest,List<Range> ranges) throws SQLException { + public void applyTxDigest(StagingTable txDigest,List<Range> ranges) throws SQLException, MDBCServiceException { replayTransaction(txDigest,ranges); } /** - * Replays operation into database, usually from txDigest - * @param jdbcStmt - * @param r - * @param op - * @throws SQLException - */ - private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException { - logger.info("Replaying Operation: " + op.getOperationType() + "->" + op.getVal()); - JSONObject jsonOp = op.getVal(); - - ArrayList<String> cols = new ArrayList<String>(); - ArrayList<Object> vals = new ArrayList<Object>(); - Iterator<String> colIterator = jsonOp.keys(); - while(colIterator.hasNext()) { - String col = colIterator.next(); - //FIXME: should not explicitly refer to cassandramixin - if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) { - //reserved name - continue; - } - cols.add(col); - vals.add(jsonOp.get(col)); - } - - //build the queries - StringBuilder sql = new StringBuilder(); - String sep = ""; - switch (op.getOperationType()) { - case INSERT: - sql.append(op.getOperationType() + " INTO "); - sql.append(op.getTable() + " (") ; - sep = ""; - for (String col: cols) { - sql.append(sep + col); - sep = ", "; - } - sql.append(") VALUES ("); - sep = ""; - for (Object val: vals) { - sql.append(sep + "\"" + val + "\""); - sep = ", "; - } - sql.append(");"); - break; - case UPDATE: - sql.append(op.getOperationType() + " "); - sql.append(op.getTable() + " SET "); - sep=""; - for (int i=0; i<cols.size(); i++) { - sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\""); - sep = ", "; - } - sql.append(" WHERE "); - try { - sql.append(getPrimaryKeyConditional(op.getKey())); - } catch (MDBCServiceException e) { - throw new SQLException("Update operatoin doesn't contain the required primary key",e); - } - sql.append(";"); - break; - case DELETE: - sql.append(op.getOperationType() + " FROM "); - sql.append(op.getTable() + " WHERE "); - sql.append(getPrimaryKeyConditional(op.getVal())); - sql.append(";"); - break; - case SELECT: - //no update happened, do nothing - return; - default: - logger.error(op.getOperationType() + "not implemented for replay"); - } - logger.info("Replaying operation: " + sql.toString()); - - jdbcStmt.executeQuery(sql.toString()); - } + * Replays operation into database, usually from txDigest + * @param jdbcStmt + * @param r + * @param op + * @throws SQLException + * @throws MDBCServiceException + */ + private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException { + logger.info("Replaying Operation: " + op.getOperationType() + "->" + op.getVal()); + JSONObject jsonOp = op.getVal(); + + ArrayList<String> cols = new ArrayList<String>(); + ArrayList<Object> vals = new ArrayList<Object>(); + Iterator<String> colIterator = jsonOp.keys(); + while(colIterator.hasNext()) { + String col = colIterator.next(); + //FIXME: should not explicitly refer to cassandramixin + if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) { + //reserved name + continue; + } + cols.add(col); + vals.add(jsonOp.get(col)); + } + + //build and replay the queries + StringBuilder sql = constructSQL(op, cols, vals); + if(sql == null) + return; + + try { + logger.info("Replaying operation: " + sql.toString()); + int updated = jdbcStmt.executeUpdate(sql.toString()); + + if(updated == 0) { + // This applies only for replaying transactions involving Eventually Consistent tables + logger.warn("Error Replaying operation: " + sql.toString() + "; Replacing insert/replace/viceversa and replaying "); + + buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals); + } + } catch (SQLException sqlE) { + // This applies only for replaying transactions involving Eventually Consistent tables + logger.warn("Error Replaying operation: " + sql.toString() + "; Replacing insert/replace/viceversa and replaying "); + + buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals); + + } + } + protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, + ArrayList<String> cols, ArrayList<Object> vals) throws SQLException, MDBCServiceException { + StringBuilder sqlInverse = constructSQLInverse( op, cols, vals); + if(sqlInverse == null) + return; + logger.info("Replaying operation: " + sqlInverse.toString()); + jdbcStmt.executeUpdate(sqlInverse.toString()); + } + protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, + ArrayList<Object> vals) throws MDBCServiceException { + StringBuilder sqlInverse = null; + switch (op.getOperationType()) { + case INSERT: + sqlInverse = constructUpdate(op.getTable() , OperationType.UPDATE, op.getKey(), cols, vals); + break; + case UPDATE: + sqlInverse = constructInsert(op.getTable() , OperationType.INSERT, cols, vals); + break; + default: + break; + } + return sqlInverse; + } + protected StringBuilder constructSQL(Operation op, ArrayList<String> cols, + ArrayList<Object> vals) throws MDBCServiceException { + StringBuilder sql = null; + switch (op.getOperationType()) { + case INSERT: + sql = constructInsert(op.getTable(), op.getOperationType(), cols, vals); + break; + case UPDATE: + sql = constructUpdate(op.getTable(), op.getOperationType(), op.getKey(), cols, vals); + break; + case DELETE: + sql = constructDelete(op.getTable(), op.getOperationType(), op.getKey()); + break; + case SELECT: + //no update happened, do nothing + break; + default: + logger.error(op.getOperationType() + "not implemented for replay"); + } + return sql; + } + private StringBuilder constructDelete(String r, OperationType op, JSONObject key) { + StringBuilder sql = new StringBuilder(); + sql.append(op + " FROM "); + sql.append(r + " WHERE "); + sql.append(getPrimaryKeyConditional(key)); + sql.append(";"); + return sql; + } + private StringBuilder constructInsert(String r, OperationType op, ArrayList<String> cols, + ArrayList<Object> vals) { + StringBuilder sql = new StringBuilder(); + String sep; + sql.append(op + " INTO "); + sql.append(r + " (") ; + sep = ""; + for (String col: cols) { + sql.append(sep + col); + sep = ", "; + } + sql.append(") VALUES ("); + sep = ""; + for (Object val: vals) { + sql.append(sep + "\"" + val + "\""); + sep = ", "; + } + sql.append(");"); + return sql; + } + private StringBuilder constructUpdate(String r, OperationType op, JSONObject key, ArrayList<String> cols, + ArrayList<Object> vals) { + StringBuilder sql = new StringBuilder(); + String sep; + sql.append(op + " "); + sql.append(r + " SET "); + sep=""; + for (int i=0; i<cols.size(); i++) { + sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\""); + sep = ", "; + } + sql.append(" WHERE "); + sql.append(getPrimaryKeyConditional(key)); + sql.append(";"); + + return sql; + } /** * Create an SQL string for AND'ing all of the primary keys |