aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTschaen, Brendan <ctschaen@att.com>2018-11-20 15:29:12 -0500
committerTschaen, Brendan <ctschaen@att.com>2018-11-20 15:43:30 -0500
commit1359f1023594c201b91fff73b2baa3f5d5cf6fd6 (patch)
tree30c3ac2940f82414a76e59bc8ed916ab9599b5f2
parent83db3eb4ccf7e636f586a6873a966e14ba1685ae (diff)
MusicSqlManager Removal, othe code refactors
MusicSqlManager not required in current setup Current code condensed and absorbed into MdbcConnection and MusicMixin Modify deprecated names "CassandraMixin" to "MusicMixin" Change-Id: If68f8937a385c19d901c56e5795ddc0acf45d399 Issue-ID: MUSIC-198 Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java8
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java162
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java56
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java50
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java329
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java46
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java9
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java (renamed from mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java)3
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java1502
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java8
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java (renamed from mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java)10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java3
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java1493
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java21
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java6
-rwxr-xr-xmdbc-server/src/main/resources/mdbc.properties6
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java8
-rwxr-xr-xmdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java8
21 files changed, 1636 insertions, 2106 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
index 2442af1..2723490 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
@@ -34,7 +34,7 @@ import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
import org.onap.music.logging.format.ErrorSeverity;
import org.onap.music.logging.format.ErrorTypes;
-import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.mixins.Utils;
import org.onap.music.mdbc.tables.Operation;
import org.onap.music.mdbc.tables.StagingTable;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java
index b820686..d4d20ad 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java
@@ -53,12 +53,12 @@ public class MdbcCallableStatement extends MdbcPreparedStatement implements Call
@SuppressWarnings("unused")
private static final String DATASTAX_PREFIX = "com.datastax.driver";
- public MdbcCallableStatement(Statement stmt, MusicSqlManager m) {
- super(stmt, m);
+ public MdbcCallableStatement(Statement stmt, MdbcConnection mConn) {
+ super(stmt, mConn);
}
- public MdbcCallableStatement(Statement stmt, String sql, MusicSqlManager mgr) {
- super(stmt, sql, mgr);
+ public MdbcCallableStatement(Statement stmt, String sql, MdbcConnection mConn) {
+ super(stmt, sql, mConn);
}
@Override
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 352eacf..cca14c0 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -34,8 +34,12 @@ import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Executor;
import org.onap.music.exceptions.MDBCServiceException;
@@ -44,7 +48,10 @@ import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
import org.onap.music.logging.format.ErrorSeverity;
import org.onap.music.logging.format.ErrorTypes;
+import org.onap.music.mdbc.mixins.DBInterface;
+import org.onap.music.mdbc.mixins.MixinFactory;
import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -61,25 +68,28 @@ public class MdbcConnection implements Connection {
private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused
private final Connection conn; // the JDBC Connection to the actual underlying database
- private final MusicSqlManager mgr; // there should be one MusicSqlManager in use per Connection
+ private final MusicInterface mi;
private final TxCommitProgress progressKeeper;
private final DatabasePartition partition;
+ private final DBInterface dbi;
+ private final HashMap<Range,StagingTable> transactionDigest;
+ private final Set<String> table_set;
public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
this.id = id;
+ this.table_set = Collections.synchronizedSet(new HashSet<String>());
+ this.transactionDigest = new HashMap<Range,StagingTable>();
+
if (c == null) {
throw new MDBCServiceException("Connection is null");
}
this.conn = c;
+ info.putAll(MDBCUtils.getMdbcProperties());
+ String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
+ this.dbi = MixinFactory.createDBInterface(mixinDb, mi, url, conn, info);
+ this.mi = mi;
try {
- this.mgr = new MusicSqlManager(url, c, info, mi);
- } catch (MDBCServiceException e) {
- logger.error("Failure in creating Music SQL Manager");
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
- throw e;
- }
- try {
- this.mgr.setAutoCommit(c.getAutoCommit(),null,null,null);
+ this.setAutoCommit(c.getAutoCommit());
} catch (SQLException e) {
logger.error("Failure in autocommit");
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
@@ -87,19 +97,15 @@ public class MdbcConnection implements Connection {
// Verify the tables in MUSIC match the tables in the database
// and create triggers on any tables that need them
- //mgr.synchronizeTableData();
- if ( mgr != null ) try {
- mgr.synchronizeTables();
+ try {
+ this.synchronizeTables();
} catch (QueryException e) {
logger.error("Error syncrhonizing tables");
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
}
- else {
- logger.error(EELFLoggerDelegate.errorLogger, "MusicSqlManager was not correctly created", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
- throw new MDBCServiceException("Music SQL Manager object is null or invalid");
- }
this.progressKeeper = progressKeeper;
this.partition = partition;
+
logger.debug("Mdbc connection created with id: "+id);
}
@@ -117,18 +123,18 @@ public class MdbcConnection implements Connection {
@Override
public Statement createStatement() throws SQLException {
- return new MdbcCallableStatement(conn.createStatement(), mgr);
+ return new MdbcCallableStatement(conn.createStatement(), this);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
//TODO: grab the sql call from here and all the other preparestatement calls
- return new MdbcPreparedStatement(conn.prepareStatement(sql), sql, mgr);
+ return new MdbcPreparedStatement(conn.prepareStatement(sql), sql, this);
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
- return new MdbcCallableStatement(conn.prepareCall(sql), mgr);
+ return new MdbcCallableStatement(conn.prepareCall(sql), this);
}
@Override
@@ -141,13 +147,23 @@ public class MdbcConnection implements Connection {
boolean b = conn.getAutoCommit();
if (b != autoCommit) {
if(progressKeeper!=null) progressKeeper.commitRequested(id);
- try {
- mgr.setAutoCommit(autoCommit,id,progressKeeper,partition);
- if(progressKeeper!=null)
- progressKeeper.setMusicDone(id);
- } catch (MDBCServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
- throw new SQLException("Failure commiting to MUSIC");
+ logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
+ if (b) {
+ // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
+ if(id == null || id.isEmpty()) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ throw new SQLException("tx id is null");
+ }
+ try {
+ mi.commitLog(partition, transactionDigest, id, progressKeeper);
+ } catch (MDBCServiceException e) {
+ // TODO Auto-generated catch block
+ logger.error("Cannot commit log to music" + e.getStackTrace());
+ throw new SQLException(e.getMessage());
+ }
+ }
+ if(progressKeeper!=null) {
+ progressKeeper.setMusicDone(id);
}
conn.setAutoCommit(autoCommit);
if(progressKeeper!=null) {
@@ -164,6 +180,11 @@ public class MdbcConnection implements Connection {
return conn.getAutoCommit();
}
+ /**
+ * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are performed now and copied into MUSIC.
+ * @throws SQLException
+ */
@Override
public void commit() throws SQLException {
if(progressKeeper.isComplete(id)) {
@@ -174,7 +195,9 @@ public class MdbcConnection implements Connection {
}
try {
- mgr.commit(id,progressKeeper,partition);
+ logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
+ // transaction was committed -- add all the updates into the REDO-Log in MUSIC
+ mi.commitLog(partition, transactionDigest, id, progressKeeper);
} catch (MDBCServiceException e) {
//If the commit fail, then a new commitId should be used
logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
@@ -196,19 +219,26 @@ public class MdbcConnection implements Connection {
}
}
+ /**
+ * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are discarded.
+ */
@Override
public void rollback() throws SQLException {
- mgr.rollback();
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");;
+ transactionDigest.clear();
conn.rollback();
progressKeeper.reinitializeTxProgress(id);
}
+ /**
+ * Close this MdbcConnection.
+ */
@Override
public void close() throws SQLException {
logger.debug("Closing mdbc connection with id:"+id);
- if (mgr != null) {
- logger.debug("Closing mdbc manager with id:"+id);
- mgr.close();
+ if (dbi != null) {
+ dbi.close();
}
if (conn != null && !conn.isClosed()) {
logger.debug("Closing jdbc from mdbc with id:"+id);
@@ -269,18 +299,18 @@ public class MdbcConnection implements Connection {
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
- return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency), mgr);
+ return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency), this);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
throws SQLException {
- return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, mgr);
+ return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, this);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency), mgr);
+ return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency), this);
}
@Override
@@ -326,34 +356,34 @@ public class MdbcConnection implements Connection {
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
throws SQLException {
- return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), mgr);
+ return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), this);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
- return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, mgr);
+ return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, this);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
- return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), mgr);
+ return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), this);
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
- return new MdbcPreparedStatement(conn.prepareStatement(sql, autoGeneratedKeys), sql, mgr);
+ return new MdbcPreparedStatement(conn.prepareStatement(sql, autoGeneratedKeys), sql, this);
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
- return new MdbcPreparedStatement(conn.prepareStatement(sql, columnIndexes), sql, mgr);
+ return new MdbcPreparedStatement(conn.prepareStatement(sql, columnIndexes), sql, this);
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
- return new MdbcPreparedStatement(conn.prepareStatement(sql, columnNames), sql, mgr);
+ return new MdbcPreparedStatement(conn.prepareStatement(sql, columnNames), sql, this);
}
@Override
@@ -435,4 +465,56 @@ public class MdbcConnection implements Connection {
public int getNetworkTimeout() throws SQLException {
return conn.getNetworkTimeout();
}
+
+
+ /**
+ * Code to be run within the DB driver before a SQL statement is executed. This is where tables
+ * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
+ * @param sql the SQL statement that is about to be executed
+ */
+ public void preStatementHook(String sql) {
+ dbi.preStatementHook(sql);
+ }
+
+ /**
+ * Code to be run within the DB driver after a SQL statement has been executed. This is where remote
+ * statement actions can be copied back to Cassandra/MUSIC.
+ * @param sql the SQL statement that was executed
+ */
+ public void postStatementHook(String sql) {
+ dbi.postStatementHook(sql, transactionDigest);
+ }
+
+ /**
+ * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
+ * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized
+ * in order to prevent multiple threads from running this code in parallel.
+ */
+ public void synchronizeTables() throws QueryException {
+ Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database
+ logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
+ for (String tableName : set1) {
+ // This map will be filled in if this table was previously discovered
+ tableName = tableName.toUpperCase();
+ if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
+ try {
+ TableInfo ti = dbi.getTableInfo(tableName);
+ mi.initializeMusicForTable(ti,tableName);
+ //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
+ ti = dbi.getTableInfo(tableName);
+ mi.createDirtyRowTable(ti,tableName);
+ dbi.createSQLTriggers(tableName);
+ table_set.add(tableName);
+ dbi.synchronizeData(tableName);
+ logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
+ table_set.size() + "/" + set1.size() + "tables uploaded");
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e);
+ throw new QueryException();
+ }
+ }
+ }
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
index fa08d3c..092aa94 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
@@ -62,13 +62,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
String[] params; // holds the parameters if prepared statement, indexing starts at 1
- public MdbcPreparedStatement(Statement stmt, MusicSqlManager m) {
- super(stmt, m);
+ public MdbcPreparedStatement(Statement stmt, MdbcConnection mConn) {
+ super(stmt, mConn);
this.sql = null;
}
- public MdbcPreparedStatement(Statement stmt, String sql, MusicSqlManager mgr) {
- super(stmt, sql, mgr);
+ public MdbcPreparedStatement(Statement stmt, String sql, MdbcConnection mConn) {
+ super(stmt, sql, mConn);
this.sql = sql;
//indexing starts at 1
params = new String[StringUtils.countMatches(sql, "?")+1];
@@ -89,9 +89,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql);
ResultSet r = null;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
r = stmt.executeQuery(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -108,9 +108,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -189,9 +189,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -301,9 +301,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -319,9 +319,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -337,9 +337,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -355,9 +355,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -373,9 +373,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -391,9 +391,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -439,9 +439,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql);
ResultSet r = null;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
r = ((PreparedStatement)stmt).executeQuery();;
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
e.printStackTrace();
@@ -458,9 +458,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = ((PreparedStatement)stmt).executeUpdate();
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
e.printStackTrace();
@@ -579,9 +579,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = ((PreparedStatement)stmt).execute();
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
e.printStackTrace();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
index cccea92..757b468 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
@@ -97,7 +97,7 @@ public class MdbcServerLogic extends JdbcMeta{
//\TODO: don't use connectionCache, use this.manager internal state
Connection conn = connectionCache.getIfPresent(id);
if (conn == null) {
- this.manager.CloseConnection(id);
+ this.manager.closeConnection(id);
logger.error(EELFLoggerDelegate.errorLogger,"Connection not found: invalid id, closed, or expired: "
+ id);
throw new RuntimeException(" Connection not found: invalid id, closed, or expired: " + id);
@@ -119,8 +119,8 @@ public class MdbcServerLogic extends JdbcMeta{
}
// Avoid global synchronization of connection opening
try {
- this.manager.OpenConnection(ch.id, info);
- Connection conn = this.manager.GetConnection(ch.id);
+ this.manager.openConnection(ch.id, info);
+ Connection conn = this.manager.getConnection(ch.id);
if(conn == null) {
logger.error(EELFLoggerDelegate.errorLogger, "Connection created was null");
throw new RuntimeException("Connection created was null for connection: " + ch.id);
@@ -130,7 +130,7 @@ public class MdbcServerLogic extends JdbcMeta{
// Race condition: someone beat us to storing the connection in the cache.
if (loadedConn != null) {
//\TODO check if we added an additional race condition for this
- this.manager.CloseConnection(ch.id);
+ this.manager.closeConnection(ch.id);
conn.close();
throw new RuntimeException("Connection already exists: " + ch.id);
}
@@ -156,7 +156,7 @@ public class MdbcServerLogic extends JdbcMeta{
throw new RuntimeException(e.getMessage());
} finally {
connectionCache.invalidate(ch.id);
- this.manager.CloseConnection(ch.id);
+ this.manager.closeConnection(ch.id);
logger.info("connection closed with id {}", ch.id);
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java
index 320531f..97f4be4 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java
@@ -42,18 +42,18 @@ public class MdbcStatement implements Statement {
private static final String DATASTAX_PREFIX = "com.datastax.driver";
final Statement stmt; // the Statement that we are proxying
- final MusicSqlManager mgr;
+ final MdbcConnection mConn;
//\TODO We may need to all pass the connection object to support autocommit
- public MdbcStatement(Statement s, MusicSqlManager m) {
+ public MdbcStatement(Statement s, MdbcConnection mConn) {
this.stmt = s;
- this.mgr = m;
+ this.mConn = mConn;
}
- public MdbcStatement(Statement stmt, String sql, MusicSqlManager mgr) {
+ public MdbcStatement(Statement stmt, String sql, MdbcConnection mConn) {
//\TODO why there is a constructor with a sql parameter in a not PreparedStatement
this.stmt = stmt;
- this.mgr = mgr;
+ this.mConn = mConn;
}
@Override
@@ -73,9 +73,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql);
ResultSet r = null;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
r = stmt.executeQuery(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -92,9 +92,9 @@ public class MdbcStatement implements Statement {
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -175,9 +175,9 @@ public class MdbcStatement implements Statement {
boolean b = false;
//\TODO Add the result of the postStatementHook to b
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -287,9 +287,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -305,9 +305,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -323,9 +323,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -341,9 +341,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -359,9 +359,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -378,9 +378,9 @@ public class MdbcStatement implements Statement {
//\TODO Idem to the other execute without columnNames
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -423,9 +423,9 @@ public class MdbcStatement implements Statement {
protected void synchronizeTables(String sql) {
if (sql == null || sql.trim().toLowerCase().startsWith("create")) {
- if (mgr != null) {
+ if (mConn != null) {
try {
- mgr.synchronizeTables();
+ mConn.synchronizeTables();
} catch (QueryException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java
deleted file mode 100755
index c2019cf..0000000
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * ============LICENSE_START====================================================
- * org.onap.music.mdbc
- * =============================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END======================================================
- */
-package org.onap.music.mdbc;
-
-import java.sql.Connection;
-import java.util.*;
-
-import org.json.JSONObject;
-
-import org.onap.music.mdbc.mixins.DBInterface;
-import org.onap.music.mdbc.mixins.MixinFactory;
-import org.onap.music.mdbc.mixins.MusicInterface;
-import org.onap.music.mdbc.mixins.Utils;
-import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.TxCommitProgress;
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.exceptions.QueryException;
-import org.onap.music.logging.*;
-import org.onap.music.logging.format.AppMessages;
-import org.onap.music.logging.format.ErrorSeverity;
-import org.onap.music.logging.format.ErrorTypes;
-
-/**
-* <p>
-* MUSIC SQL Manager - code that helps take data written to a SQL database and seamlessly integrates it
-* with <a href="https://github.com/att/music">MUSIC</a> that maintains data in a No-SQL data-store
-* (<a href="http://cassandra.apache.org/">Cassandra</a>) and protects access to it with a distributed
-* locking service (based on <a href="https://zookeeper.apache.org/">Zookeeper</a>).
-* </p>
-* <p>
-* This code will support transactions by taking note of the value of the autoCommit flag, and of calls
-* to <code>commit()</code> and <code>rollback()</code>. These calls should be made by the user's JDBC
-* client.
-* </p>
-*
-* @author Bharath Balasubramanian, Robert Eby
-*/
-public class MusicSqlManager {
-
- private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicSqlManager.class);
-
- private final DBInterface dbi;
- private final MusicInterface mi;
- private final Set<String> table_set;
- private final HashMap<Range,StagingTable> transactionDigest;
- private boolean autocommit; // a copy of the autocommit flag from the JDBC Connection
-
- /**
- * Build a MusicSqlManager for a DB connection. This construct may only be called by getMusicSqlManager(),
- * which will ensure that only one MusicSqlManager is created per URL.
- * This is the location where the appropriate mixins to use for the MusicSqlManager should be determined.
- * They should be picked based upon the URL and the properties passed to this constructor.
- * <p>
- * At the present time, we only support the use of the H2Mixin (for access to a local H2 database),
- * with the CassandraMixin (for direct access to a Cassandra noSQL DB as the persistence layer).
- * </p>
- *
- * @param url the JDBC URL which was used to connection to the database
- * @param conn the actual connection to the database
- * @param info properties passed from the initial JDBC connect() call
- * @throws MDBCServiceException
- */
- public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException {
- try {
- info.putAll(MDBCUtils.getMdbcProperties());
- String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
- this.dbi = MixinFactory.createDBInterface(mixinDb, this, url, conn, info);
- this.mi = mi;
- this.table_set = Collections.synchronizedSet(new HashSet<String>());
- this.autocommit = true;
- this.transactionDigest = new HashMap<Range,StagingTable>();
-
- }catch(Exception e) {
- throw new MDBCServiceException(e.getMessage());
- }
- }
-
- public void setAutoCommit(boolean b,String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
- if (b != autocommit) {
- autocommit = b;
- logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
- if (b) {
- // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
- if(txId == null || txId.isEmpty()) {
- logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- throw new MDBCServiceException("tx id is null");
- }
- commit(txId,progressKeeper,partition);
- }
- }
- }
-
- /**
- * Close this MusicSqlManager.
- */
- public void close() {
- if (dbi != null) {
- dbi.close();
- }
- }
-
- /**
- * Code to be run within the DB driver before a SQL statement is executed. This is where tables
- * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
- * @param sql the SQL statement that is about to be executed
- */
- public void preStatementHook(final String sql) {
- dbi.preStatementHook(sql);
- }
- /**
- * Code to be run within the DB driver after a SQL statement has been executed. This is where remote
- * statement actions can be copied back to Cassandra/MUSIC.
- * @param sql the SQL statement that was executed
- */
- public void postStatementHook(final String sql) {
- dbi.postStatementHook(sql,transactionDigest);
- }
- /**
- * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
- * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized
- * in order to prevent multiple threads from running this code in parallel.
- */
- public synchronized void synchronizeTables() throws QueryException {
- Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database
- logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
- for (String tableName : set1) {
- // This map will be filled in if this table was previously discovered
- tableName = tableName.toUpperCase();
- if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
- logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
- try {
- TableInfo ti = dbi.getTableInfo(tableName);
- mi.initializeMusicForTable(ti,tableName);
- //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
- ti = dbi.getTableInfo(tableName);
- mi.createDirtyRowTable(ti,tableName);
- dbi.createSQLTriggers(tableName);
- table_set.add(tableName);
- synchronizeTableData(tableName);
- logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
- table_set.size() + "/" + set1.size() + "tables uploaded");
- } catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e);
- throw new QueryException();
- }
- }
- }
-
-// Set<String> set2 = getMusicTableSet(music_ns);
- // not working - fix later
-// for (String tbl : set2) {
-// if (!set1.contains(tbl)) {
-// logger.debug("Old table dropped: "+tbl);
-// dropSQLTriggers(tbl, conn);
-// // ZZTODO drop camunda table ?
-// }
-// }
- }
-
- /**
- * On startup, copy dirty data from Cassandra to H2. May not be needed.
- * @param tableName
- */
- public void synchronizeTableData(String tableName) {
- // TODO - copy MUSIC -> H2
- dbi.synchronizeData(tableName);
- }
- /**
- * This method is called whenever there is a SELECT on a local SQL table, and should be called by the underlying databases
- * triggering mechanism. It first checks the local dirty bits table to see if there are any keys in Cassandra whose value
- * has not yet been sent to SQL. If there are, the appropriate values are copied from Cassandra to the local database.
- * Under normal execution, this function behaves as a NOP operation.
- * @param tableName This is the table on which the SELECT is being performed
- */
- public void readDirtyRowsAndUpdateDb(String tableName) {
- mi.readDirtyRowsAndUpdateDb(dbi,tableName);
- }
-
-
-
-
- /**
- * This method gets the primary key that the music interfaces uses by default.
- * If the front end uses a primary key, this will not match what is used in the MUSIC interface
- * @return
- */
- public String getMusicDefaultPrimaryKeyName() {
- return mi.getMusicDefaultPrimaryKeyName();
- }
-
- /**
- * Asks music interface to provide the function to create a primary key
- * e.g. uuid(), 1, "unique_aksd419fjc"
- * @return
- */
- public String generateUniqueKey() {
- //
- return mi.generateUniqueKey();
- }
-
-
- /**
- * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
- * they are performed now and copied into MUSIC.
- * @throws MDBCServiceException
- */
- public synchronized void commit(String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
- logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
- // transaction was committed -- add all the updates into the REDO-Log in MUSIC
- try {
- mi.commitLog(dbi, partition, transactionDigest, txId, progressKeeper);
- }catch(MDBCServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
- throw e;
- }
- }
-
- /**
- * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed,
- * they are discarded.
- */
- public synchronized void rollback() {
- // transaction was rolled back - discard the updates
- logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");;
- transactionDigest.clear();
- }
-
- /**
- * Get all
- * @param table
- * @param dbRow
- * @return
- */
- public String getMusicKeyFromRowWithoutPrimaryIndexes(String table, JSONObject dbRow) {
- TableInfo ti = dbi.getTableInfo(table);
- return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti,table, dbRow);
- }
-
- @Deprecated
- public String getMusicKeyFromRow(String table, JSONObject dbRow) {
- TableInfo ti = dbi.getTableInfo(table);
- return mi.getMusicKeyFromRow(ti,table, dbRow);
- }
-
- /**
- * Returns all keys that matches the current sql statement, and not in already updated keys.
- *
- * @param sql the query that we are getting keys for
- * @deprecated
- */
- public ArrayList<String> getMusicKeys(String sql) {
- ArrayList<String> musicKeys = new ArrayList<String>();
- //\TODO See if this is required
- /*
- try {
- net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql);
- if (stmt instanceof Insert) {
- Insert s = (Insert) stmt;
- String tbl = s.getTable().getName();
- musicKeys.add(generatePrimaryKey());
- } else {
- String tbl;
- String where = "";
- if (stmt instanceof Update){
- Update u = (Update) stmt;
- tbl = u.getTables().get(0).getName();
- where = u.getWhere().toString();
- } else if (stmt instanceof Delete) {
- Delete d = (Delete) stmt;
- tbl = d.getTable().getName();
- if (d.getWhere()!=null) {
- where = d.getWhere().toString();
- }
- } else {
- System.err.println("Not recognized sql type");
- tbl = "";
- }
- String dbiSelect = "SELECT * FROM " + tbl;
- if (!where.equals("")) {
- dbiSelect += "WHERE" + where;
- }
- ResultSet rs = dbi.executeSQLRead(dbiSelect);
- musicKeys.addAll(getMusicKeysWhere(tbl, Utils.parseResults(dbi.getTableInfo(tbl), rs)));
- rs.getStatement().close();
- }
- } catch (JSQLParserException | SQLException e) {
-
- e.printStackTrace();
- }
- System.err.print("MusicKeys:");
- for(String musicKey:musicKeys) {
- System.out.print(musicKey + ",");
- }
- */
- return musicKeys;
- }
-
- public void own(List<Range> ranges) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
- }
-
- public void appendRange(String rangeId, List<Range> ranges) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
- }
-
- public void relinquish(String ownerId, String rangeId) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
- }
-
-
-}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java
index 49b032e..3175f94 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java
@@ -60,11 +60,11 @@ public class ProxyStatement implements CallableStatement {
private static final String DATASTAX_PREFIX = "com.datastax.driver";
private final Statement stmt; // the Statement that we are proxying
- private final MusicSqlManager mgr;
+ private final MdbcConnection mConn;
- public ProxyStatement(Statement s, MusicSqlManager m) {
+ public ProxyStatement(Statement s, MdbcConnection mConn) {
this.stmt = s;
- this.mgr = m;
+ this.mConn = mConn;
}
@Override
@@ -82,9 +82,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("executeQuery: "+sql);
ResultSet r = null;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
r = stmt.executeQuery(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -100,9 +100,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -178,9 +178,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -289,9 +289,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -307,9 +307,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -325,9 +325,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -343,9 +343,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -361,9 +361,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -379,9 +379,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -1268,9 +1268,9 @@ public class ProxyStatement implements CallableStatement {
private void synchronizeTables(String sql) {
if (sql == null || sql.trim().toLowerCase().startsWith("create")) {
- if (mgr != null) {
+ if (mConn != null) {
try {
- mgr.synchronizeTables();
+ mConn.synchronizeTables();
} catch (QueryException e) {
e.printStackTrace();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 4bb0c85..1f2ad91 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -27,7 +27,6 @@ import org.onap.music.logging.format.ErrorSeverity;
import org.onap.music.logging.format.ErrorTypes;
import org.onap.music.mdbc.mixins.MixinFactory;
import org.onap.music.mdbc.mixins.MusicInterface;
-import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -151,7 +150,7 @@ public class StateManager {
}
- public void CloseConnection(String connectionId){
+ public void closeConnection(String connectionId){
//\TODO check if there is a race condition
if(mdbcConnections.containsKey(connectionId)) {
transactionInfo.deleteTxProgress(connectionId);
@@ -166,7 +165,7 @@ public class StateManager {
}
}
- public void OpenConnection(String id, Properties information){
+ public void openConnection(String id, Properties information){
if(!mdbcConnections.containsKey(id)){
Connection sqlConnection;
MdbcConnection newConnection;
@@ -208,7 +207,7 @@ public class StateManager {
* @param id of the transaction, created using
* @return
*/
- public Connection GetConnection(String id) {
+ public Connection getConnection(String id) {
if(mdbcConnections.containsKey(id)) {
//\TODO: Verify if this make sense
// Intent: reinitialize transaction progress, when it already completed the previous tx for the same connection
@@ -253,7 +252,7 @@ public class StateManager {
return newConnection;
}
- public void InitializeSystem() {
+ public void initializeSystem() {
//\TODO Prefetch data to system using the data ranges as guide
throw new UnsupportedOperationException("Function initialize system needs to be implemented id MdbcStateManager");
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
index f3f5d22..3e3447d 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
@@ -24,7 +24,7 @@ import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.RedoRow;
-import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.tables.MusicTxDigest;
import com.google.gson.Gson;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
index eede9be..35293ef 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
@@ -22,7 +22,7 @@ package org.onap.music.mdbc.examples;
import java.sql.*;
import org.apache.calcite.avatica.remote.Driver;
-public class EtdbTestClient {
+public class MdbcTestClient {
public static class Hr {
public final Employee[] emps = {
@@ -145,7 +145,6 @@ public class EtdbTestClient {
}
try {
- connection.commit();
connection.close();
} catch (SQLException e) {
e.printStackTrace();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java
deleted file mode 100755
index 3b1651f..0000000
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java
+++ /dev/null
@@ -1,1502 +0,0 @@
-/*
- * ============LICENSE_START====================================================
- * org.onap.music.mdbc
- * =============================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END======================================================
- */
-package org.onap.music.mdbc.mixins;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.nio.ByteBuffer;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import org.onap.music.mdbc.*;
-import org.onap.music.mdbc.tables.MusicTxDigestId;
-import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.MriReference;
-import org.onap.music.mdbc.tables.MusicRangeInformationRow;
-import org.onap.music.mdbc.tables.MusicTxDigest;
-import org.onap.music.mdbc.tables.TxCommitProgress;
-
-import org.json.JSONObject;
-import org.onap.music.datastore.CassaLockStore;
-import org.onap.music.datastore.PreparedQueryObject;
-import org.onap.music.exceptions.MusicLockingException;
-import org.onap.music.exceptions.MusicQueryException;
-import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.main.MusicCore;
-import org.onap.music.main.MusicCore.Condition;
-import org.onap.music.main.ResultType;
-import org.onap.music.main.ReturnType;
-
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.logging.EELFLoggerDelegate;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.ColumnDefinitions;
-import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TupleValue;
-
-/**
- * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence
- * to calls to the user's DB. It does not do any table or row locking.
- *
- * <p>This code only supports the following limited list of H2 and Cassandra data types:</p>
- * <table summary="">
- * <tr><th>H2 Data Type</th><th>Mapped to Cassandra Data Type</th></tr>
- * <tr><td>BIGINT</td><td>BIGINT</td></tr>
- * <tr><td>BOOLEAN</td><td>BOOLEAN</td></tr>
- * <tr><td>CLOB</td><td>BLOB</td></tr>
- * <tr><td>DOUBLE</td><td>DOUBLE</td></tr>
- * <tr><td>INTEGER</td><td>INT</td></tr>
- * <tr><td>TIMESTAMP</td><td>TIMESTAMP</td></tr>
- * <tr><td>VARBINARY</td><td>BLOB</td></tr>
- * <tr><td>VARCHAR</td><td>VARCHAR</td></tr>
- * </table>
- *
- * @author Robert P. Eby
- */
-public class CassandraMixin implements MusicInterface {
- /** The property name to use to identify this replica to MusicSqlManager */
- public static final String KEY_MY_ID = "myid";
- /** The property name to use for the comma-separated list of replica IDs. */
- public static final String KEY_REPLICAS = "replica_ids";
- /** The property name to use to identify the IP address for Cassandra. */
- public static final String KEY_MUSIC_ADDRESS = "cassandra.host";
- /** The property name to use to provide the replication factor for Cassandra. */
- public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
- /** The property name to use to provide the replication factor for Cassandra. */
- public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
- /** Namespace for the tables in MUSIC (Cassandra) */
- public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
- /** The default property value to use for the Cassandra IP address. */
- public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
- /** The default property value to use for the Cassandra replication factor. */
- public static final int DEFAULT_MUSIC_RFACTOR = 1;
- /** The default primary string column, if none is provided. */
- public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
- /** Type of the primary key, if none is defined by the user */
- public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
-
-
- //\TODO Add logic to change the names when required and create the tables when necessary
- private String musicTxDigestTableName = "musictxdigest";
- private String musicRangeInformationTableName = "musicrangeinformation";
-
- private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassandraMixin.class);
-
- private static final Map<Integer, String> typemap = new HashMap<>();
- static {
- // We only support the following type mappings currently (from DB -> Cassandra).
- // Anything else will likely cause a NullPointerException
- typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY
- typemap.put(Types.BLOB, "VARCHAR");
- typemap.put(Types.BOOLEAN, "BOOLEAN");
- typemap.put(Types.CLOB, "BLOB");
- typemap.put(Types.DATE, "VARCHAR");
- typemap.put(Types.DOUBLE, "DOUBLE");
- typemap.put(Types.DECIMAL, "DECIMAL");
- typemap.put(Types.INTEGER, "INT");
- //typemap.put(Types.TIMESTAMP, "TIMESTAMP");
- typemap.put(Types.SMALLINT, "SMALLINT");
- typemap.put(Types.TIMESTAMP, "VARCHAR");
- typemap.put(Types.VARBINARY, "BLOB");
- typemap.put(Types.VARCHAR, "VARCHAR");
- typemap.put(Types.CHAR, "VARCHAR");
- //The "Hacks", these don't have a direct mapping
- //typemap.put(Types.DATE, "VARCHAR");
- //typemap.put(Types.DATE, "TIMESTAMP");
- }
-
- protected final String music_ns;
- protected final String myId;
- protected final String[] allReplicaIds;
- private final String musicAddress;
- private final int music_rfactor;
- private MusicConnector mCon = null;
- private Session musicSession = null;
- private boolean keyspace_created = false;
- private Map<String, PreparedStatement> ps_cache = new HashMap<>();
- private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
-
- public CassandraMixin() {
- //this.logger = null;
- this.musicAddress = null;
- this.music_ns = null;
- this.music_rfactor = 0;
- this.myId = null;
- this.allReplicaIds = null;
- }
-
- public CassandraMixin(String url, Properties info) throws MDBCServiceException {
- // Default values -- should be overridden in the Properties
- // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
- this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress);
-
- String s = info.getProperty(KEY_MUSIC_RFACTOR);
- this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s);
-
- this.myId = info.getProperty(KEY_MY_ID, getMyHostId());
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId);
-
-
- this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(",");
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId));
-
- this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
- createKeyspace();
- }
-
- /**
- * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
- * The keyspace name comes from the initialization properties passed to the JDBC driver.
- */
- @Override
- public void createKeyspace() throws MDBCServiceException {
-
- Map<String,Object> replicationInfo = new HashMap<>();
- replicationInfo.put("'class'", "'SimpleStrategy'");
- replicationInfo.put("'replication_factor'", music_rfactor);
-
- PreparedQueryObject queryObject = new PreparedQueryObject();
- queryObject.appendQueryString(
- "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns +
- " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
-
- try {
- MusicCore.nonKeyRelatedPut(queryObject, "eventual");
- } catch (MusicServiceException e) {
- if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) {
- throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage());
- }
- }
- }
-
- private String getMyHostId() {
- ResultSet rs = executeMusicRead("SELECT HOST_ID FROM SYSTEM.LOCAL");
- Row row = rs.one();
- return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
- }
- private String getAllHostIds() {
- ResultSet results = executeMusicRead("SELECT HOST_ID FROM SYSTEM.PEERS");
- StringBuilder sb = new StringBuilder(myId);
- for (Row row : results) {
- sb.append(",");
- sb.append(row.getUUID("HOST_ID").toString());
- }
- return sb.toString();
- }
-
- /**
- * Get the name of this MusicInterface mixin object.
- * @return the name
- */
- @Override
- public String getMixinName() {
- return "cassandra";
- }
- /**
- * Do what is needed to close down the MUSIC connection.
- */
- @Override
- public void close() {
- if (musicSession != null) {
- musicSession.close();
- musicSession = null;
- }
- }
- @Override
- public void initializeMetricDataStructures() throws MDBCServiceException {
- try {
- createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
- createMusicRangeInformationTable();
- }
- catch(MDBCServiceException e){
- logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC");
- }
- }
-
- /**
- * This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>.
- * @param tableName the table to initialize MUSIC for
- */
- @Override
- public void initializeMusicForTable(TableInfo ti, String tableName) {
- /**
- * This code creates two tables for every table in SQL:
- * (i) a table with the exact same name as the SQL table storing the SQL data.
- * (ii) a "dirty bits" table that stores the keys in the Cassandra table that are yet to be
- * updated in the SQL table (they were written by some other node).
- */
- StringBuilder fields = new StringBuilder();
- StringBuilder prikey = new StringBuilder();
- String pfx = "", pfx2 = "";
- for (int i = 0; i < ti.columns.size(); i++) {
- fields.append(pfx)
- .append(ti.columns.get(i))
- .append(" ")
- .append(typemap.get(ti.coltype.get(i)));
- if (ti.iskey.get(i)) {
- // Primary key column
- prikey.append(pfx2).append(ti.columns.get(i));
- pfx2 = ", ";
- }
- pfx = ", ";
- }
- if (prikey.length()==0) {
- fields.append(pfx).append(MDBC_PRIMARYKEY_NAME)
- .append(" ")
- .append(MDBC_PRIMARYKEY_TYPE);
- prikey.append(MDBC_PRIMARYKEY_NAME);
- }
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString());
- executeMusicWriteQuery(cql);
- }
-
- // **************************************************
- // Dirty Tables (in MUSIC) methods
- // **************************************************
-
- /**
- * Create a <i>dirty row</i> table for the real table <i>tableName</i>. The primary keys columns from the real table are recreated in
- * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC.
- * @param tableName the table to create a "dirty" table for
- */
- @Override
- public void createDirtyRowTable(TableInfo ti, String tableName) {
- // create dirtybitsTable at all replicas
-// for (String repl : allReplicaIds) {
-//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
-//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
-// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
-// executeMusicWriteQuery(cql);
-// }
- StringBuilder ddl = new StringBuilder("REPLICA__ TEXT");
- StringBuilder cols = new StringBuilder("REPLICA__");
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- // Only use the primary keys columns in the "Dirty" table
- ddl.append(", ")
- .append(ti.columns.get(i))
- .append(" ")
- .append(typemap.get(ti.coltype.get(i)));
- cols.append(", ").append(ti.columns.get(i));
- }
- }
- if(cols.length()==0) {
- //fixme
- System.err.println("Create dirty row table found no primary key");
- }
- ddl.append(", PRIMARY KEY(").append(cols).append(")");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s (%s);", music_ns, tableName, ddl.toString());
- executeMusicWriteQuery(cql);
- }
- /**
- * Drop the dirty row table for <i>tableName</i> from MUSIC.
- * @param tableName the table being dropped
- */
- @Override
- public void dropDirtyRowTable(String tableName) {
- String cql = String.format("DROP TABLE %s.DIRTY_%s;", music_ns, tableName);
- executeMusicWriteQuery(cql);
- }
- /**
- * Mark rows as "dirty" in the dirty rows table for <i>tableName</i>. Rows are marked for all replicas but
- * this one (this replica already has the up to date data).
- * @param tableName the table we are marking dirty
- * @param keys an ordered list of the values being put into the table. The values that correspond to the tables'
- * primary key are copied into the dirty row table.
- */
- @Override
- public void markDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
- Object[] keyObj = getObjects(ti,tableName, keys);
- StringBuilder cols = new StringBuilder("REPLICA__");
- PreparedQueryObject pQueryObject = null;
- StringBuilder vals = new StringBuilder("?");
- List<Object> vallist = new ArrayList<Object>();
- vallist.add(""); // placeholder for replica
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- cols.append(", ").append(ti.columns.get(i));
- vals.append(", ").append("?");
- vallist.add(keyObj[i]);
- }
- }
- if(cols.length()==0) {
- //FIXME
- System.err.println("markDIrtyRow need to fix primary key");
- }
- String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString());
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);*/
- String primaryKey;
- if(ti.hasKey()) {
- primaryKey = getMusicKeyFromRow(ti,tableName, keys);
- }
- else {
- primaryKey = getMusicKeyFromRowWithoutPrimaryIndexes(ti,tableName, keys);
- }
- System.out.println("markDirtyRow: PK value: "+primaryKey);
-
- Object pkObj = null;
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- pkObj = keyObj[i];
- }
- }
- for (String repl : allReplicaIds) {
- pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(tableName);
- pQueryObject.addValue(repl);
- pQueryObject.addValue(pkObj);
- updateMusicDB(tableName, primaryKey, pQueryObject);
- //if (!repl.equals(myId)) {
- /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
- vallist.set(0, repl);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(bound);
- }*/
- //}
-
- }
- }
- /**
- * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys
- * @param tableName the table we are removing dirty entries from
- * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row.
- */
- @Override
- public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
- Object[] keysObjects = getObjects(ti,tableName,keys);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- StringBuilder cols = new StringBuilder("REPLICA__=?");
- List<Object> vallist = new ArrayList<Object>();
- vallist.add(myId);
- int n = 0;
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- cols.append(" AND ").append(ti.columns.get(i)).append("=?");
- vallist.add(keysObjects[n++]);
- pQueryObject.addValue(keysObjects[n++]);
- }
- }
- String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString());
- logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
- pQueryObject.appendQueryString(cql);
- ReturnType rt = MusicCore.eventualPut(pQueryObject);
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- System.out.println("Failure while cleanDirtyRow..."+rt.getMessage());
- }
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(bound);
- }*/
- }
- /**
- * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica,
- * and consist of a Map of primary key column names and values.
- * @param tableName the table we are querying for
- * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row".
- */
- @Override
- public List<Map<String,Object>> getDirtyRows(TableInfo ti, String tableName) {
- String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName);
- ResultSet results = null;
- logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
-
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(new Object[] { myId });
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- results = sess.execute(bound);
- }*/
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- try {
- results = MusicCore.get(pQueryObject);
- } catch (MusicServiceException e) {
-
- e.printStackTrace();
- }
-
- ColumnDefinitions cdef = results.getColumnDefinitions();
- List<Map<String,Object>> list = new ArrayList<Map<String,Object>>();
- for (Row row : results) {
- Map<String,Object> objs = new HashMap<String,Object>();
- for (int i = 0; i < cdef.size(); i++) {
- String colname = cdef.getName(i).toUpperCase();
- String coltype = cdef.getType(i).getName().toString().toUpperCase();
- if (!colname.equals("REPLICA__")) {
- switch (coltype) {
- case "BIGINT":
- objs.put(colname, row.getLong(colname));
- break;
- case "BOOLEAN":
- objs.put(colname, row.getBool(colname));
- break;
- case "BLOB":
- objs.put(colname, row.getString(colname));
- break;
- case "DATE":
- objs.put(colname, row.getString(colname));
- break;
- case "DOUBLE":
- objs.put(colname, row.getDouble(colname));
- break;
- case "DECIMAL":
- objs.put(colname, row.getDecimal(colname));
- break;
- case "INT":
- objs.put(colname, row.getInt(colname));
- break;
- case "TIMESTAMP":
- objs.put(colname, row.getTimestamp(colname));
- break;
- case "VARCHAR":
- default:
- objs.put(colname, row.getString(colname));
- break;
- }
- }
- }
- list.add(objs);
- }
- return list;
- }
-
- /**
- * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first.
- * @param tableName This is the table that has been dropped
- */
- @Override
- public void clearMusicForTable(String tableName) {
- dropDirtyRowTable(tableName);
- String cql = String.format("DROP TABLE %s.%s;", music_ns, tableName);
- executeMusicWriteQuery(cql);
- }
- /**
- * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the
- * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates
- * it to the other replicas.
- *
- * @param tableName This is the table that has changed.
- * @param oldRow This is a copy of the old row being deleted
- */
- @Override
- public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) {
- Object[] objects = getObjects(ti,tableName,oldRow);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- if (ti.hasKey()) {
- assert(ti.columns.size() == objects.length);
- } else {
- assert(ti.columns.size()+1 == objects.length);
- }
-
- StringBuilder where = new StringBuilder();
- List<Object> vallist = new ArrayList<Object>();
- String pfx = "";
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- where.append(pfx)
- .append(ti.columns.get(i))
- .append("=?");
- vallist.add(objects[i]);
- pQueryObject.addValue(objects[i]);
- pfx = " AND ";
- }
- }
- if (!ti.hasKey()) {
- where.append(MDBC_PRIMARYKEY_NAME + "=?");
- //\FIXME this is wrong, old row is not going to contain the UUID, this needs to be fixed
- vallist.add(UUID.fromString((String) objects[0]));
- pQueryObject.addValue(UUID.fromString((String) objects[0]));
- }
-
- String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString());
- logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql);
- pQueryObject.appendQueryString(cql);
-
- /*PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- synchronized (sess) {
- sess.execute(bound);
- }*/
- String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow);
-
- updateMusicDB(tableName, primaryKey, pQueryObject);
-
- // Mark the dirty rows in music for all the replicas but us
- markDirtyRow(ti,tableName, oldRow);
- }
-
- public Set<String> getMusicTableSet(String ns) {
- Set<String> set = new TreeSet<String>();
- String cql = String.format("SELECT TABLE_NAME FROM SYSTEM_SCHEMA.TABLES WHERE KEYSPACE_NAME = '%s'", ns);
- ResultSet rs = executeMusicRead(cql);
- for (Row row : rs) {
- set.add(row.getString("TABLE_NAME").toUpperCase());
- }
- return set;
- }
- /**
- * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local
- * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL
- * @param tableName This is the table on which the select is being performed
- */
- @Override
- public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) {
- // Read dirty rows of this table from Music
- TableInfo ti = dbi.getTableInfo(tableName);
- List<Map<String,Object>> objlist = getDirtyRows(ti,tableName);
- PreparedQueryObject pQueryObject = null;
- String pre_cql = String.format("SELECT * FROM %s.%s WHERE ", music_ns, tableName);
- List<Object> vallist = new ArrayList<Object>();
- StringBuilder sb = new StringBuilder();
- //\TODO Perform a batch operation instead of each row at a time
- for (Map<String,Object> map : objlist) {
- pQueryObject = new PreparedQueryObject();
- sb.setLength(0);
- vallist.clear();
- String pfx = "";
- for (String key : map.keySet()) {
- sb.append(pfx).append(key).append("=?");
- vallist.add(map.get(key));
- pQueryObject.addValue(map.get(key));
- pfx = " AND ";
- }
-
- String cql = pre_cql + sb.toString();
- System.out.println("readDirtyRowsAndUpdateDb: cql: "+cql);
- pQueryObject.appendQueryString(cql);
- ResultSet dirtyRows = null;
- try {
- //\TODO Why is this an eventual put?, this should be an atomic
- dirtyRows = MusicCore.get(pQueryObject);
- } catch (MusicServiceException e) {
-
- e.printStackTrace();
- }
- /*
- Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- ResultSet dirtyRows = null;
- synchronized (sess) {
- dirtyRows = sess.execute(bound);
- }*/
- List<Row> rows = dirtyRows.all();
- if (rows.isEmpty()) {
- // No rows, the row must have been deleted
- deleteRowFromSqlDb(dbi,tableName, map);
- } else {
- for (Row row : rows) {
- writeMusicRowToSQLDb(dbi,tableName, row);
- }
- }
- }
- }
-
- private void deleteRowFromSqlDb(DBInterface dbi, String tableName, Map<String, Object> map) {
- dbi.deleteRowFromSqlDb(tableName, map);
- TableInfo ti = dbi.getTableInfo(tableName);
- List<Object> vallist = new ArrayList<Object>();
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- String col = ti.columns.get(i);
- Object val = map.get(col);
- vallist.add(val);
- }
- }
- cleanDirtyRow(ti, tableName, new JSONObject(vallist));
- }
- /**
- * This functions copies the contents of a row in Music into the corresponding row in the SQL table
- * @param tableName This is the name of the table in both Music and swl
- * @param musicRow This is the row in Music that is being copied into SQL
- */
- private void writeMusicRowToSQLDb(DBInterface dbi, String tableName, Row musicRow) {
- // First construct the map of columns and their values
- TableInfo ti = dbi.getTableInfo(tableName);
- Map<String, Object> map = new HashMap<String, Object>();
- List<Object> vallist = new ArrayList<Object>();
- String rowid = tableName;
- for (String col : ti.columns) {
- Object val = getValue(musicRow, col);
- map.put(col, val);
- if (ti.iskey(col)) {
- vallist.add(val);
- rowid += "_" + val.toString();
- }
- }
-
- logger.debug("Blocking rowid: "+rowid);
- in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE
-
- dbi.insertRowIntoSqlDb(tableName, map);
-
- logger.debug("Unblocking rowid: "+rowid);
- in_progress.remove(rowid); // Unblock propagation
-
-// try {
-// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
-// executeSQLWrite(sql);
-// } catch (SQLException e) {
-// logger.debug("Insert failed because row exists, do an update");
-// // TODO - rewrite this UPDATE command should not update key fields
-// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString());
-// try {
-// executeSQLWrite(sql);
-// } catch (SQLException e1) {
-// e1.printStackTrace();
-// }
-// }
-
- ti = dbi.getTableInfo(tableName);
- cleanDirtyRow(ti, tableName, new JSONObject(vallist));
-
-// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
-// java.sql.ResultSet rs = executeSQLRead(selectQuery);
-// String dbWriteQuery=null;
-// try {
-// if(rs.next()){//this entry is there, do an update
-// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";";
-// }else
-// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";";
-// executeSQLWrite(dbWriteQuery);
-// } catch (SQLException e) {
-// // ZZTODO Auto-generated catch block
-// e.printStackTrace();
-// }
-
- //clean the music dirty bits table
-// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
-// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
-// executeMusicWriteQuery(deleteQuery);
- }
- private Object getValue(Row musicRow, String colname) {
- ColumnDefinitions cdef = musicRow.getColumnDefinitions();
- DataType colType;
- try {
- colType= cdef.getType(colname);
- }
- catch(IllegalArgumentException e) {
- logger.warn("Colname is not part of table metadata: "+e);
- throw e;
- }
- String typeStr = colType.getName().toString().toUpperCase();
- switch (typeStr) {
- case "BIGINT":
- return musicRow.getLong(colname);
- case "BOOLEAN":
- return musicRow.getBool(colname);
- case "BLOB":
- return musicRow.getString(colname);
- case "DATE":
- return musicRow.getString(colname);
- case "DECIMAL":
- return musicRow.getDecimal(colname);
- case "DOUBLE":
- return musicRow.getDouble(colname);
- case "SMALLINT":
- case "INT":
- return musicRow.getInt(colname);
- case "TIMESTAMP":
- return musicRow.getTimestamp(colname);
- case "UUID":
- return musicRow.getUUID(colname);
- default:
- logger.error(EELFLoggerDelegate.errorLogger, "UNEXPECTED COLUMN TYPE: columname="+colname+", columntype="+typeStr);
- // fall thru
- case "VARCHAR":
- return musicRow.getString(colname);
- }
- }
-
- /**
- * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the
- * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates
- * it to the other replicas.
- *
- * @param tableName This is the table that has changed.
- * @param changedRow This is information about the row that has changed
- */
- @Override
- public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) {
- // Build the CQL command
- Object[] objects = getObjects(ti,tableName,changedRow);
- StringBuilder fields = new StringBuilder();
- StringBuilder values = new StringBuilder();
- String rowid = tableName;
- Object[] newrow = new Object[objects.length];
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- String pfx = "";
- int keyoffset=0;
- for (int i = 0; i < objects.length; i++) {
- if (!ti.hasKey() && i==0) {
- //We need to tack on cassandra's uid in place of a primary key
- fields.append(MDBC_PRIMARYKEY_NAME);
- values.append("?");
- newrow[i] = UUID.fromString((String) objects[i]);
- pQueryObject.addValue(newrow[i]);
- keyoffset=-1;
- pfx = ", ";
- continue;
- }
- fields.append(pfx).append(ti.columns.get(i+keyoffset));
- values.append(pfx).append("?");
- pfx = ", ";
- if (objects[i] instanceof byte[]) {
- // Cassandra doesn't seem to have a Codec to translate a byte[] to a ByteBuffer
- newrow[i] = ByteBuffer.wrap((byte[]) objects[i]);
- pQueryObject.addValue(newrow[i]);
- } else if (objects[i] instanceof Reader) {
- // Cassandra doesn't seem to have a Codec to translate a Reader to a ByteBuffer either...
- newrow[i] = ByteBuffer.wrap(readBytesFromReader((Reader) objects[i]));
- pQueryObject.addValue(newrow[i]);
- } else {
- newrow[i] = objects[i];
- pQueryObject.addValue(newrow[i]);
- }
- if (i+keyoffset>=0 && ti.iskey.get(i+keyoffset)) {
- rowid += "_" + newrow[i].toString();
- }
- }
-
- if (in_progress.contains(rowid)) {
- // This call to updateDirtyRowAndEntityTableInMusic() was called as a result of a Cassandra -> H2 update; ignore
- logger.debug(EELFLoggerDelegate.applicationLogger, "updateDirtyRowAndEntityTableInMusic: bypassing MUSIC update on "+rowid);
-
- } else {
- // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
- String cql = String.format("INSERT INTO %s.%s (%s) VALUES (%s);", music_ns, tableName, fields.toString(), values.toString());
-
- pQueryObject.appendQueryString(cql);
- String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow);
- updateMusicDB(tableName, primaryKey, pQueryObject);
-
- /*PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(newrow);
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- synchronized (sess) {
- sess.execute(bound);
- }*/
- // Mark the dirty rows in music for all the replicas but us
- markDirtyRow(ti,tableName, changedRow);
- }
- }
-
-
-
- private byte[] readBytesFromReader(Reader rdr) {
- StringBuilder sb = new StringBuilder();
- try {
- int ch;
- while ((ch = rdr.read()) >= 0) {
- sb.append((char)ch);
- }
- } catch (IOException e) {
- logger.warn("readBytesFromReader: "+e);
- }
- return sb.toString().getBytes();
- }
-
- protected PreparedStatement getPreparedStatementFromCache(String cql) {
- // Note: have to hope that the Session never changes!
- if (!ps_cache.containsKey(cql)) {
- Session sess = getMusicSession();
- PreparedStatement ps = sess.prepare(cql);
- ps_cache.put(cql, ps);
- }
- return ps_cache.get(cql);
- }
-
- /**
- * This method gets a connection to Music
- * @return the Cassandra Session to use
- */
- protected Session getMusicSession() {
- // create cassandra session
- if (musicSession == null) {
- logger.info(EELFLoggerDelegate.applicationLogger, "Creating New Music Session");
- mCon = new MusicConnector(musicAddress);
- musicSession = mCon.getSession();
- }
- return musicSession;
- }
-
- /**
- * This method executes a write query in Music
- * @param cql the CQL to be sent to Cassandra
- */
- protected void executeMusicWriteQuery(String cql) {
- logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- ReturnType rt = MusicCore.eventualPut(pQueryObject);
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
- }
- /*Session sess = getMusicSession();
- SimpleStatement s = new SimpleStatement(cql);
- s.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(s);
- }*/
- }
-
- /**
- * This method executes a read query in Music
- * @param cql the CQL to be sent to Cassandra
- * @return a ResultSet containing the rows returned from the query
- */
- protected ResultSet executeMusicRead(String cql) {
- logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- ResultSet results = null;
- try {
- results = MusicCore.get(pQueryObject);
- } catch (MusicServiceException e) {
-
- e.printStackTrace();
- }
- return results;
- /*Session sess = getMusicSession();
- synchronized (sess) {
- return sess.execute(cql);
- }*/
- }
-
- /**
- * Returns the default primary key name that this mixin uses
- */
- public String getMusicDefaultPrimaryKeyName() {
- return MDBC_PRIMARYKEY_NAME;
- }
-
- /**
- * Return the function for cassandra's primary key generation
- */
- public String generateUniqueKey() {
- return MDBCUtils.generateUniqueKey().toString();
- }
-
- @Override
- public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) {
- //\TODO this operation is super expensive to perform, both latency and BW
- // it is better to add additional where clauses, and have the primary key
- // to be composed of known columns of the table
- // Adding this primary indexes would be an additional burden to the developers, which spanner
- // also does, but otherwise performance is really bad
- // At least it should have a set of columns that are guaranteed to be unique
- StringBuilder cqlOperation = new StringBuilder();
- cqlOperation.append("SELECT * FROM ")
- .append(music_ns)
- .append(".")
- .append(table);
- ResultSet musicResults = executeMusicRead(cqlOperation.toString());
- Object[] dbRowObjects = getObjects(ti,table,dbRow);
- while (!musicResults.isExhausted()) {
- Row musicRow = musicResults.one();
- if (rowIs(ti, musicRow, dbRowObjects)) {
- return ((UUID)getValue(musicRow, MDBC_PRIMARYKEY_NAME)).toString();
- }
- }
- //should never reach here
- return null;
- }
-
- /**
- * Checks to see if this row is in list of database entries
- * @param ti
- * @param musicRow
- * @param dbRow
- * @return
- */
- private boolean rowIs(TableInfo ti, Row musicRow, Object[] dbRow) {
- //System.out.println("Comparing " + musicRow.toString());
- boolean sameRow=true;
- for (int i=0; i<ti.columns.size(); i++) {
- Object val = getValue(musicRow, ti.columns.get(i));
- if (!dbRow[i].equals(val)) {
- sameRow=false;
- break;
- }
- }
- return sameRow;
- }
-
- @Override
- public String getMusicKeyFromRow(TableInfo ti, String tableName, JSONObject row) {
- List<String> keyCols = ti.getKeyColumns();
- if(keyCols.isEmpty()){
- throw new IllegalArgumentException("Table doesn't have defined primary indexes ");
- }
- StringBuilder key = new StringBuilder();
- String pfx = "";
- for(String keyCol: keyCols) {
- key.append(pfx);
- key.append(row.get(keyCol));
- pfx = ",";
- }
- String keyStr = key.toString();
- return keyStr;
- }
-
- public void updateMusicDB(String tableName, String primaryKey, PreparedQueryObject pQObject) {
- if(MusicMixin.criticalTables.contains(tableName)) {
- ReturnType rt = null;
- try {
- rt = MusicCore.atomicPut(music_ns, tableName, primaryKey, pQObject, null);
- } catch (MusicLockingException e) {
- e.printStackTrace();
- } catch (MusicServiceException e) {
- e.printStackTrace();
- } catch (MusicQueryException e) {
- e.printStackTrace();
- }
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- System.out.println("Failure while critical put..."+rt.getMessage());
- }
- } else {
- ReturnType rt = MusicCore.eventualPut(pQObject);
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- System.out.println("Failure while critical put..."+rt.getMessage());
- }
- }
- }
-
-
- private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){
- PreparedQueryObject query = new PreparedQueryObject();
- StringBuilder appendBuilder = new StringBuilder();
- appendBuilder.append("UPDATE ")
- .append(music_ns)
- .append(".")
- .append(mriTable)
- .append(" SET txredolog = txredolog +[('")
- .append(table)
- .append("',")
- .append(redoUuid)
- .append(")] WHERE rangeid = ")
- .append(uuid)
- .append(";");
- query.appendQueryString(appendBuilder.toString());
- return query;
- }
-
- protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
- UUID mriIndex = partition.getMusicRangeInformationIndex();
- String lockId;
- lockId = MusicCore.createLockReference(fullyQualifiedKey);
- //\TODO Handle better failures to acquire locks
- ReturnType lockReturn;
- try {
- lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
- } catch (MusicLockingException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Lock was not acquire correctly for key "+fullyQualifiedKey);
- throw new MDBCServiceException("Lock was not acquire correctly for key "+fullyQualifiedKey);
- } catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey);
- throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey);
- } catch (MusicQueryException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey);
- throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey);
- }
- //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock
- if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
- try {
- MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId);
- CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle();
- CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
- this.musicRangeInformationTableName, mriIndex.toString());
- while(lockOwner.lockRef != lockId) {
- MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef);
- try {
- lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
- this.musicRangeInformationTableName, mriIndex.toString());
- } catch(NullPointerException e){
- //Ignore null pointer exception
- lockId = MusicCore.createLockReference(fullyQualifiedKey);
- break;
- }
- }
- lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
-
- } catch (MusicLockingException e) {
- throw new MDBCServiceException("Could not lock the corresponding lock");
- } catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey);
- throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey);
- } catch (MusicQueryException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey);
- throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey);
- }
- }
- if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
- throw new MDBCServiceException("Could not lock the corresponding lock");
- }
- //TODO: Java newbie here, verify that this lockId is actually assigned to the global DatabasePartition in the StateManager instance
- partition.setLockId(lockId);
- return lockId;
- }
-
-
-
- protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId);
- ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, MriIndex.toString(), appendQuery, lockId, null);
- if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
- logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
- throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
- }
- }
-
- @Override
- public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
- UUID mriIndex = partition.getMusicRangeInformationIndex();
- if(mriIndex==null) {
- //\TODO Fetch MriIndex from the Range Information Table
- throw new MDBCServiceException("TIT Index retrieval not yet implemented");
- }
- String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex.toString();
- //0. See if reference to lock was already created
- String lockId = partition.getLockId();
- if(lockId == null || lockId.isEmpty()) {
- lockId = createAndAssignLock(fullyQualifiedMriKey,partition);
- }
-
- UUID commitId;
- //Generate a local commit id
- if(progressKeeper.containsTx(txId)) {
- commitId = progressKeeper.getCommitId(txId);
- }
- else{
- logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress ");
- throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress ");
- }
- //Add creation type of transaction digest
-
- //1. Push new row to RRT and obtain its index
- String serializedTransactionDigest;
- try {
- serializedTransactionDigest = MDBCUtils.toString(transactionDigest);
- } catch (IOException e) {
- throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString());
- }
- MusicTxDigestId digestId = new MusicTxDigestId(commitId);
- addTxDigest(digestId, serializedTransactionDigest);
- //2. Save RRT index to RQ
- if(progressKeeper!= null) {
- progressKeeper.setRecordId(txId,digestId);
- }
- //3. Append RRT index into the corresponding TIT row array
- appendToRedoLog(mriIndex,partition,digestId);
- }
-
- /**
- * @param tableName
- * @param string
- * @param rowValues
- * @return
- */
- @SuppressWarnings("unused")
- private String getUid(String tableName, String string, Object[] rowValues) {
- //
- // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
- String cql = String.format("SELECT * FROM %s.%s;", music_ns, tableName);
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind();
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- ResultSet rs;
- synchronized (sess) {
- rs = sess.execute(bound);
- }
-
- //should never reach here
- logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key");
- return null;
- }
-
- @Override
- public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) {
- // \FIXME: we may need to add the primary key of the row if it was autogenerated by MUSIC
- List<String> cols = ti.columns;
- int size = cols.size();
- boolean hasDefault = false;
- if(row.has(getMusicDefaultPrimaryKeyName())) {
- size++;
- hasDefault = true;
- }
-
- Object[] objects = new Object[size];
- int idx = 0;
- if(hasDefault) {
- objects[idx++] = row.getString(getMusicDefaultPrimaryKeyName());
- }
- for(String col : ti.columns) {
- objects[idx]=row.get(col);
- }
- return objects;
- }
-
- @Override
- public List<UUID> getPartitionIndexes() {
- ArrayList<UUID> partitions = new ArrayList<UUID>();
- String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName);
- ResultSet rs = executeMusicRead(cql);
- for (Row r: rs) {
- partitions.add(r.getUUID("rangeid"));
- }
- return partitions;
- }
-
- @Override
- public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException {
- //TODO: verify that lock id is valid before calling the database operations function
- //UUID id = partition.getMusicRangeInformationIndex();
-
- String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(partitionIndex);
- Row newRow;
- try {
- newRow = executeMusicUnlockedQuorumGet(pQueryObject);
- } catch (MDBCServiceException e) {
- logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
-
- List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
- List<MusicTxDigestId> digestIds = new ArrayList<>();
- for(TupleValue t: log){
- //final String tableName = t.getString(0);
- final UUID index = t.getUUID(1);
- digestIds.add(new MusicTxDigestId(index));
- }
- List<Range> partitions = new ArrayList<>();
- Set<String> tables = newRow.getSet("keys",String.class);
- for (String table:tables){
- partitions.add(new Range(table));
- }
- return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""),
- digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"));
- }
-
-
- /**
- * This function creates the TransactionInformation table. It contain information related
- * to the transactions happening in a given partition.
- * * The schema of the table is
- * * Id, uiid.
- * * Partition, uuid id of the partition
- * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
- * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
- * * Redo: list of uiids associated to the Redo Records Table
- *
- */
- private void createMusicRangeInformationTable() throws MDBCServiceException {
- String tableName = this.musicRangeInformationTableName;
- String priKey = "rangeid";
- StringBuilder fields = new StringBuilder();
- fields.append("rangeid uuid, ");
- fields.append("keys set<text>, ");
- fields.append("ownerid text, ");
- fields.append("metricprocessid text, ");
- //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
- fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
- this.music_ns, tableName, fields, priKey);
- try {
- executeMusicWriteQuery(this.music_ns,tableName,cql);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to create transaction information table");
- throw(e);
- }
- }
-
-
- @Override
- public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException {
- DatabasePartition newPartition = info.getDBPartition();
- String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMusicRangeInformationIndex().toString();
- String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition);
- createEmptyMriRow(info.getMetricProcessId(),lockId,new ArrayList<Range>());
- throw new UnsupportedOperationException();
- }
-
- /**
- * Creates a new empty MRI row
- * @param processId id of the process that is going to own initially this.
- * @return uuid associated to the new row
- */
- private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges)
- throws MDBCServiceException {
- UUID id = MDBCUtils.generateUniqueKey();
- return createEmptyMriRow(id,processId,lockId,ranges);
- }
-
- /**
- * Creates a new empty MRI row
- * @param processId id of the process that is going to own initially this.
- * @return uuid associated to the new row
- */
- private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges)
- throws MDBCServiceException{
- StringBuilder insert = new StringBuilder("INSERT INTO ")
- .append(this.music_ns)
- .append('.')
- .append(this.musicRangeInformationTableName)
- .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
- .append("(")
- .append(id)
- .append(",{");
- boolean first=true;
- for(Range r: ranges){
- if(first){ first=false; }
- else {
- insert.append(',');
- }
- insert.append("'").append(r.toString()).append("'");
- }
- insert.append("},'")
- .append((lockId==null)?"":lockId)
- .append("','")
- .append(processId)
- .append("',[]);");
- PreparedQueryObject query = new PreparedQueryObject();
- query.appendQueryString(insert.toString());
- try {
- executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to add new row to transaction information");
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
- return id;
- }
-
- @Override
- public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException {
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId);
- ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null);
- if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
- logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
- throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
- }
- }
-
- public void createMusicTxDigest() throws MDBCServiceException {
- createMusicTxDigest(-1);
- }
-
-
- /**
- * This function creates the MusicTxDigest table. It contain information related to each transaction committed
- * * LeaseId: id associated with the lease, text
- * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
- * * TransactionDigest: text that contains all the changes in the transaction
- */
- private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
- String tableName = this.musicTxDigestTableName;
- if(musicTxDigestTableNumber >= 0) {
- tableName = tableName +
- "-" +
- Integer.toString(musicTxDigestTableNumber);
- }
- String priKey = "txid";
- StringBuilder fields = new StringBuilder();
- fields.append("txid uuid, ");
- fields.append("transactiondigest text ");//notice lack of ','
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
- try {
- executeMusicWriteQuery(this.music_ns,tableName,cql);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to create redo records table");
- throw(e);
- }
- }
-
-
- @Override
- public void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException {
- //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
- PreparedQueryObject query = new PreparedQueryObject();
- String cqlQuery = "INSERT INTO " +
- this.music_ns +
- '.' +
- this.musicTxDigestTableName +
- " (txid,transactiondigest) " +
- "VALUES (" +
- newId.txId + ",'" +
- transactionDigest +
- "');";
- query.appendQueryString(cqlQuery);
- //\TODO check if I am not shooting on my own foot
- try {
- MusicCore.nonKeyRelatedPut(query,"critical");
- } catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
- throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString());
- }
- }
-
-
-
- @Override
- public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
- String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(id.txId);
- Row newRow;
- try {
- newRow = executeMusicUnlockedQuorumGet(pQueryObject);
- } catch (MDBCServiceException e) {
- logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
- String digest = newRow.getString("transactiondigest");
- HashMap<Range,StagingTable> changes;
- try {
- changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
- } catch (IOException e) {
- logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId);
- throw new MDBCServiceException("Deserializng digest failed with ioexception");
- } catch (ClassNotFoundException e) {
- logger.error("Deserializng digest failed with an invalid class for id:"+id.txId);
- throw new MDBCServiceException("Deserializng digest failed with an invalid class");
- }
- return changes;
- }
-
- @Override
- public void own(List<Range> ranges){
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void appendRange(String rangeId, List<Range> ranges){
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void relinquish(String ownerId, String rangeId){
- throw new UnsupportedOperationException();
- }
-
- /**
- * This method executes a write query in Music
- * @param cql the CQL to be sent to Cassandra
- */
- private static void executeMusicWriteQuery(String keyspace, String table, String cql)
- throws MDBCServiceException {
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- ResultType rt = null;
- try {
- rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical");
- } catch (MusicServiceException e) {
- //\TODO: handle better, at least transform into an MDBCServiceException
- e.printStackTrace();
- }
- String result = rt.getResult();
- if (result==null || result.toLowerCase().equals("failure")) {
- throw new MDBCServiceException("Music eventual put failed");
- }
- }
-
- private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey,
- String lock)
- throws MDBCServiceException{
- ResultSet result;
- try {
- result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock);
- } catch(MusicServiceException e){
- //\TODO: handle better, at least transform into an MDBCServiceException
- e.printStackTrace();
- throw new MDBCServiceException("Error executing critical get");
- }
- if(result.isExhausted()){
- throw new MDBCServiceException("There is not a row that matches the id "+primaryKey);
- }
- return result.one();
- }
-
- private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
- throws MDBCServiceException{
- ResultSet result = MusicCore.quorumGet(cqlObject);
- //\TODO: handle better, at least transform into an MDBCServiceException
- if(result.isExhausted()){
- throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
- }
- return result.one();
- }
-
- private void executeMusicLockedPut(String namespace, String tableName,
- String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId,
- MusicCore.Condition conditionInfo) throws MDBCServiceException {
- ReturnType rt ;
- if(lockId==null) {
- try {
- rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo);
- } catch (MusicLockingException e) {
- logger.error("Music locked put failed");
- throw new MDBCServiceException("Music locked put failed");
- } catch (MusicServiceException e) {
- logger.error("Music service fail: Music locked put failed");
- throw new MDBCServiceException("Music service fail: Music locked put failed");
- } catch (MusicQueryException e) {
- logger.error("Music query fail: locked put failed");
- throw new MDBCServiceException("Music query fail: Music locked put failed");
- }
- }
- else {
- rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo);
- }
- if (rt.getResult().getResult().toLowerCase().equals("failure")) {
- throw new MDBCServiceException("Music locked put failed");
- }
- }
-}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
index 3d85a27..0f05734 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
@@ -25,8 +25,6 @@ import java.sql.Connection;
import java.util.Properties;
import org.onap.music.logging.EELFLoggerDelegate;
-import org.onap.music.mdbc.DatabasePartition;
-import org.onap.music.mdbc.MusicSqlManager;
/**
* This class is used to construct instances of Mixins that implement either the {@link org.onap.music.mdbc.mixins.DBInterface}
@@ -50,7 +48,7 @@ public class MixinFactory {
* @param info the Properties to use as an argument to the constructor
* @return the newly constructed DBInterface, or null if one cannot be found.
*/
- public static DBInterface createDBInterface(String name, MusicSqlManager msm, String url, Connection conn, Properties info) {
+ public static DBInterface createDBInterface(String name, MusicInterface mi, String url, Connection conn, Properties info) {
for (Class<?> cl : Utils.getClassesImplementing(DBInterface.class)) {
try {
Constructor<?> con = cl.getConstructor();
@@ -59,10 +57,10 @@ public class MixinFactory {
String miname = dbi.getMixinName();
logger.info(EELFLoggerDelegate.applicationLogger,"Checking "+miname);
if (miname.equalsIgnoreCase(name)) {
- con = cl.getConstructor(MusicSqlManager.class, String.class, Connection.class, Properties.class);
+ con = cl.getConstructor(MusicInterface.class, String.class, Connection.class, Properties.class);
if (con != null) {
logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname);
- return (DBInterface) con.newInstance(msm, url, conn, info);
+ return (DBInterface) con.newInstance(mi, url, conn, info);
}
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java
index 0732dc8..8181159 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java
@@ -44,21 +44,21 @@ import com.datastax.driver.core.Row;
/**
* This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence
* to calls to the user's DB. It stores dirty row references in one table (called DIRTY____) rather than one dirty
- * table per real table (as {@link org.onap.music.mdbc.mixins.CassandraMixin} does).
+ * table per real table (as {@link org.onap.music.mdbc.mixins.MusicMixin} does).
*
* @author Robert P. Eby
*/
-public class Cassandra2Mixin extends CassandraMixin {
+public class Music2Mixin extends MusicMixin {
private static final String DIRTY_TABLE = "DIRTY____"; // it seems Cassandra won't allow __DIRTY__
private boolean dirty_table_created = false;
- private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Cassandra2Mixin.class);
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Music2Mixin.class);
- public Cassandra2Mixin() {
+ public Music2Mixin() {
super();
}
- public Cassandra2Mixin(String url, Properties info) throws MDBCServiceException {
+ public Music2Mixin(String url, Properties info) throws MDBCServiceException {
super(url, info);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 52b3036..5b10a73 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -162,14 +162,13 @@ public interface MusicInterface {
/**
* Commits the corresponding REDO-log into MUSIC
*
- * @param dbi, the database interface use in the local SQL cache, where the music interface is being used
* @param partition
* @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to be a HashMap, because it is required to be serializable
* @param txId id associated with the log being send
* @param progressKeeper data structure that is used to handle to detect failures, and know what to do
* @throws MDBCServiceException
*/
- void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
+ void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index a7ea680..258ea4f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -20,211 +20,1488 @@
package org.onap.music.mdbc.mixins;
import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
-import org.onap.music.mdbc.LockId;
-import org.json.JSONObject;
-import org.onap.music.exceptions.MusicLockingException;
-
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.mdbc.DatabasePartition;
-import org.onap.music.mdbc.Range;
-import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.*;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.TxCommitProgress;
+import org.json.JSONObject;
+import org.onap.music.datastore.CassaLockStore;
+import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.exceptions.MusicLockingException;
+import org.onap.music.exceptions.MusicQueryException;
+import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.main.MusicCore;
+import org.onap.music.main.MusicCore.Condition;
+import org.onap.music.main.ResultType;
+import org.onap.music.main.ReturnType;
-/**
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TupleValue;
+/**
+ * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence
+ * to calls to the user's DB. It does not do any table or row locking.
+ *
+ * <p>This code only supports the following limited list of H2 and Cassandra data types:</p>
+ * <table summary="">
+ * <tr><th>H2 Data Type</th><th>Mapped to Cassandra Data Type</th></tr>
+ * <tr><td>BIGINT</td><td>BIGINT</td></tr>
+ * <tr><td>BOOLEAN</td><td>BOOLEAN</td></tr>
+ * <tr><td>CLOB</td><td>BLOB</td></tr>
+ * <tr><td>DOUBLE</td><td>DOUBLE</td></tr>
+ * <tr><td>INTEGER</td><td>INT</td></tr>
+ * <tr><td>TIMESTAMP</td><td>TIMESTAMP</td></tr>
+ * <tr><td>VARBINARY</td><td>BLOB</td></tr>
+ * <tr><td>VARCHAR</td><td>VARCHAR</td></tr>
+ * </table>
*
+ * @author Robert P. Eby
*/
public class MusicMixin implements MusicInterface {
+ /** The property name to use to identify this replica to MusicSqlManager */
+ public static final String KEY_MY_ID = "myid";
+ /** The property name to use for the comma-separated list of replica IDs. */
+ public static final String KEY_REPLICAS = "replica_ids";
+ /** The property name to use to identify the IP address for Cassandra. */
+ public static final String KEY_MUSIC_ADDRESS = "cassandra.host";
+ /** The property name to use to provide the replication factor for Cassandra. */
+ public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
+ /** The property name to use to provide the replication factor for Cassandra. */
+ public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
+ /** Namespace for the tables in MUSIC (Cassandra) */
+ public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
+ /** The default property value to use for the Cassandra IP address. */
+ public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
+ /** The default property value to use for the Cassandra replication factor. */
+ public static final int DEFAULT_MUSIC_RFACTOR = 1;
+ /** The default primary string column, if none is provided. */
+ public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
+ /** Type of the primary key, if none is defined by the user */
+ public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
+
+
+ //\TODO Add logic to change the names when required and create the tables when necessary
+ private String musicTxDigestTableName = "musictxdigest";
+ private String musicRangeInformationTableName = "musicrangeinformation";
- public static Map<Integer, Set<String>> currentLockMap = new HashMap<>();
- public static List<String> criticalTables = new ArrayList<>();
-
- @Override
- public String getMixinName() {
- //
- return null;
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class);
+
+ private static final Map<Integer, String> typemap = new HashMap<>();
+ static {
+ // We only support the following type mappings currently (from DB -> Cassandra).
+ // Anything else will likely cause a NullPointerException
+ typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY
+ typemap.put(Types.BLOB, "VARCHAR");
+ typemap.put(Types.BOOLEAN, "BOOLEAN");
+ typemap.put(Types.CLOB, "BLOB");
+ typemap.put(Types.DATE, "VARCHAR");
+ typemap.put(Types.DOUBLE, "DOUBLE");
+ typemap.put(Types.DECIMAL, "DECIMAL");
+ typemap.put(Types.INTEGER, "INT");
+ //typemap.put(Types.TIMESTAMP, "TIMESTAMP");
+ typemap.put(Types.SMALLINT, "SMALLINT");
+ typemap.put(Types.TIMESTAMP, "VARCHAR");
+ typemap.put(Types.VARBINARY, "BLOB");
+ typemap.put(Types.VARCHAR, "VARCHAR");
+ typemap.put(Types.CHAR, "VARCHAR");
+ //The "Hacks", these don't have a direct mapping
+ //typemap.put(Types.DATE, "VARCHAR");
+ //typemap.put(Types.DATE, "TIMESTAMP");
}
- @Override
- public String getMusicDefaultPrimaryKeyName() {
- //
- return null;
- }
+ protected final String music_ns;
+ protected final String myId;
+ protected final String[] allReplicaIds;
+ private final String musicAddress;
+ private final int music_rfactor;
+ private MusicConnector mCon = null;
+ private Session musicSession = null;
+ private boolean keyspace_created = false;
+ private Map<String, PreparedStatement> ps_cache = new HashMap<>();
+ private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
+ public MusicMixin() {
+ //this.logger = null;
+ this.musicAddress = null;
+ this.music_ns = null;
+ this.music_rfactor = 0;
+ this.myId = null;
+ this.allReplicaIds = null;
+ }
+
+ public MusicMixin(String url, Properties info) throws MDBCServiceException {
+ // Default values -- should be overridden in the Properties
+ // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
+ this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress);
+
+ String s = info.getProperty(KEY_MUSIC_RFACTOR);
+ this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s);
+
+ this.myId = info.getProperty(KEY_MY_ID, getMyHostId());
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId);
+
+
+ this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(",");
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId));
+
+ this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
+ createKeyspace();
+ }
+
+ /**
+ * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
+ * The keyspace name comes from the initialization properties passed to the JDBC driver.
+ */
@Override
- public String generateUniqueKey() {
- //
- return null;
+ public void createKeyspace() throws MDBCServiceException {
+
+ Map<String,Object> replicationInfo = new HashMap<>();
+ replicationInfo.put("'class'", "'SimpleStrategy'");
+ replicationInfo.put("'replication_factor'", music_rfactor);
+
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(
+ "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns +
+ " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
+
+ try {
+ MusicCore.nonKeyRelatedPut(queryObject, "eventual");
+ } catch (MusicServiceException e) {
+ if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) {
+ throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage());
+ }
+ }
+ }
+
+ private String getMyHostId() {
+ ResultSet rs = executeMusicRead("SELECT HOST_ID FROM SYSTEM.LOCAL");
+ Row row = rs.one();
+ return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
+ }
+ private String getAllHostIds() {
+ ResultSet results = executeMusicRead("SELECT HOST_ID FROM SYSTEM.PEERS");
+ StringBuilder sb = new StringBuilder(myId);
+ for (Row row : results) {
+ sb.append(",");
+ sb.append(row.getUUID("HOST_ID").toString());
+ }
+ return sb.toString();
}
+ /**
+ * Get the name of this MusicInterface mixin object.
+ * @return the name
+ */
@Override
- public String getMusicKeyFromRow(TableInfo ti, String table, JSONObject dbRow) {
- //
- return null;
+ public String getMixinName() {
+ return "cassandra";
}
-
+ /**
+ * Do what is needed to close down the MUSIC connection.
+ */
@Override
public void close() {
- //
-
+ if (musicSession != null) {
+ musicSession.close();
+ musicSession = null;
+ }
}
-
@Override
- public void createKeyspace() {
- //
-
+ public void initializeMetricDataStructures() throws MDBCServiceException {
+ try {
+ createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
+ createMusicRangeInformationTable();
+ }
+ catch(MDBCServiceException e){
+ logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC");
+ }
}
+ /**
+ * This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>.
+ * @param tableName the table to initialize MUSIC for
+ */
@Override
public void initializeMusicForTable(TableInfo ti, String tableName) {
- //
-
+ /**
+ * This code creates two tables for every table in SQL:
+ * (i) a table with the exact same name as the SQL table storing the SQL data.
+ * (ii) a "dirty bits" table that stores the keys in the Cassandra table that are yet to be
+ * updated in the SQL table (they were written by some other node).
+ */
+ StringBuilder fields = new StringBuilder();
+ StringBuilder prikey = new StringBuilder();
+ String pfx = "", pfx2 = "";
+ for (int i = 0; i < ti.columns.size(); i++) {
+ fields.append(pfx)
+ .append(ti.columns.get(i))
+ .append(" ")
+ .append(typemap.get(ti.coltype.get(i)));
+ if (ti.iskey.get(i)) {
+ // Primary key column
+ prikey.append(pfx2).append(ti.columns.get(i));
+ pfx2 = ", ";
+ }
+ pfx = ", ";
+ }
+ if (prikey.length()==0) {
+ fields.append(pfx).append(MDBC_PRIMARYKEY_NAME)
+ .append(" ")
+ .append(MDBC_PRIMARYKEY_TYPE);
+ prikey.append(MDBC_PRIMARYKEY_NAME);
+ }
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString());
+ executeMusicWriteQuery(cql);
}
+ // **************************************************
+ // Dirty Tables (in MUSIC) methods
+ // **************************************************
+
+ /**
+ * Create a <i>dirty row</i> table for the real table <i>tableName</i>. The primary keys columns from the real table are recreated in
+ * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC.
+ * @param tableName the table to create a "dirty" table for
+ */
@Override
public void createDirtyRowTable(TableInfo ti, String tableName) {
- //
-
+ // create dirtybitsTable at all replicas
+// for (String repl : allReplicaIds) {
+//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
+//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
+// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
+// executeMusicWriteQuery(cql);
+// }
+ StringBuilder ddl = new StringBuilder("REPLICA__ TEXT");
+ StringBuilder cols = new StringBuilder("REPLICA__");
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ // Only use the primary keys columns in the "Dirty" table
+ ddl.append(", ")
+ .append(ti.columns.get(i))
+ .append(" ")
+ .append(typemap.get(ti.coltype.get(i)));
+ cols.append(", ").append(ti.columns.get(i));
+ }
+ }
+ if(cols.length()==0) {
+ //fixme
+ System.err.println("Create dirty row table found no primary key");
+ }
+ ddl.append(", PRIMARY KEY(").append(cols).append(")");
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s (%s);", music_ns, tableName, ddl.toString());
+ executeMusicWriteQuery(cql);
}
-
+ /**
+ * Drop the dirty row table for <i>tableName</i> from MUSIC.
+ * @param tableName the table being dropped
+ */
@Override
public void dropDirtyRowTable(String tableName) {
- //
-
+ String cql = String.format("DROP TABLE %s.DIRTY_%s;", music_ns, tableName);
+ executeMusicWriteQuery(cql);
}
-
- @Override
- public void clearMusicForTable(String tableName) {
- //
-
- }
-
+ /**
+ * Mark rows as "dirty" in the dirty rows table for <i>tableName</i>. Rows are marked for all replicas but
+ * this one (this replica already has the up to date data).
+ * @param tableName the table we are marking dirty
+ * @param keys an ordered list of the values being put into the table. The values that correspond to the tables'
+ * primary key are copied into the dirty row table.
+ */
@Override
public void markDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
- //
-
+ Object[] keyObj = getObjects(ti,tableName, keys);
+ StringBuilder cols = new StringBuilder("REPLICA__");
+ PreparedQueryObject pQueryObject = null;
+ StringBuilder vals = new StringBuilder("?");
+ List<Object> vallist = new ArrayList<Object>();
+ vallist.add(""); // placeholder for replica
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ cols.append(", ").append(ti.columns.get(i));
+ vals.append(", ").append("?");
+ vallist.add(keyObj[i]);
+ }
+ }
+ if(cols.length()==0) {
+ //FIXME
+ System.err.println("markDIrtyRow need to fix primary key");
+ }
+ String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString());
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);*/
+ String primaryKey;
+ if(ti.hasKey()) {
+ primaryKey = getMusicKeyFromRow(ti,tableName, keys);
+ }
+ else {
+ primaryKey = getMusicKeyFromRowWithoutPrimaryIndexes(ti,tableName, keys);
+ }
+ System.out.println("markDirtyRow: PK value: "+primaryKey);
+
+ Object pkObj = null;
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ pkObj = keyObj[i];
+ }
+ }
+ for (String repl : allReplicaIds) {
+ pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(tableName);
+ pQueryObject.addValue(repl);
+ pQueryObject.addValue(pkObj);
+ updateMusicDB(tableName, primaryKey, pQueryObject);
+ //if (!repl.equals(myId)) {
+ /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+ vallist.set(0, repl);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
+ //}
+
+ }
}
-
+ /**
+ * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys
+ * @param tableName the table we are removing dirty entries from
+ * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row.
+ */
@Override
public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
- //
-
+ Object[] keysObjects = getObjects(ti,tableName,keys);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ StringBuilder cols = new StringBuilder("REPLICA__=?");
+ List<Object> vallist = new ArrayList<Object>();
+ vallist.add(myId);
+ int n = 0;
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ cols.append(" AND ").append(ti.columns.get(i)).append("=?");
+ vallist.add(keysObjects[n++]);
+ pQueryObject.addValue(keysObjects[n++]);
+ }
+ }
+ String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString());
+ logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+ pQueryObject.appendQueryString(cql);
+ ReturnType rt = MusicCore.eventualPut(pQueryObject);
+ if(rt.getResult().getResult().toLowerCase().equals("failure")) {
+ System.out.println("Failure while cleanDirtyRow..."+rt.getMessage());
+ }
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
}
-
+ /**
+ * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica,
+ * and consist of a Map of primary key column names and values.
+ * @param tableName the table we are querying for
+ * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row".
+ */
@Override
- public List<Map<String, Object>> getDirtyRows(TableInfo ti, String tableName) {
- //
- return null;
+ public List<Map<String,Object>> getDirtyRows(TableInfo ti, String tableName) {
+ String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName);
+ ResultSet results = null;
+ logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(new Object[] { myId });
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ results = sess.execute(bound);
+ }*/
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ try {
+ results = MusicCore.get(pQueryObject);
+ } catch (MusicServiceException e) {
+
+ e.printStackTrace();
+ }
+
+ ColumnDefinitions cdef = results.getColumnDefinitions();
+ List<Map<String,Object>> list = new ArrayList<Map<String,Object>>();
+ for (Row row : results) {
+ Map<String,Object> objs = new HashMap<String,Object>();
+ for (int i = 0; i < cdef.size(); i++) {
+ String colname = cdef.getName(i).toUpperCase();
+ String coltype = cdef.getType(i).getName().toString().toUpperCase();
+ if (!colname.equals("REPLICA__")) {
+ switch (coltype) {
+ case "BIGINT":
+ objs.put(colname, row.getLong(colname));
+ break;
+ case "BOOLEAN":
+ objs.put(colname, row.getBool(colname));
+ break;
+ case "BLOB":
+ objs.put(colname, row.getString(colname));
+ break;
+ case "DATE":
+ objs.put(colname, row.getString(colname));
+ break;
+ case "DOUBLE":
+ objs.put(colname, row.getDouble(colname));
+ break;
+ case "DECIMAL":
+ objs.put(colname, row.getDecimal(colname));
+ break;
+ case "INT":
+ objs.put(colname, row.getInt(colname));
+ break;
+ case "TIMESTAMP":
+ objs.put(colname, row.getTimestamp(colname));
+ break;
+ case "VARCHAR":
+ default:
+ objs.put(colname, row.getString(colname));
+ break;
+ }
+ }
+ }
+ list.add(objs);
+ }
+ return list;
}
+ /**
+ * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first.
+ * @param tableName This is the table that has been dropped
+ */
+ @Override
+ public void clearMusicForTable(String tableName) {
+ dropDirtyRowTable(tableName);
+ String cql = String.format("DROP TABLE %s.%s;", music_ns, tableName);
+ executeMusicWriteQuery(cql);
+ }
+ /**
+ * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the
+ * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates
+ * it to the other replicas.
+ *
+ * @param tableName This is the table that has changed.
+ * @param oldRow This is a copy of the old row being deleted
+ */
@Override
public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) {
- //
+ Object[] objects = getObjects(ti,tableName,oldRow);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ if (ti.hasKey()) {
+ assert(ti.columns.size() == objects.length);
+ } else {
+ assert(ti.columns.size()+1 == objects.length);
+ }
+
+ StringBuilder where = new StringBuilder();
+ List<Object> vallist = new ArrayList<Object>();
+ String pfx = "";
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ where.append(pfx)
+ .append(ti.columns.get(i))
+ .append("=?");
+ vallist.add(objects[i]);
+ pQueryObject.addValue(objects[i]);
+ pfx = " AND ";
+ }
+ }
+ if (!ti.hasKey()) {
+ where.append(MDBC_PRIMARYKEY_NAME + "=?");
+ //\FIXME this is wrong, old row is not going to contain the UUID, this needs to be fixed
+ vallist.add(UUID.fromString((String) objects[0]));
+ pQueryObject.addValue(UUID.fromString((String) objects[0]));
+ }
+
+ String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString());
+ logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql);
+ pQueryObject.appendQueryString(cql);
+
+ /*PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
+ String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow);
+ updateMusicDB(tableName, primaryKey, pQueryObject);
+
+ // Mark the dirty rows in music for all the replicas but us
+ markDirtyRow(ti,tableName, oldRow);
}
+ public Set<String> getMusicTableSet(String ns) {
+ Set<String> set = new TreeSet<String>();
+ String cql = String.format("SELECT TABLE_NAME FROM SYSTEM_SCHEMA.TABLES WHERE KEYSPACE_NAME = '%s'", ns);
+ ResultSet rs = executeMusicRead(cql);
+ for (Row row : rs) {
+ set.add(row.getString("TABLE_NAME").toUpperCase());
+ }
+ return set;
+ }
+ /**
+ * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local
+ * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL
+ * @param tableName This is the table on which the select is being performed
+ */
@Override
public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) {
- //
+ // Read dirty rows of this table from Music
+ TableInfo ti = dbi.getTableInfo(tableName);
+ List<Map<String,Object>> objlist = getDirtyRows(ti,tableName);
+ PreparedQueryObject pQueryObject = null;
+ String pre_cql = String.format("SELECT * FROM %s.%s WHERE ", music_ns, tableName);
+ List<Object> vallist = new ArrayList<Object>();
+ StringBuilder sb = new StringBuilder();
+ //\TODO Perform a batch operation instead of each row at a time
+ for (Map<String,Object> map : objlist) {
+ pQueryObject = new PreparedQueryObject();
+ sb.setLength(0);
+ vallist.clear();
+ String pfx = "";
+ for (String key : map.keySet()) {
+ sb.append(pfx).append(key).append("=?");
+ vallist.add(map.get(key));
+ pQueryObject.addValue(map.get(key));
+ pfx = " AND ";
+ }
+
+ String cql = pre_cql + sb.toString();
+ System.out.println("readDirtyRowsAndUpdateDb: cql: "+cql);
+ pQueryObject.appendQueryString(cql);
+ ResultSet dirtyRows = null;
+ try {
+ //\TODO Why is this an eventual put?, this should be an atomic
+ dirtyRows = MusicCore.get(pQueryObject);
+ } catch (MusicServiceException e) {
+
+ e.printStackTrace();
+ }
+ /*
+ Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ ResultSet dirtyRows = null;
+ synchronized (sess) {
+ dirtyRows = sess.execute(bound);
+ }*/
+ List<Row> rows = dirtyRows.all();
+ if (rows.isEmpty()) {
+ // No rows, the row must have been deleted
+ deleteRowFromSqlDb(dbi,tableName, map);
+ } else {
+ for (Row row : rows) {
+ writeMusicRowToSQLDb(dbi,tableName, row);
+ }
+ }
+ }
+ }
+
+ private void deleteRowFromSqlDb(DBInterface dbi, String tableName, Map<String, Object> map) {
+ dbi.deleteRowFromSqlDb(tableName, map);
+ TableInfo ti = dbi.getTableInfo(tableName);
+ List<Object> vallist = new ArrayList<Object>();
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ String col = ti.columns.get(i);
+ Object val = map.get(col);
+ vallist.add(val);
+ }
+ }
+ cleanDirtyRow(ti, tableName, new JSONObject(vallist));
+ }
+ /**
+ * This functions copies the contents of a row in Music into the corresponding row in the SQL table
+ * @param tableName This is the name of the table in both Music and swl
+ * @param musicRow This is the row in Music that is being copied into SQL
+ */
+ private void writeMusicRowToSQLDb(DBInterface dbi, String tableName, Row musicRow) {
+ // First construct the map of columns and their values
+ TableInfo ti = dbi.getTableInfo(tableName);
+ Map<String, Object> map = new HashMap<String, Object>();
+ List<Object> vallist = new ArrayList<Object>();
+ String rowid = tableName;
+ for (String col : ti.columns) {
+ Object val = getValue(musicRow, col);
+ map.put(col, val);
+ if (ti.iskey(col)) {
+ vallist.add(val);
+ rowid += "_" + val.toString();
+ }
+ }
+
+ logger.debug("Blocking rowid: "+rowid);
+ in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE
+
+ dbi.insertRowIntoSqlDb(tableName, map);
+
+ logger.debug("Unblocking rowid: "+rowid);
+ in_progress.remove(rowid); // Unblock propagation
+
+// try {
+// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
+// executeSQLWrite(sql);
+// } catch (SQLException e) {
+// logger.debug("Insert failed because row exists, do an update");
+// // TODO - rewrite this UPDATE command should not update key fields
+// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString());
+// try {
+// executeSQLWrite(sql);
+// } catch (SQLException e1) {
+// e1.printStackTrace();
+// }
+// }
+
+ ti = dbi.getTableInfo(tableName);
+ cleanDirtyRow(ti, tableName, new JSONObject(vallist));
+
+// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
+// java.sql.ResultSet rs = executeSQLRead(selectQuery);
+// String dbWriteQuery=null;
+// try {
+// if(rs.next()){//this entry is there, do an update
+// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";";
+// }else
+// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";";
+// executeSQLWrite(dbWriteQuery);
+// } catch (SQLException e) {
+// // ZZTODO Auto-generated catch block
+// e.printStackTrace();
+// }
+ //clean the music dirty bits table
+// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
+// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
+// executeMusicWriteQuery(deleteQuery);
+ }
+ private Object getValue(Row musicRow, String colname) {
+ ColumnDefinitions cdef = musicRow.getColumnDefinitions();
+ DataType colType;
+ try {
+ colType= cdef.getType(colname);
+ }
+ catch(IllegalArgumentException e) {
+ logger.warn("Colname is not part of table metadata: "+e);
+ throw e;
+ }
+ String typeStr = colType.getName().toString().toUpperCase();
+ switch (typeStr) {
+ case "BIGINT":
+ return musicRow.getLong(colname);
+ case "BOOLEAN":
+ return musicRow.getBool(colname);
+ case "BLOB":
+ return musicRow.getString(colname);
+ case "DATE":
+ return musicRow.getString(colname);
+ case "DECIMAL":
+ return musicRow.getDecimal(colname);
+ case "DOUBLE":
+ return musicRow.getDouble(colname);
+ case "SMALLINT":
+ case "INT":
+ return musicRow.getInt(colname);
+ case "TIMESTAMP":
+ return musicRow.getTimestamp(colname);
+ case "UUID":
+ return musicRow.getUUID(colname);
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "UNEXPECTED COLUMN TYPE: columname="+colname+", columntype="+typeStr);
+ // fall thru
+ case "VARCHAR":
+ return musicRow.getString(colname);
+ }
}
+ /**
+ * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the
+ * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates
+ * it to the other replicas.
+ *
+ * @param tableName This is the table that has changed.
+ * @param changedRow This is information about the row that has changed
+ */
@Override
public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) {
- updateDirtyRowAndEntityTableInMusic(tableName, changedRow, false);
+ // Build the CQL command
+ Object[] objects = getObjects(ti,tableName,changedRow);
+ StringBuilder fields = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+ String rowid = tableName;
+ Object[] newrow = new Object[objects.length];
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ String pfx = "";
+ int keyoffset=0;
+ for (int i = 0; i < objects.length; i++) {
+ if (!ti.hasKey() && i==0) {
+ //We need to tack on cassandra's uid in place of a primary key
+ fields.append(MDBC_PRIMARYKEY_NAME);
+ values.append("?");
+ newrow[i] = UUID.fromString((String) objects[i]);
+ pQueryObject.addValue(newrow[i]);
+ keyoffset=-1;
+ pfx = ", ";
+ continue;
+ }
+ fields.append(pfx).append(ti.columns.get(i+keyoffset));
+ values.append(pfx).append("?");
+ pfx = ", ";
+ if (objects[i] instanceof byte[]) {
+ // Cassandra doesn't seem to have a Codec to translate a byte[] to a ByteBuffer
+ newrow[i] = ByteBuffer.wrap((byte[]) objects[i]);
+ pQueryObject.addValue(newrow[i]);
+ } else if (objects[i] instanceof Reader) {
+ // Cassandra doesn't seem to have a Codec to translate a Reader to a ByteBuffer either...
+ newrow[i] = ByteBuffer.wrap(readBytesFromReader((Reader) objects[i]));
+ pQueryObject.addValue(newrow[i]);
+ } else {
+ newrow[i] = objects[i];
+ pQueryObject.addValue(newrow[i]);
+ }
+ if (i+keyoffset>=0 && ti.iskey.get(i+keyoffset)) {
+ rowid += "_" + newrow[i].toString();
+ }
+ }
+ if (in_progress.contains(rowid)) {
+ // This call to updateDirtyRowAndEntityTableInMusic() was called as a result of a Cassandra -> H2 update; ignore
+ logger.debug(EELFLoggerDelegate.applicationLogger, "updateDirtyRowAndEntityTableInMusic: bypassing MUSIC update on "+rowid);
+
+ } else {
+ // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
+ String cql = String.format("INSERT INTO %s.%s (%s) VALUES (%s);", music_ns, tableName, fields.toString(), values.toString());
+
+ pQueryObject.appendQueryString(cql);
+ String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow);
+ updateMusicDB(tableName, primaryKey, pQueryObject);
+
+ /*PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(newrow);
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
+ // Mark the dirty rows in music for all the replicas but us
+ markDirtyRow(ti,tableName, changedRow);
+ }
}
+
- public void updateDirtyRowAndEntityTableInMusic(String tableName, JSONObject changedRow, boolean isCritical) {
- }
- public static void releaseZKLocks(Set<LockId> lockIds) {
- for (LockId lockId : lockIds) {
- System.out.println("Releasing lock: " + lockId);
- try {
- MusicCore.voluntaryReleaseLock(lockId.getFullyQualifiedLockKey(), lockId.getLockReference());
- MusicCore.destroyLockRef(lockId.getFullyQualifiedLockKey(), lockId.getLockReference());
- } catch (MusicLockingException e) {
- e.printStackTrace();
+ private byte[] readBytesFromReader(Reader rdr) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ int ch;
+ while ((ch = rdr.read()) >= 0) {
+ sb.append((char)ch);
}
+ } catch (IOException e) {
+ logger.warn("readBytesFromReader: "+e);
}
+ return sb.toString().getBytes();
}
- @Override
- public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String tableName, JSONObject changedRow) {
- //
- return null;
+ protected PreparedStatement getPreparedStatementFromCache(String cql) {
+ // Note: have to hope that the Session never changes!
+ if (!ps_cache.containsKey(cql)) {
+ Session sess = getMusicSession();
+ PreparedStatement ps = sess.prepare(cql);
+ ps_cache.put(cql, ps);
+ }
+ return ps_cache.get(cql);
}
- @Override
- public void initializeMetricDataStructures() {
- //
+ /**
+ * This method gets a connection to Music
+ * @return the Cassandra Session to use
+ */
+ protected Session getMusicSession() {
+ // create cassandra session
+ if (musicSession == null) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "Creating New Music Session");
+ mCon = new MusicConnector(musicAddress);
+ musicSession = mCon.getSession();
+ }
+ return musicSession;
+ }
+
+ /**
+ * This method executes a write query in Music
+ * @param cql the CQL to be sent to Cassandra
+ */
+ protected void executeMusicWriteQuery(String cql) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ ReturnType rt = MusicCore.eventualPut(pQueryObject);
+ if(rt.getResult().getResult().toLowerCase().equals("failure")) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
+ }
+ /*Session sess = getMusicSession();
+ SimpleStatement s = new SimpleStatement(cql);
+ s.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ sess.execute(s);
+ }*/
+ }
+
+ /**
+ * This method executes a read query in Music
+ * @param cql the CQL to be sent to Cassandra
+ * @return a ResultSet containing the rows returned from the query
+ */
+ protected ResultSet executeMusicRead(String cql) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ ResultSet results = null;
+ try {
+ results = MusicCore.get(pQueryObject);
+ } catch (MusicServiceException e) {
+
+ e.printStackTrace();
+ }
+ return results;
+ /*Session sess = getMusicSession();
+ synchronized (sess) {
+ return sess.execute(cql);
+ }*/
+ }
+ /**
+ * Returns the default primary key name that this mixin uses
+ */
+ public String getMusicDefaultPrimaryKeyName() {
+ return MDBC_PRIMARYKEY_NAME;
}
+ /**
+ * Return the function for cassandra's primary key generation
+ */
+ public String generateUniqueKey() {
+ return MDBCUtils.generateUniqueKey().toString();
+ }
+
@Override
- public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) {
+ public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) {
+ //\TODO this operation is super expensive to perform, both latency and BW
+ // it is better to add additional where clauses, and have the primary key
+ // to be composed of known columns of the table
+ // Adding this primary indexes would be an additional burden to the developers, which spanner
+ // also does, but otherwise performance is really bad
+ // At least it should have a set of columns that are guaranteed to be unique
+ StringBuilder cqlOperation = new StringBuilder();
+ cqlOperation.append("SELECT * FROM ")
+ .append(music_ns)
+ .append(".")
+ .append(table);
+ ResultSet musicResults = executeMusicRead(cqlOperation.toString());
+ Object[] dbRowObjects = getObjects(ti,table,dbRow);
+ while (!musicResults.isExhausted()) {
+ Row musicRow = musicResults.one();
+ if (rowIs(ti, musicRow, dbRowObjects)) {
+ return ((UUID)getValue(musicRow, MDBC_PRIMARYKEY_NAME)).toString();
+ }
+ }
+ //should never reach here
return null;
}
+
+ /**
+ * Checks to see if this row is in list of database entries
+ * @param ti
+ * @param musicRow
+ * @param dbRow
+ * @return
+ */
+ private boolean rowIs(TableInfo ti, Row musicRow, Object[] dbRow) {
+ //System.out.println("Comparing " + musicRow.toString());
+ boolean sameRow=true;
+ for (int i=0; i<ti.columns.size(); i++) {
+ Object val = getValue(musicRow, ti.columns.get(i));
+ if (!dbRow[i].equals(val)) {
+ sameRow=false;
+ break;
+ }
+ }
+ return sameRow;
+ }
@Override
- public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range, StagingTable> transactionDigest, String txId, TxCommitProgress progressKeeper)
- throws MDBCServiceException {
- // TODO Auto-generated method stub
+ public String getMusicKeyFromRow(TableInfo ti, String tableName, JSONObject row) {
+ List<String> keyCols = ti.getKeyColumns();
+ if(keyCols.isEmpty()){
+ throw new IllegalArgumentException("Table doesn't have defined primary indexes ");
+ }
+ StringBuilder key = new StringBuilder();
+ String pfx = "";
+ for(String keyCol: keyCols) {
+ key.append(pfx);
+ key.append(row.get(keyCol));
+ pfx = ",";
+ }
+ String keyStr = key.toString();
+ return keyStr;
}
- @Override
- public HashMap<Range, StagingTable> getTxDigest(MusicTxDigestId id) {
- return null;
+ public void updateMusicDB(String tableName, String primaryKey, PreparedQueryObject pQObject) {
+ ReturnType rt = MusicCore.eventualPut(pQObject);
+ if(rt.getResult().getResult().toLowerCase().equals("failure")) {
+ System.out.println("Failure while critical put..."+rt.getMessage());
+ }
}
+ /**
+ * Build a preparedQueryObject that appends a transaction to the mriTable
+ * @param mriTable
+ * @param uuid
+ * @param table
+ * @param redoUuid
+ * @return
+ */
+ private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){
+ PreparedQueryObject query = new PreparedQueryObject();
+ StringBuilder appendBuilder = new StringBuilder();
+ appendBuilder.append("UPDATE ")
+ .append(music_ns)
+ .append(".")
+ .append(mriTable)
+ .append(" SET txredolog = txredolog +[('")
+ .append(table)
+ .append("',")
+ .append(redoUuid)
+ .append(")] WHERE rangeid = ")
+ .append(uuid)
+ .append(";");
+ query.appendQueryString(appendBuilder.toString());
+ return query;
+ }
+
+ protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
+ UUID mriIndex = partition.getMusicRangeInformationIndex();
+ String lockId;
+ lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ //\TODO Handle better failures to acquire locks
+ ReturnType lockReturn;
+ try {
+ lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Lock was not acquire correctly for key "+fullyQualifiedKey);
+ throw new MDBCServiceException("Lock was not acquire correctly for key "+fullyQualifiedKey);
+ } catch (MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey);
+ throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey);
+ } catch (MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey);
+ throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey);
+ }
+ //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock
+ if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
+ try {
+ MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId);
+ CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle();
+ CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
+ this.musicRangeInformationTableName, mriIndex.toString());
+ while(lockOwner.lockRef != lockId) {
+ MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef);
+ try {
+ lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
+ this.musicRangeInformationTableName, mriIndex.toString());
+ } catch(NullPointerException e){
+ //Ignore null pointer exception
+ lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ break;
+ }
+ }
+ lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
+
+ } catch (MusicLockingException e) {
+ throw new MDBCServiceException("Could not lock the corresponding lock");
+ } catch (MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey);
+ throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey);
+ } catch (MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey);
+ throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey);
+ }
+ }
+ if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
+ throw new MDBCServiceException("Could not lock the corresponding lock");
+ }
+ //TODO: Java newbie here, verify that this lockId is actually assigned to the global DatabasePartition in the StateManager instance
+ partition.setLockId(lockId);
+ return lockId;
+ }
+
+
+ protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId);
+ ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, MriIndex.toString(), appendQuery, lockId, null);
+ if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
+ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
+ throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
+ }
+ }
+
+ /**
+ * Writes the transaction information to metric's txDigest and musicRangeInformation table
+ * This officially commits the transaction globally
+ */
@Override
- public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) {
+ public void commitLog(DatabasePartition partition,
+ HashMap<Range,StagingTable> transactionDigest, String txId ,
+ TxCommitProgress progressKeeper) throws MDBCServiceException {
+ if (transactionDigest==null || transactionDigest.size()==0) {
+ return;
+ }
+ logger.info("Commiting lock for " + partition.getMusicRangeInformationIndex() + ", txID=" + txId);
+ UUID mriIndex = partition.getMusicRangeInformationIndex();
+ if(mriIndex==null) {
+ //\TODO Fetch MriIndex from the Range Information Table
+ throw new MDBCServiceException("TIT Index retrieval not yet implemented");
+ }
+ String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex.toString();
+ //0. See if reference to lock was already created
+ String lockId = partition.getLockId();
+ if(lockId == null || lockId.isEmpty()) {
+ lockId = createAndAssignLock(fullyQualifiedMriKey,partition);
+ }
+
+ UUID commitId;
+ //Generate a local commit id
+ if(progressKeeper.containsTx(txId)) {
+ commitId = progressKeeper.getCommitId(txId);
+ }
+ else{
+ logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress ");
+ throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress ");
+ }
+ //Add creation type of transaction digest
+
+ //1. Push new row to RRT and obtain its index
+ String serializedTransactionDigest;
+ try {
+ serializedTransactionDigest = MDBCUtils.toString(transactionDigest);
+ } catch (IOException e) {
+ throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString());
+ }
+ MusicTxDigestId digestId = new MusicTxDigestId(commitId);
+ addTxDigest(digestId, serializedTransactionDigest);
+ //2. Save RRT index to RQ
+ if(progressKeeper!= null) {
+ progressKeeper.setRecordId(txId,digestId);
+ }
+ //3. Append RRT index into the corresponding TIT row array
+ appendToRedoLog(mriIndex,partition,digestId);
+ }
+
+ /**
+ * @param tableName
+ * @param string
+ * @param rowValues
+ * @return
+ */
+ @SuppressWarnings("unused")
+ private String getUid(String tableName, String string, Object[] rowValues) {
+ //
+ // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
+ String cql = String.format("SELECT * FROM %s.%s;", music_ns, tableName);
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind();
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ ResultSet rs;
+ synchronized (sess) {
+ rs = sess.execute(bound);
+ }
+
+ //should never reach here
+ logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key");
return null;
}
@Override
- public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) {
- }
+ public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) {
+ // \FIXME: we may need to add the primary key of the row if it was autogenerated by MUSIC
+ List<String> cols = ti.columns;
+ int size = cols.size();
+ boolean hasDefault = false;
+ if(row.has(getMusicDefaultPrimaryKeyName())) {
+ size++;
+ hasDefault = true;
+ }
- @Override
- public void addTxDigest(MusicTxDigestId newId, String transactionDigest) {
+ Object[] objects = new Object[size];
+ int idx = 0;
+ if(hasDefault) {
+ objects[idx++] = row.getString(getMusicDefaultPrimaryKeyName());
+ }
+ for(String col : ti.columns) {
+ objects[idx]=row.get(col);
+ }
+ return objects;
}
-
+
@Override
- public void own(List<Range> ranges) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
+ public List<UUID> getPartitionIndexes() {
+ ArrayList<UUID> partitions = new ArrayList<UUID>();
+ String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName);
+ ResultSet rs = executeMusicRead(cql);
+ for (Row r: rs) {
+ partitions.add(r.getUUID("rangeid"));
+ }
+ return partitions;
+ }
+
+ @Override
+ public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException {
+ //TODO: verify that lock id is valid before calling the database operations function
+ //UUID id = partition.getMusicRangeInformationIndex();
+
+ String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(partitionIndex);
+ Row newRow;
+ try {
+ newRow = executeMusicUnlockedQuorumGet(pQueryObject);
+ } catch (MDBCServiceException e) {
+ logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+
+ List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
+ List<MusicTxDigestId> digestIds = new ArrayList<>();
+ for(TupleValue t: log){
+ //final String tableName = t.getString(0);
+ final UUID index = t.getUUID(1);
+ digestIds.add(new MusicTxDigestId(index));
+ }
+ List<Range> partitions = new ArrayList<>();
+ Set<String> tables = newRow.getSet("keys",String.class);
+ for (String table:tables){
+ partitions.add(new Range(table));
+ }
+ return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""),
+ digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"));
+ }
+
+
+ /**
+ * This function creates the TransactionInformation table. It contain information related
+ * to the transactions happening in a given partition.
+ * * The schema of the table is
+ * * Id, uiid.
+ * * Partition, uuid id of the partition
+ * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
+ * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
+ * * Redo: list of uiids associated to the Redo Records Table
+ *
+ */
+ private void createMusicRangeInformationTable() throws MDBCServiceException {
+ String tableName = this.musicRangeInformationTableName;
+ String priKey = "rangeid";
+ StringBuilder fields = new StringBuilder();
+ fields.append("rangeid uuid, ");
+ fields.append("keys set<text>, ");
+ fields.append("ownerid text, ");
+ fields.append("metricprocessid text, ");
+ //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
+ fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
+ this.music_ns, tableName, fields, priKey);
+ try {
+ executeMusicWriteQuery(this.music_ns,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create transaction information table");
+ throw(e);
+ }
+ }
+
+
+ @Override
+ public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException {
+ DatabasePartition newPartition = info.getDBPartition();
+ String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMusicRangeInformationIndex().toString();
+ String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition);
+ createEmptyMriRow(info.getMetricProcessId(),lockId,new ArrayList<Range>());
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Creates a new empty MRI row
+ * @param processId id of the process that is going to own initially this.
+ * @return uuid associated to the new row
+ */
+ private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges)
+ throws MDBCServiceException {
+ UUID id = MDBCUtils.generateUniqueKey();
+ return createEmptyMriRow(id,processId,lockId,ranges);
+ }
+
+ /**
+ * Creates a new empty MRI row
+ * @param processId id of the process that is going to own initially this.
+ * @return uuid associated to the new row
+ */
+ private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges)
+ throws MDBCServiceException{
+ StringBuilder insert = new StringBuilder("INSERT INTO ")
+ .append(this.music_ns)
+ .append('.')
+ .append(this.musicRangeInformationTableName)
+ .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
+ .append("(")
+ .append(id)
+ .append(",{");
+ boolean first=true;
+ for(Range r: ranges){
+ if(first){ first=false; }
+ else {
+ insert.append(',');
+ }
+ insert.append("'").append(r.toString()).append("'");
+ }
+ insert.append("},'")
+ .append((lockId==null)?"":lockId)
+ .append("','")
+ .append(processId)
+ .append("',[]);");
+ PreparedQueryObject query = new PreparedQueryObject();
+ query.appendQueryString(insert.toString());
+ try {
+ executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to add new row to transaction information");
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+ return id;
+ }
+
+ @Override
+ public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException {
+ logger.info("Appending to redo log for partition " + partition.getMusicRangeInformationIndex() + " txId=" + newRecord.txId);
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId);
+ ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null);
+ if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
+ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
+ throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
+ }
+ }
+
+ public void createMusicTxDigest() throws MDBCServiceException {
+ createMusicTxDigest(-1);
+ }
+
+
+ /**
+ * This function creates the MusicTxDigest table. It contain information related to each transaction committed
+ * * LeaseId: id associated with the lease, text
+ * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+ * * TransactionDigest: text that contains all the changes in the transaction
+ */
+ private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
+ String tableName = this.musicTxDigestTableName;
+ if(musicTxDigestTableNumber >= 0) {
+ tableName = tableName +
+ "-" +
+ Integer.toString(musicTxDigestTableNumber);
+ }
+ String priKey = "txid";
+ StringBuilder fields = new StringBuilder();
+ fields.append("txid uuid, ");
+ fields.append("transactiondigest text ");//notice lack of ','
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
+ try {
+ executeMusicWriteQuery(this.music_ns,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create redo records table");
+ throw(e);
+ }
}
+
+
+ /**
+ * Writes the transaction history to the txDigest
+ */
+ @Override
+ public void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException {
+ //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
+ PreparedQueryObject query = new PreparedQueryObject();
+ String cqlQuery = "INSERT INTO " +
+ this.music_ns +
+ '.' +
+ this.musicTxDigestTableName +
+ " (txid,transactiondigest) " +
+ "VALUES (" +
+ newId.txId + ",'" +
+ transactionDigest +
+ "');";
+ query.appendQueryString(cqlQuery);
+ //\TODO check if I am not shooting on my own foot
+ try {
+ MusicCore.nonKeyRelatedPut(query,"critical");
+ } catch (MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
+ throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString());
+ }
+ }
+
- @Override
- public void appendRange(String rangeId, List<Range> ranges) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
+
+ @Override
+ public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
+ String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(id.txId);
+ Row newRow;
+ try {
+ newRow = executeMusicUnlockedQuorumGet(pQueryObject);
+ } catch (MDBCServiceException e) {
+ logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+ String digest = newRow.getString("transactiondigest");
+ HashMap<Range,StagingTable> changes;
+ try {
+ changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
+ } catch (IOException e) {
+ logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId);
+ throw new MDBCServiceException("Deserializng digest failed with ioexception");
+ } catch (ClassNotFoundException e) {
+ logger.error("Deserializng digest failed with an invalid class for id:"+id.txId);
+ throw new MDBCServiceException("Deserializng digest failed with an invalid class");
+ }
+ return changes;
+ }
+
+ @Override
+ public void own(List<Range> ranges){
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendRange(String rangeId, List<Range> ranges){
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void relinquish(String ownerId, String rangeId){
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * This method executes a write query in Music
+ * @param cql the CQL to be sent to Cassandra
+ */
+ private static void executeMusicWriteQuery(String keyspace, String table, String cql)
+ throws MDBCServiceException {
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ ResultType rt = null;
+ try {
+ rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical");
+ } catch (MusicServiceException e) {
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ e.printStackTrace();
+ }
+ String result = rt.getResult();
+ if (result==null || result.toLowerCase().equals("failure")) {
+ throw new MDBCServiceException("Music eventual put failed");
+ }
}
- @Override
- public void relinquish(String ownerId, String rangeId) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
+ private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey,
+ String lock)
+ throws MDBCServiceException{
+ ResultSet result;
+ try {
+ result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock);
+ } catch(MusicServiceException e){
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ e.printStackTrace();
+ throw new MDBCServiceException("Error executing critical get");
+ }
+ if(result.isExhausted()){
+ throw new MDBCServiceException("There is not a row that matches the id "+primaryKey);
+ }
+ return result.one();
}
- @Override
- public List<UUID> getPartitionIndexes() {
- // TODO Auto-generated method stub
- return null;
+ private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
+ throws MDBCServiceException{
+ ResultSet result = MusicCore.quorumGet(cqlObject);
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ if(result.isExhausted()){
+ throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
+ }
+ return result.one();
}
- @Override
- public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException {
- // TODO Auto-generated method stub
- return null;
+ private void executeMusicLockedPut(String namespace, String tableName,
+ String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId,
+ MusicCore.Condition conditionInfo) throws MDBCServiceException {
+ ReturnType rt ;
+ if(lockId==null) {
+ try {
+ rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo);
+ } catch (MusicLockingException e) {
+ logger.error("Music locked put failed");
+ throw new MDBCServiceException("Music locked put failed");
+ } catch (MusicServiceException e) {
+ logger.error("Music service fail: Music locked put failed");
+ throw new MDBCServiceException("Music service fail: Music locked put failed");
+ } catch (MusicQueryException e) {
+ logger.error("Music query fail: locked put failed");
+ throw new MDBCServiceException("Music query fail: Music locked put failed");
+ }
+ }
+ else {
+ rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo);
+ }
+ if (rt.getResult().getResult().toLowerCase().equals("failure")) {
+ throw new MDBCServiceException("Music locked put failed");
+ }
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 5943b34..383d522 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -37,7 +37,6 @@ import org.json.JSONObject;
import org.json.JSONTokener;
import org.onap.music.logging.EELFLoggerDelegate;
-import org.onap.music.mdbc.MusicSqlManager;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.tables.OperationType;
@@ -71,7 +70,7 @@ public class MySQLMixin implements DBInterface {
"CREATE TABLE IF NOT EXISTS "+TRANS_TBL+
" (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255), NEWROWDATA VARCHAR(1024), KEYDATA VARCHAR(1024), CONNECTION_ID INT,PRIMARY KEY (IX))";
- private final MusicSqlManager msm;
+ private final MusicInterface mi;
private final int connId;
private final String dbName;
private final Connection dbConnection;
@@ -79,14 +78,14 @@ public class MySQLMixin implements DBInterface {
private boolean server_tbl_created = false;
public MySQLMixin() {
- this.msm = null;
+ this.mi = null;
this.connId = 0;
this.dbName = null;
this.dbConnection = null;
this.tables = null;
}
- public MySQLMixin(MusicSqlManager msm, String url, Connection conn, Properties info) {
- this.msm = msm;
+ public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) {
+ this.mi = mi;
this.connId = generateConnID(conn);
this.dbName = getDBName(conn);
this.dbConnection = conn;
@@ -585,13 +584,13 @@ NEW.field refers to the new value
// the actual columns, otherwise performance when doing range queries are going
// to be even worse (see the else bracket down)
//
- String musicKey = msm.generateUniqueKey();
+ String musicKey = mi.generateUniqueKey();
/*} else {
//get key from data
musicKey = msm.getMusicKeyFromRowWithoutPrimaryIndexes(tbl,newRow);
}*/
- newRow.put(msm.getMusicDefaultPrimaryKeyName(), musicKey);
- keydataStr.put(msm.getMusicDefaultPrimaryKeyName(), musicKey);
+ newRow.put(mi.getMusicDefaultPrimaryKeyName(), musicKey);
+ keydataStr.put(mi.getMusicDefaultPrimaryKeyName(), musicKey);
}
/*else {
//Use the keys
@@ -646,8 +645,8 @@ NEW.field refers to the new value
JSONObject jo = new JSONObject();
if (!getTableInfo(tableName).hasKey()) {
- String musicKey = msm.generateUniqueKey();
- jo.put(msm.getMusicDefaultPrimaryKeyName(), musicKey);
+ String musicKey = mi.generateUniqueKey();
+ jo.put(mi.getMusicDefaultPrimaryKeyName(), musicKey);
}
for (String col : ti.columns) {
@@ -655,7 +654,7 @@ NEW.field refers to the new value
}
@SuppressWarnings("unused")
- Object[] row = Utils.jsonToRow(ti,tableName, jo,msm.getMusicDefaultPrimaryKeyName());
+ Object[] row = Utils.jsonToRow(ti,tableName, jo, mi.getMusicDefaultPrimaryKeyName());
//\FIXME this is wrong now, update of the dirty row and entity is now handled by the archival process
//msm.updateDirtyRowAndEntityTableInMusic(ti,tableName, jo);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
index 0a5bd60..cb5d28e 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -40,7 +40,7 @@ import org.onap.music.mdbc.MdbcServerLogic;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.configurations.NodeConfiguration;
-import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.mixins.MusicInterface;
import com.datastax.driver.core.Row;
@@ -88,7 +88,7 @@ public class MusicTxDigest {
while(colIterator.hasNext()) {
String col = colIterator.next();
//FIXME: should not explicitly refer to cassandramixin
- if (col.equals(CassandraMixin.MDBC_PRIMARYKEY_NAME)) {
+ if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
//reserved name
continue;
}
@@ -140,6 +140,8 @@ public class MusicTxDigest {
case SELECT:
//no update happened, do nothing
break;
+ default:
+ logger.error(op.getOperationType() + "not implemented for replay");
}
}
diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties
index d3feee2..73e8f77 100755
--- a/mdbc-server/src/main/resources/mdbc.properties
+++ b/mdbc-server/src/main/resources/mdbc.properties
@@ -2,11 +2,9 @@
# A list of all Mixins that should be checked by MDBC
#
MIXINS= \
- org.onap.music.mdbc.mixins.H2Mixin \
- org.onap.music.mdbc.mixins.H2ServerMixin \
org.onap.music.mdbc.mixins.MySQLMixin \
- org.onap.music.mdbc.mixins.CassandraMixin \
- org.onap.music.mdbc.mixins.Cassandra2Mixin
+ org.onap.music.mdbc.mixins.MusicMixin \
+ org.onap.music.mdbc.mixins.Music2Mixin
DEFAULT_DRIVERS=\
org.h2.Driver \
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java
index eab38d3..388f8b8 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java
@@ -38,5 +38,13 @@ public class MusicTxDigestTest {
HashMap<Range, StagingTable> digest = (HashMap<Range, StagingTable>) MDBCUtils.fromString(t1);
txDigest.replayTxDigest(digest);
}
+
+ @Test
+ public void testEmptyDigest() throws Exception {
+ String t1 = "rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAB3CAAAABAAAAAAeA==";
+ MusicTxDigest txDigest = new MusicTxDigest(null);
+ HashMap<Range, StagingTable> digest = (HashMap<Range, StagingTable>) MDBCUtils.fromString(t1);
+ txDigest.replayTxDigest(digest);
+ }
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java b/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java
index c618f3b..7e7dc88 100755
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java
@@ -24,7 +24,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
-import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.mixins.MusicMixin;
public class TestCommon {
public static final String DB_DRIVER = "avatica.Driver";
@@ -34,9 +34,9 @@ public class TestCommon {
public Connection getDBConnection(String url, String keyspace, String id) throws SQLException, ClassNotFoundException {
Class.forName(DB_DRIVER);
Properties driver_info = new Properties();
- driver_info.put(CassandraMixin.KEY_MY_ID, id);
- driver_info.put(CassandraMixin.KEY_REPLICAS, "0,1,2");
- driver_info.put(CassandraMixin.KEY_MUSIC_ADDRESS, "localhost");
+ driver_info.put(MusicMixin.KEY_MY_ID, id);
+ driver_info.put(MusicMixin.KEY_REPLICAS, "0,1,2");
+ driver_info.put(MusicMixin.KEY_MUSIC_ADDRESS, "localhost");
driver_info.put("user", DB_USER);
driver_info.put("password", DB_PASSWORD);
return DriverManager.getConnection(url, driver_info);