aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java49
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java7
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java220
4 files changed, 177 insertions, 109 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
index 1913811..102072c 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
@@ -25,7 +25,6 @@ import java.math.BigDecimal;
import java.net.URL;
import java.sql.Array;
import java.sql.Blob;
-import java.sql.CallableStatement;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.Date;
@@ -43,9 +42,7 @@ import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
-
import org.apache.commons.lang3.StringUtils;
-
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
@@ -673,117 +670,117 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
@Override
public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
- ((CallableStatement)stmt).setTimestamp(parameterIndex, x, cal);
+ ((PreparedStatement)stmt).setTimestamp(parameterIndex, x, cal);
}
@Override
public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
- ((CallableStatement)stmt).setNull(parameterIndex, sqlType, typeName);
+ ((PreparedStatement)stmt).setNull(parameterIndex, sqlType, typeName);
}
@Override
public void setURL(int parameterIndex, URL x) throws SQLException {
- ((CallableStatement)stmt).setURL(parameterIndex, x);
+ ((PreparedStatement)stmt).setURL(parameterIndex, x);
}
@Override
public ParameterMetaData getParameterMetaData() throws SQLException {
- return ((CallableStatement)stmt).getParameterMetaData();
+ return ((PreparedStatement)stmt).getParameterMetaData();
}
@Override
public void setRowId(int parameterIndex, RowId x) throws SQLException {
- ((CallableStatement)stmt).setRowId(parameterIndex, x);
+ ((PreparedStatement)stmt).setRowId(parameterIndex, x);
}
@Override
public void setNString(int parameterIndex, String value) throws SQLException {
- ((CallableStatement)stmt).setNString(parameterIndex, value);
+ ((PreparedStatement)stmt).setNString(parameterIndex, value);
}
@Override
public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
- ((CallableStatement)stmt).setNCharacterStream(parameterIndex, value, length);
+ ((PreparedStatement)stmt).setNCharacterStream(parameterIndex, value, length);
}
@Override
public void setNClob(int parameterIndex, NClob value) throws SQLException {
- ((CallableStatement)stmt).setNClob(parameterIndex, value);
+ ((PreparedStatement)stmt).setNClob(parameterIndex, value);
}
@Override
public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
- ((CallableStatement)stmt).setClob(parameterIndex, reader, length);
+ ((PreparedStatement)stmt).setClob(parameterIndex, reader, length);
}
@Override
public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
- ((CallableStatement)stmt).setBlob(parameterIndex, inputStream, length);
+ ((PreparedStatement)stmt).setBlob(parameterIndex, inputStream, length);
}
@Override
public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
- ((CallableStatement)stmt).setNClob(parameterIndex, reader, length);
+ ((PreparedStatement)stmt).setNClob(parameterIndex, reader, length);
}
@Override
public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
- ((CallableStatement)stmt).setSQLXML(parameterIndex, xmlObject);
+ ((PreparedStatement)stmt).setSQLXML(parameterIndex, xmlObject);
}
@Override
public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
- ((CallableStatement)stmt).setObject(parameterIndex, x, targetSqlType, scaleOrLength);
+ ((PreparedStatement)stmt).setObject(parameterIndex, x, targetSqlType, scaleOrLength);
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
- ((CallableStatement)stmt).setAsciiStream(parameterIndex, x, length);
+ ((PreparedStatement)stmt).setAsciiStream(parameterIndex, x, length);
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
- ((CallableStatement)stmt).setBinaryStream(parameterIndex, x, length);
+ ((PreparedStatement)stmt).setBinaryStream(parameterIndex, x, length);
}
@Override
public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
- ((CallableStatement)stmt).setCharacterStream(parameterIndex, reader, length);
+ ((PreparedStatement)stmt).setCharacterStream(parameterIndex, reader, length);
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
- ((CallableStatement)stmt).setAsciiStream(parameterIndex, x);
+ ((PreparedStatement)stmt).setAsciiStream(parameterIndex, x);
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
- ((CallableStatement)stmt).setBinaryStream(parameterIndex, x);
+ ((PreparedStatement)stmt).setBinaryStream(parameterIndex, x);
}
@Override
public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
- ((CallableStatement)stmt).setCharacterStream(parameterIndex, reader);
+ ((PreparedStatement)stmt).setCharacterStream(parameterIndex, reader);
}
@Override
public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
- ((CallableStatement)stmt).setNCharacterStream(parameterIndex, value);
+ ((PreparedStatement)stmt).setNCharacterStream(parameterIndex, value);
}
@Override
public void setClob(int parameterIndex, Reader reader) throws SQLException {
- ((CallableStatement)stmt).setClob(parameterIndex, reader);
+ ((PreparedStatement)stmt).setClob(parameterIndex, reader);
}
@Override
public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
- ((CallableStatement)stmt).setBlob(parameterIndex, inputStream);
+ ((PreparedStatement)stmt).setBlob(parameterIndex, inputStream);
}
@Override
public void setNClob(int parameterIndex, Reader reader) throws SQLException {
- ((CallableStatement)stmt).setNClob(parameterIndex, reader);
+ ((PreparedStatement)stmt).setNClob(parameterIndex, reader);
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index 85645f3..5ef2bc7 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
+import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.tables.StagingTable;
@@ -115,12 +115,13 @@ public interface DBInterface {
* Replay a given TxDigest into the local DB
* @param digest
* @throws SQLException if replay cannot occur correctly
+ * @throws MDBCServiceException
*/
- void replayTransaction(StagingTable digest, List<Range> ranges) throws SQLException;
+ void replayTransaction(StagingTable digest, List<Range> ranges) throws SQLException, MDBCServiceException;
void disableForeignKeyChecks() throws SQLException;
void enableForeignKeyChecks() throws SQLException;
- void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException;
+ void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index ffb6c87..c6cc512 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -1300,10 +1300,15 @@ public class MusicMixin implements MusicInterface {
}
protected void changeIsLatestToMRI(MusicRangeInformationRow row, boolean isLatest, LockResult lock) throws MDBCServiceException{
+
+ if(lock == null)
+ return;
PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, row.getPartitionIndex(),
musicTxDigestTableName, isLatest);
ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(),
- appendQuery, lock.getOwnerId(), null);
+ appendQuery,
+ lock.getOwnerId()
+ , null);
if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
logger.error(EELFLoggerDelegate.errorLogger, "Error when executing change isLatest operation with return type: "+returnType.getMessage());
throw new MDBCServiceException("Error when executing change isLatest operation with return type: "+returnType.getMessage());
@@ -2057,8 +2062,11 @@ public class MusicMixin implements MusicInterface {
}
private void releaseLocks(List<MusicRangeInformationRow> changed, Map<UUID,LockResult> newLocks) throws MDBCServiceException{
+
for(MusicRangeInformationRow r : changed) {
LockResult lock = newLocks.get(r.getPartitionIndex());
+ if(lock == null)
+ continue;
unlockKeyInMusic(musicRangeInformationTableName, r.getPartitionIndex().toString(),
lock.getOwnerId());
newLocks.remove(r.getPartitionIndex());
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 420f9d4..928ffa1 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -792,8 +792,9 @@ NEW.field refers to the new value
/**
* Parse the transaction digest into individual events
* @param transaction - base 64 encoded, serialized digest
+ * @throws MDBCServiceException
*/
- public void replayTransaction(StagingTable transaction, List<Range> ranges) throws SQLException {
+ public void replayTransaction(StagingTable transaction, List<Range> ranges) throws SQLException, MDBCServiceException {
boolean autocommit = jdbcConn.getAutoCommit();
jdbcConn.setAutoCommit(false);
Statement jdbcStmt = jdbcConn.createStatement();
@@ -803,7 +804,7 @@ NEW.field refers to the new value
if(Range.overlaps(ranges,op.getTable())) {
try {
replayOperationIntoDB(jdbcStmt, op);
- } catch (SQLException e) {
+ } catch (SQLException | MDBCServiceException e) {
//rollback transaction
logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "."
+ "Rolling back the entire digest replay.");
@@ -835,87 +836,148 @@ NEW.field refers to the new value
}
@Override
- public void applyTxDigest(StagingTable txDigest,List<Range> ranges) throws SQLException {
+ public void applyTxDigest(StagingTable txDigest,List<Range> ranges) throws SQLException, MDBCServiceException {
replayTransaction(txDigest,ranges);
}
/**
- * Replays operation into database, usually from txDigest
- * @param jdbcStmt
- * @param r
- * @param op
- * @throws SQLException
- */
- private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException {
- logger.info("Replaying Operation: " + op.getOperationType() + "->" + op.getVal());
- JSONObject jsonOp = op.getVal();
-
- ArrayList<String> cols = new ArrayList<String>();
- ArrayList<Object> vals = new ArrayList<Object>();
- Iterator<String> colIterator = jsonOp.keys();
- while(colIterator.hasNext()) {
- String col = colIterator.next();
- //FIXME: should not explicitly refer to cassandramixin
- if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
- //reserved name
- continue;
- }
- cols.add(col);
- vals.add(jsonOp.get(col));
- }
-
- //build the queries
- StringBuilder sql = new StringBuilder();
- String sep = "";
- switch (op.getOperationType()) {
- case INSERT:
- sql.append(op.getOperationType() + " INTO ");
- sql.append(op.getTable() + " (") ;
- sep = "";
- for (String col: cols) {
- sql.append(sep + col);
- sep = ", ";
- }
- sql.append(") VALUES (");
- sep = "";
- for (Object val: vals) {
- sql.append(sep + "\"" + val + "\"");
- sep = ", ";
- }
- sql.append(");");
- break;
- case UPDATE:
- sql.append(op.getOperationType() + " ");
- sql.append(op.getTable() + " SET ");
- sep="";
- for (int i=0; i<cols.size(); i++) {
- sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\"");
- sep = ", ";
- }
- sql.append(" WHERE ");
- try {
- sql.append(getPrimaryKeyConditional(op.getKey()));
- } catch (MDBCServiceException e) {
- throw new SQLException("Update operatoin doesn't contain the required primary key",e);
- }
- sql.append(";");
- break;
- case DELETE:
- sql.append(op.getOperationType() + " FROM ");
- sql.append(op.getTable() + " WHERE ");
- sql.append(getPrimaryKeyConditional(op.getVal()));
- sql.append(";");
- break;
- case SELECT:
- //no update happened, do nothing
- return;
- default:
- logger.error(op.getOperationType() + "not implemented for replay");
- }
- logger.info("Replaying operation: " + sql.toString());
-
- jdbcStmt.executeQuery(sql.toString());
- }
+ * Replays operation into database, usually from txDigest
+ * @param jdbcStmt
+ * @param r
+ * @param op
+ * @throws SQLException
+ * @throws MDBCServiceException
+ */
+ private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException {
+ logger.info("Replaying Operation: " + op.getOperationType() + "->" + op.getVal());
+ JSONObject jsonOp = op.getVal();
+
+ ArrayList<String> cols = new ArrayList<String>();
+ ArrayList<Object> vals = new ArrayList<Object>();
+ Iterator<String> colIterator = jsonOp.keys();
+ while(colIterator.hasNext()) {
+ String col = colIterator.next();
+ //FIXME: should not explicitly refer to cassandramixin
+ if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
+ //reserved name
+ continue;
+ }
+ cols.add(col);
+ vals.add(jsonOp.get(col));
+ }
+
+ //build and replay the queries
+ StringBuilder sql = constructSQL(op, cols, vals);
+ if(sql == null)
+ return;
+
+ try {
+ logger.info("Replaying operation: " + sql.toString());
+ int updated = jdbcStmt.executeUpdate(sql.toString());
+
+ if(updated == 0) {
+ // This applies only for replaying transactions involving Eventually Consistent tables
+ logger.warn("Error Replaying operation: " + sql.toString() + "; Replacing insert/replace/viceversa and replaying ");
+
+ buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);
+ }
+ } catch (SQLException sqlE) {
+ // This applies only for replaying transactions involving Eventually Consistent tables
+ logger.warn("Error Replaying operation: " + sql.toString() + "; Replacing insert/replace/viceversa and replaying ");
+
+ buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);
+
+ }
+ }
+ protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op,
+ ArrayList<String> cols, ArrayList<Object> vals) throws SQLException, MDBCServiceException {
+ StringBuilder sqlInverse = constructSQLInverse( op, cols, vals);
+ if(sqlInverse == null)
+ return;
+ logger.info("Replaying operation: " + sqlInverse.toString());
+ jdbcStmt.executeUpdate(sqlInverse.toString());
+ }
+ protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols,
+ ArrayList<Object> vals) throws MDBCServiceException {
+ StringBuilder sqlInverse = null;
+ switch (op.getOperationType()) {
+ case INSERT:
+ sqlInverse = constructUpdate(op.getTable() , OperationType.UPDATE, op.getKey(), cols, vals);
+ break;
+ case UPDATE:
+ sqlInverse = constructInsert(op.getTable() , OperationType.INSERT, cols, vals);
+ break;
+ default:
+ break;
+ }
+ return sqlInverse;
+ }
+ protected StringBuilder constructSQL(Operation op, ArrayList<String> cols,
+ ArrayList<Object> vals) throws MDBCServiceException {
+ StringBuilder sql = null;
+ switch (op.getOperationType()) {
+ case INSERT:
+ sql = constructInsert(op.getTable(), op.getOperationType(), cols, vals);
+ break;
+ case UPDATE:
+ sql = constructUpdate(op.getTable(), op.getOperationType(), op.getKey(), cols, vals);
+ break;
+ case DELETE:
+ sql = constructDelete(op.getTable(), op.getOperationType(), op.getKey());
+ break;
+ case SELECT:
+ //no update happened, do nothing
+ break;
+ default:
+ logger.error(op.getOperationType() + "not implemented for replay");
+ }
+ return sql;
+ }
+ private StringBuilder constructDelete(String r, OperationType op, JSONObject key) {
+ StringBuilder sql = new StringBuilder();
+ sql.append(op + " FROM ");
+ sql.append(r + " WHERE ");
+ sql.append(getPrimaryKeyConditional(key));
+ sql.append(";");
+ return sql;
+ }
+ private StringBuilder constructInsert(String r, OperationType op, ArrayList<String> cols,
+ ArrayList<Object> vals) {
+ StringBuilder sql = new StringBuilder();
+ String sep;
+ sql.append(op + " INTO ");
+ sql.append(r + " (") ;
+ sep = "";
+ for (String col: cols) {
+ sql.append(sep + col);
+ sep = ", ";
+ }
+ sql.append(") VALUES (");
+ sep = "";
+ for (Object val: vals) {
+ sql.append(sep + "\"" + val + "\"");
+ sep = ", ";
+ }
+ sql.append(");");
+ return sql;
+ }
+ private StringBuilder constructUpdate(String r, OperationType op, JSONObject key, ArrayList<String> cols,
+ ArrayList<Object> vals) {
+ StringBuilder sql = new StringBuilder();
+ String sep;
+ sql.append(op + " ");
+ sql.append(r + " SET ");
+ sep="";
+ for (int i=0; i<cols.size(); i++) {
+ sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\"");
+ sep = ", ";
+ }
+ sql.append(" WHERE ");
+ sql.append(getPrimaryKeyConditional(key));
+ sql.append(";");
+
+ return sql;
+ }
/**
* Create an SQL string for AND'ing all of the primary keys