aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main/java
diff options
context:
space:
mode:
authorTschaen, Brendan <ctschaen@att.com>2018-11-20 18:18:18 -0500
committerTschaen, Brendan <ctschaen@att.com>2018-11-20 18:28:12 -0500
commit73c29e3fd2cd218906744987e6ae683d77b092b9 (patch)
tree6597c30895c1adcb8ad20bf327d078eaf8597c6c /mdbc-server/src/main/java
parent1359f1023594c201b91fff73b2baa3f5d5cf6fd6 (diff)
Replay operations into SQL DB
Change-Id: Id90c311b701e27aebd53afbde9cab851fa17ce60 Issue-ID: MUSIC-166 Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
Diffstat (limited to 'mdbc-server/src/main/java')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java122
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java5
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java154
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java132
5 files changed, 250 insertions, 173 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index cca14c0..cac4139 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -67,7 +67,7 @@ public class MdbcConnection implements Connection {
private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcConnection.class);
private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused
- private final Connection conn; // the JDBC Connection to the actual underlying database
+ private final Connection jdbcConn; // the JDBC Connection to the actual underlying database
private final MusicInterface mi;
private final TxCommitProgress progressKeeper;
private final DatabasePartition partition;
@@ -83,10 +83,10 @@ public class MdbcConnection implements Connection {
if (c == null) {
throw new MDBCServiceException("Connection is null");
}
- this.conn = c;
+ this.jdbcConn = c;
info.putAll(MDBCUtils.getMdbcProperties());
String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
- this.dbi = MixinFactory.createDBInterface(mixinDb, mi, url, conn, info);
+ this.dbi = MixinFactory.createDBInterface(mixinDb, mi, url, jdbcConn, info);
this.mi = mi;
try {
this.setAutoCommit(c.getAutoCommit());
@@ -112,39 +112,39 @@ public class MdbcConnection implements Connection {
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
logger.error(EELFLoggerDelegate.errorLogger, "proxyconn unwrap: " + iface.getName());
- return conn.unwrap(iface);
+ return jdbcConn.unwrap(iface);
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
logger.error(EELFLoggerDelegate.errorLogger, "proxystatement iswrapperfor: " + iface.getName());
- return conn.isWrapperFor(iface);
+ return jdbcConn.isWrapperFor(iface);
}
@Override
public Statement createStatement() throws SQLException {
- return new MdbcCallableStatement(conn.createStatement(), this);
+ return new MdbcCallableStatement(jdbcConn.createStatement(), this);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
//TODO: grab the sql call from here and all the other preparestatement calls
- return new MdbcPreparedStatement(conn.prepareStatement(sql), sql, this);
+ return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql), sql, this);
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
- return new MdbcCallableStatement(conn.prepareCall(sql), this);
+ return new MdbcCallableStatement(jdbcConn.prepareCall(sql), this);
}
@Override
public String nativeSQL(String sql) throws SQLException {
- return conn.nativeSQL(sql);
+ return jdbcConn.nativeSQL(sql);
}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
- boolean b = conn.getAutoCommit();
+ boolean b = jdbcConn.getAutoCommit();
if (b != autoCommit) {
if(progressKeeper!=null) progressKeeper.commitRequested(id);
logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
@@ -165,7 +165,7 @@ public class MdbcConnection implements Connection {
if(progressKeeper!=null) {
progressKeeper.setMusicDone(id);
}
- conn.setAutoCommit(autoCommit);
+ jdbcConn.setAutoCommit(autoCommit);
if(progressKeeper!=null) {
progressKeeper.setSQLDone(id);
}
@@ -177,7 +177,7 @@ public class MdbcConnection implements Connection {
@Override
public boolean getAutoCommit() throws SQLException {
- return conn.getAutoCommit();
+ return jdbcConn.getAutoCommit();
}
/**
@@ -208,7 +208,7 @@ public class MdbcConnection implements Connection {
progressKeeper.setMusicDone(id);
}
- conn.commit();
+ jdbcConn.commit();
if(progressKeeper != null) {
progressKeeper.setSQLDone(id);
@@ -227,7 +227,7 @@ public class MdbcConnection implements Connection {
public void rollback() throws SQLException {
logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");;
transactionDigest.clear();
- conn.rollback();
+ jdbcConn.rollback();
progressKeeper.reinitializeTxProgress(id);
}
@@ -240,230 +240,230 @@ public class MdbcConnection implements Connection {
if (dbi != null) {
dbi.close();
}
- if (conn != null && !conn.isClosed()) {
+ if (jdbcConn != null && !jdbcConn.isClosed()) {
logger.debug("Closing jdbc from mdbc with id:"+id);
- conn.close();
+ jdbcConn.close();
logger.debug("Connection was closed for id:" + id);
}
}
@Override
public boolean isClosed() throws SQLException {
- return conn.isClosed();
+ return jdbcConn.isClosed();
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
- return conn.getMetaData();
+ return jdbcConn.getMetaData();
}
@Override
public void setReadOnly(boolean readOnly) throws SQLException {
- conn.setReadOnly(readOnly);
+ jdbcConn.setReadOnly(readOnly);
}
@Override
public boolean isReadOnly() throws SQLException {
- return conn.isReadOnly();
+ return jdbcConn.isReadOnly();
}
@Override
public void setCatalog(String catalog) throws SQLException {
- conn.setCatalog(catalog);
+ jdbcConn.setCatalog(catalog);
}
@Override
public String getCatalog() throws SQLException {
- return conn.getCatalog();
+ return jdbcConn.getCatalog();
}
@Override
public void setTransactionIsolation(int level) throws SQLException {
- conn.setTransactionIsolation(level);
+ jdbcConn.setTransactionIsolation(level);
}
@Override
public int getTransactionIsolation() throws SQLException {
- return conn.getTransactionIsolation();
+ return jdbcConn.getTransactionIsolation();
}
@Override
public SQLWarning getWarnings() throws SQLException {
- return conn.getWarnings();
+ return jdbcConn.getWarnings();
}
@Override
public void clearWarnings() throws SQLException {
- conn.clearWarnings();
+ jdbcConn.clearWarnings();
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
- return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency), this);
+ return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency), this);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
throws SQLException {
- return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, this);
+ return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, this);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency), this);
+ return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency), this);
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
- return conn.getTypeMap();
+ return jdbcConn.getTypeMap();
}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
- conn.setTypeMap(map);
+ jdbcConn.setTypeMap(map);
}
@Override
public void setHoldability(int holdability) throws SQLException {
- conn.setHoldability(holdability);
+ jdbcConn.setHoldability(holdability);
}
@Override
public int getHoldability() throws SQLException {
- return conn.getHoldability();
+ return jdbcConn.getHoldability();
}
@Override
public Savepoint setSavepoint() throws SQLException {
- return conn.setSavepoint();
+ return jdbcConn.setSavepoint();
}
@Override
public Savepoint setSavepoint(String name) throws SQLException {
- return conn.setSavepoint(name);
+ return jdbcConn.setSavepoint(name);
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
- conn.rollback(savepoint);
+ jdbcConn.rollback(savepoint);
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
- conn.releaseSavepoint(savepoint);
+ jdbcConn.releaseSavepoint(savepoint);
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
throws SQLException {
- return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), this);
+ return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), this);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
- return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, this);
+ return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, this);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
- return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), this);
+ return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), this);
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
- return new MdbcPreparedStatement(conn.prepareStatement(sql, autoGeneratedKeys), sql, this);
+ return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, autoGeneratedKeys), sql, this);
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
- return new MdbcPreparedStatement(conn.prepareStatement(sql, columnIndexes), sql, this);
+ return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnIndexes), sql, this);
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
- return new MdbcPreparedStatement(conn.prepareStatement(sql, columnNames), sql, this);
+ return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnNames), sql, this);
}
@Override
public Clob createClob() throws SQLException {
- return conn.createClob();
+ return jdbcConn.createClob();
}
@Override
public Blob createBlob() throws SQLException {
- return conn.createBlob();
+ return jdbcConn.createBlob();
}
@Override
public NClob createNClob() throws SQLException {
- return conn.createNClob();
+ return jdbcConn.createNClob();
}
@Override
public SQLXML createSQLXML() throws SQLException {
- return conn.createSQLXML();
+ return jdbcConn.createSQLXML();
}
@Override
public boolean isValid(int timeout) throws SQLException {
- return conn.isValid(timeout);
+ return jdbcConn.isValid(timeout);
}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
- conn.setClientInfo(name, value);
+ jdbcConn.setClientInfo(name, value);
}
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
- conn.setClientInfo(properties);
+ jdbcConn.setClientInfo(properties);
}
@Override
public String getClientInfo(String name) throws SQLException {
- return conn.getClientInfo(name);
+ return jdbcConn.getClientInfo(name);
}
@Override
public Properties getClientInfo() throws SQLException {
- return conn.getClientInfo();
+ return jdbcConn.getClientInfo();
}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
- return conn.createArrayOf(typeName, elements);
+ return jdbcConn.createArrayOf(typeName, elements);
}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
- return conn.createStruct(typeName, attributes);
+ return jdbcConn.createStruct(typeName, attributes);
}
@Override
public void setSchema(String schema) throws SQLException {
- conn.setSchema(schema);
+ jdbcConn.setSchema(schema);
}
@Override
public String getSchema() throws SQLException {
- return conn.getSchema();
+ return jdbcConn.getSchema();
}
@Override
public void abort(Executor executor) throws SQLException {
- conn.abort(executor);
+ jdbcConn.abort(executor);
}
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
- conn.setNetworkTimeout(executor, milliseconds);
+ jdbcConn.setNetworkTimeout(executor, milliseconds);
}
@Override
public int getNetworkTimeout() throws SQLException {
- return conn.getNetworkTimeout();
+ return jdbcConn.getNetworkTimeout();
}
@@ -517,4 +517,8 @@ public class MdbcConnection implements Connection {
}
}
}
+
+ public DBInterface getDBInterface() {
+ return this.dbi;
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 1f2ad91..22ddee1 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -165,6 +165,11 @@ public class StateManager {
}
}
+ /**
+ * Opens a connection into database, setting up all necessary triggers, etc
+ * @param id UUID of a connection
+ * @param information
+ */
public void openConnection(String id, Properties information){
if(!mdbcConnections.containsKey(id)){
Connection sqlConnection;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index 358287f..73622b1 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -20,12 +20,15 @@
package org.onap.music.mdbc.mixins;
import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.tables.Operation;
import org.onap.music.mdbc.tables.StagingTable;
/**
@@ -108,4 +111,11 @@ public interface DBInterface {
List<String> getReservedTblNames();
String getPrimaryKey(String sql, String tableName);
+
+ /**
+ * Replay a given TxDigest into the local DB
+ * @param digest
+ * @throws SQLException if replay cannot occur correctly
+ */
+ public void replayTransaction(HashMap<Range,StagingTable> digest) throws SQLException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 383d522..15384ad 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -27,6 +27,7 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -39,6 +40,7 @@ import org.json.JSONTokener;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.tables.Operation;
import org.onap.music.mdbc.tables.OperationType;
import org.onap.music.mdbc.tables.StagingTable;
@@ -73,7 +75,7 @@ public class MySQLMixin implements DBInterface {
private final MusicInterface mi;
private final int connId;
private final String dbName;
- private final Connection dbConnection;
+ private final Connection jdbcConn;
private final Map<String, TableInfo> tables;
private boolean server_tbl_created = false;
@@ -81,14 +83,14 @@ public class MySQLMixin implements DBInterface {
this.mi = null;
this.connId = 0;
this.dbName = null;
- this.dbConnection = null;
+ this.jdbcConn = null;
this.tables = null;
}
public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) {
this.mi = mi;
this.connId = generateConnID(conn);
this.dbName = getDBName(conn);
- this.dbConnection = conn;
+ this.jdbcConn = conn;
this.tables = new HashMap<String, TableInfo>();
}
// This is used to generate a unique connId for this connection to the DB.
@@ -154,7 +156,7 @@ public class MySQLMixin implements DBInterface {
Set<String> set = new TreeSet<String>();
String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
try {
- Statement stmt = dbConnection.createStatement();
+ Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
String s = rs.getString("TABLE_NAME");
@@ -267,7 +269,7 @@ mysql> describe tables;
try {
if (!server_tbl_created) {
try {
- Statement stmt = dbConnection.createStatement();
+ Statement stmt = jdbcConn.createStatement();
stmt.execute(CREATE_TBL_SQL);
stmt.close();
logger.info(EELFLoggerDelegate.applicationLogger,"createSQLTriggers: Server side dirty table created.");
@@ -473,7 +475,7 @@ NEW.field refers to the new value
logger.debug("Executing SQL read:"+ sql);
ResultSet rs = null;
try {
- Statement stmt = dbConnection.createStatement();
+ Statement stmt = jdbcConn.createStatement();
rs = stmt.executeQuery(sql);
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger,"executeSQLRead"+e);
@@ -489,7 +491,7 @@ NEW.field refers to the new value
protected void executeSQLWrite(String sql) throws SQLException {
logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:"+ sql);
- Statement stmt = dbConnection.createStatement();
+ Statement stmt = jdbcConn.createStatement();
stmt.execute(sql);
stmt.close();
}
@@ -610,7 +612,7 @@ NEW.field refers to the new value
rs.getStatement().close();
if (rows.size() > 0) {
sql2 = "DELETE FROM "+TRANS_TBL+" WHERE IX = ?";
- PreparedStatement ps = dbConnection.prepareStatement(sql2);
+ PreparedStatement ps = jdbcConn.prepareStatement(sql2);
logger.debug("Executing: "+sql2);
logger.debug(" For ix = "+rows);
for (int ix : rows) {
@@ -801,4 +803,140 @@ NEW.field refers to the new value
}
}
}
+
+ /**
+ * Parse the transaction digest into individual events
+ * @param transaction - base 64 encoded, serialized digest
+ * @param dbi
+ */
+ public void replayTransaction(HashMap<Range,StagingTable> transaction) throws SQLException {
+ boolean autocommit = jdbcConn.getAutoCommit();
+ jdbcConn.setAutoCommit(false);
+ Statement jdbcStmt = jdbcConn.createStatement();
+ for (Map.Entry<Range,StagingTable> entry: transaction.entrySet()) {
+ Range r = entry.getKey();
+ StagingTable st = entry.getValue();
+ ArrayList<Operation> opList = st.getOperationList();
+
+ for (Operation op: opList) {
+ try {
+ replayOperationIntoDB(jdbcStmt, r, op);
+ } catch (SQLException e) {
+ //rollback transaction
+ logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getNewVal() + "."
+ + "Rolling back the entire digest replay.");
+ jdbcConn.rollback();
+ throw new SQLException(e);
+ }
+ }
+ }
+
+ clearReplayedOperations(jdbcStmt);
+ jdbcConn.commit();
+ jdbcStmt.close();
+
+ jdbcConn.setAutoCommit(autocommit);
+ }
+
+ /**
+ * Replays operation into database, usually from txDigest
+ * @param stmt
+ * @param r
+ * @param op
+ * @throws SQLException
+ */
+ private void replayOperationIntoDB(Statement jdbcStmt, Range r, Operation op) throws SQLException {
+ logger.info("Replaying Operation: " + op.getOperationType() + "->" + op.getNewVal());
+ JSONObject jsonOp = op.getNewVal();
+ JSONObject key = op.getKey();
+
+ ArrayList<String> cols = new ArrayList<String>();
+ ArrayList<Object> vals = new ArrayList<Object>();
+ Iterator<String> colIterator = jsonOp.keys();
+ while(colIterator.hasNext()) {
+ String col = colIterator.next();
+ //FIXME: should not explicitly refer to cassandramixin
+ if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
+ //reserved name
+ continue;
+ }
+ cols.add(col);
+ vals.add(jsonOp.get(col));
+ }
+
+ //build the queries
+ StringBuilder sql = new StringBuilder();
+ String sep = "";
+ switch (op.getOperationType()) {
+ case INSERT:
+ sql.append(op.getOperationType() + " INTO ");
+ sql.append(r.table + " (") ;
+ sep = "";
+ for (String col: cols) {
+ sql.append(sep + col);
+ sep = ", ";
+ }
+ sql.append(") VALUES (");
+ sep = "";
+ for (Object val: vals) {
+ sql.append(sep + "\"" + val + "\"");
+ sep = ", ";
+ }
+ sql.append(");");
+ break;
+ case UPDATE:
+ sql.append(op.getOperationType() + " ");
+ sql.append(r.table + " SET ");
+ sep="";
+ for (int i=0; i<cols.size(); i++) {
+ sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\"");
+ sep = ", ";
+ }
+ sql.append(" WHERE ");
+ sql.append(getPrimaryKeyConditional(op.getKey()));
+ sql.append(";");
+ break;
+ case DELETE:
+ sql.append(op.getOperationType() + " FROM ");
+ sql.append(r.table + " WHERE ");
+ sql.append(getPrimaryKeyConditional(op.getKey()));
+ sql.append(";");
+ break;
+ case SELECT:
+ //no update happened, do nothing
+ return;
+ default:
+ logger.error(op.getOperationType() + "not implemented for replay");
+ }
+ logger.info("Replaying operation: " + sql.toString());
+
+ jdbcStmt.executeQuery(sql.toString());
+ }
+
+ /**
+ * Create an SQL string for AND'ing all of the primary keys
+ * @param primaryKeys Json of primary keys and their values
+ * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3
+ */
+ private String getPrimaryKeyConditional(JSONObject primaryKeys) {
+ StringBuilder keyCondStmt = new StringBuilder();
+ String and = "";
+ for (String key: primaryKeys.keySet()) {
+ Object val = primaryKeys.get(key);
+ keyCondStmt.append(and + key + "=\"" + val + "\"");
+ and = " AND ";
+ }
+ return keyCondStmt.toString();
+ }
+
+ /**
+ * Cleans out the transaction table, removing the replayed operations
+ * @param jdbcStmt
+ * @throws SQLException
+ */
+ private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
+ logger.info("Clearing replayed operations");
+ String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId;
+ jdbcStmt.executeQuery(sql);
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
index cb5d28e..210cb9e 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -20,12 +20,14 @@
package org.onap.music.mdbc.tables;
import java.io.IOException;
+import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -36,11 +38,13 @@ import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.MdbcConnection;
import org.onap.music.mdbc.MdbcServerLogic;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.configurations.NodeConfiguration;
import org.onap.music.mdbc.mixins.MusicMixin;
+import org.onap.music.mdbc.mixins.DBInterface;
import org.onap.music.mdbc.mixins.MusicInterface;
import com.datastax.driver.core.Row;
@@ -56,106 +60,6 @@ public class MusicTxDigest {
this.stateManager = stateManager;
}
- /**
- * Parse the transaction digest into individual events
- * @param digest - base 64 encoded, serialized digest
- */
- public void replayTxDigest(HashMap<Range,StagingTable> digest) {
- for (Map.Entry<Range,StagingTable> entry: digest.entrySet()) {
- Range r = entry.getKey();
- StagingTable st = entry.getValue();
- ArrayList<Operation> opList = st.getOperationList();
-
- for (Operation op: opList) {
- replayOperation(r, op);
- }
- }
- }
-
- /**
- * Replays operation into local database
- * @param r
- * @param op
- */
- private void replayOperation(Range r, Operation op) {
- logger.info("Operation: " + op.getOperationType() + "->" + op.getNewVal());
- JSONObject jsonOp = op.getNewVal();
- JSONObject key = op.getKey();
-
- ArrayList<String> cols = new ArrayList<String>();
- ArrayList<Object> vals = new ArrayList<Object>();
- Iterator<String> colIterator = jsonOp.keys();
- while(colIterator.hasNext()) {
- String col = colIterator.next();
- //FIXME: should not explicitly refer to cassandramixin
- if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
- //reserved name
- continue;
- }
- cols.add(col);
- vals.add(jsonOp.get(col));
- }
-
- //build the queries
- StringBuilder sql = new StringBuilder();
- String sep = "";
- switch (op.getOperationType()) {
- case INSERT:
- sql.append(op.getOperationType() + " INTO ");
- sql.append(r.table + " (") ;
- sep = "";
- for (String col: cols) {
- sql.append(sep + col);
- sep = ", ";
- }
- sql.append(") VALUES (");
- sep = "";
- for (Object val: vals) {
- sql.append(sep + "\"" + val + "\"");
- sep = ", ";
- }
- sql.append(");");
- logger.info(sql.toString());
- break;
- case UPDATE:
- sql.append(op.getOperationType() + " ");
- sql.append(r.table + " SET ");
- sep="";
- for (int i=0; i<cols.size(); i++) {
- sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\"");
- sep = ", ";
- }
- sql.append(" WHERE ");
- sql.append(getPrimaryKeyConditional(op.getKey()));
- sql.append(";");
- logger.info(sql.toString());
- break;
- case DELETE:
- sql.append(op.getOperationType() + " FROM ");
- sql.append(r.table + " WHERE ");
- sql.append(getPrimaryKeyConditional(op.getKey()));
- sql.append(";");
- logger.info(sql.toString());
- break;
- case SELECT:
- //no update happened, do nothing
- break;
- default:
- logger.error(op.getOperationType() + "not implemented for replay");
- }
- }
-
- private String getPrimaryKeyConditional(JSONObject primaryKeys) {
- StringBuilder keyCondStmt = new StringBuilder();
- String and = "";
- for (String key: primaryKeys.keySet()) {
- Object val = primaryKeys.get(key);
- keyCondStmt.append(and + key + "=\"" + val + "\"");
- and = " AND ";
- }
- return keyCondStmt.toString();
- }
-
/**
* Runs the body of the background daemon
* @param daemonSleepTimeS time, in seconds, between updates
@@ -163,6 +67,9 @@ public class MusicTxDigest {
*/
public void backgroundDaemon(int daemonSleepTimeS) throws InterruptedException {
MusicInterface mi = stateManager.getMusicInterface();
+ stateManager.openConnection("daemon", new Properties());
+ DBInterface dbi = ((MdbcConnection) stateManager.getConnection("daemon")).getDBInterface();
+
while (true) {
//update
logger.info(String.format("[%s] Background MusicTxDigest daemon updating local db",
@@ -175,7 +82,7 @@ public class MusicTxDigest {
for (UUID partition: partitions) {
if (!partition.equals(myPartition.getMusicRangeInformationIndex())){
try {
- replayDigestForPartition(mi, partition);
+ replayDigestForPartition(mi, partition, dbi);
} catch (MDBCServiceException e) {
logger.error("Unable to update for partition : " + partition + ". " + e.getMessage());
continue;
@@ -186,11 +93,24 @@ public class MusicTxDigest {
}
}
- public void replayDigestForPartition(MusicInterface mi, UUID partitionId) throws MDBCServiceException {
- List<MusicTxDigestId> redoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
- for (MusicTxDigestId txId: redoLogTxIds) {
- HashMap<Range, StagingTable> digest = mi.getTxDigest(txId);
- replayTxDigest(digest);
+ /**
+ * Replay the digest for a given partition
+ * @param mi
+ * @param partitionId
+ * @param dbi
+ * @throws MDBCServiceException
+ */
+ public void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException {
+ List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
+ for (MusicTxDigestId txId: partitionsRedoLogTxIds) {
+ HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId);
+ try {
+ dbi.replayTransaction(transaction);
+ } catch (SQLException e) {
+ logger.error("Rolling back the entire digest replay. " + partitionId);
+ return;
+ }
+ logger.info("Successfully replayed transaction " + txId);
}
//todo, keep track of where I am in pointer
}