aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server
diff options
context:
space:
mode:
authorTschaen, Brendan <ctschaen@att.com>2018-11-20 15:29:12 -0500
committerTschaen, Brendan <ctschaen@att.com>2018-11-20 15:43:30 -0500
commit1359f1023594c201b91fff73b2baa3f5d5cf6fd6 (patch)
tree30c3ac2940f82414a76e59bc8ed916ab9599b5f2 /mdbc-server
parent83db3eb4ccf7e636f586a6873a966e14ba1685ae (diff)
MusicSqlManager Removal, othe code refactors
MusicSqlManager not required in current setup Current code condensed and absorbed into MdbcConnection and MusicMixin Modify deprecated names "CassandraMixin" to "MusicMixin" Change-Id: If68f8937a385c19d901c56e5795ddc0acf45d399 Issue-ID: MUSIC-198 Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
Diffstat (limited to 'mdbc-server')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java8
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java162
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java56
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java50
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java329
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java46
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java9
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java (renamed from mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java)3
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java1502
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java8
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java (renamed from mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java)10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java3
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java1493
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java21
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java6
-rwxr-xr-xmdbc-server/src/main/resources/mdbc.properties6
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java8
-rwxr-xr-xmdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java8
21 files changed, 1636 insertions, 2106 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
index 2442af1..2723490 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
@@ -34,7 +34,7 @@ import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
import org.onap.music.logging.format.ErrorSeverity;
import org.onap.music.logging.format.ErrorTypes;
-import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.mixins.Utils;
import org.onap.music.mdbc.tables.Operation;
import org.onap.music.mdbc.tables.StagingTable;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java
index b820686..d4d20ad 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java
@@ -53,12 +53,12 @@ public class MdbcCallableStatement extends MdbcPreparedStatement implements Call
@SuppressWarnings("unused")
private static final String DATASTAX_PREFIX = "com.datastax.driver";
- public MdbcCallableStatement(Statement stmt, MusicSqlManager m) {
- super(stmt, m);
+ public MdbcCallableStatement(Statement stmt, MdbcConnection mConn) {
+ super(stmt, mConn);
}
- public MdbcCallableStatement(Statement stmt, String sql, MusicSqlManager mgr) {
- super(stmt, sql, mgr);
+ public MdbcCallableStatement(Statement stmt, String sql, MdbcConnection mConn) {
+ super(stmt, sql, mConn);
}
@Override
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 352eacf..cca14c0 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -34,8 +34,12 @@ import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Executor;
import org.onap.music.exceptions.MDBCServiceException;
@@ -44,7 +48,10 @@ import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
import org.onap.music.logging.format.ErrorSeverity;
import org.onap.music.logging.format.ErrorTypes;
+import org.onap.music.mdbc.mixins.DBInterface;
+import org.onap.music.mdbc.mixins.MixinFactory;
import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -61,25 +68,28 @@ public class MdbcConnection implements Connection {
private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused
private final Connection conn; // the JDBC Connection to the actual underlying database
- private final MusicSqlManager mgr; // there should be one MusicSqlManager in use per Connection
+ private final MusicInterface mi;
private final TxCommitProgress progressKeeper;
private final DatabasePartition partition;
+ private final DBInterface dbi;
+ private final HashMap<Range,StagingTable> transactionDigest;
+ private final Set<String> table_set;
public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
this.id = id;
+ this.table_set = Collections.synchronizedSet(new HashSet<String>());
+ this.transactionDigest = new HashMap<Range,StagingTable>();
+
if (c == null) {
throw new MDBCServiceException("Connection is null");
}
this.conn = c;
+ info.putAll(MDBCUtils.getMdbcProperties());
+ String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
+ this.dbi = MixinFactory.createDBInterface(mixinDb, mi, url, conn, info);
+ this.mi = mi;
try {
- this.mgr = new MusicSqlManager(url, c, info, mi);
- } catch (MDBCServiceException e) {
- logger.error("Failure in creating Music SQL Manager");
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
- throw e;
- }
- try {
- this.mgr.setAutoCommit(c.getAutoCommit(),null,null,null);
+ this.setAutoCommit(c.getAutoCommit());
} catch (SQLException e) {
logger.error("Failure in autocommit");
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
@@ -87,19 +97,15 @@ public class MdbcConnection implements Connection {
// Verify the tables in MUSIC match the tables in the database
// and create triggers on any tables that need them
- //mgr.synchronizeTableData();
- if ( mgr != null ) try {
- mgr.synchronizeTables();
+ try {
+ this.synchronizeTables();
} catch (QueryException e) {
logger.error("Error syncrhonizing tables");
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
}
- else {
- logger.error(EELFLoggerDelegate.errorLogger, "MusicSqlManager was not correctly created", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
- throw new MDBCServiceException("Music SQL Manager object is null or invalid");
- }
this.progressKeeper = progressKeeper;
this.partition = partition;
+
logger.debug("Mdbc connection created with id: "+id);
}
@@ -117,18 +123,18 @@ public class MdbcConnection implements Connection {
@Override
public Statement createStatement() throws SQLException {
- return new MdbcCallableStatement(conn.createStatement(), mgr);
+ return new MdbcCallableStatement(conn.createStatement(), this);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
//TODO: grab the sql call from here and all the other preparestatement calls
- return new MdbcPreparedStatement(conn.prepareStatement(sql), sql, mgr);
+ return new MdbcPreparedStatement(conn.prepareStatement(sql), sql, this);
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
- return new MdbcCallableStatement(conn.prepareCall(sql), mgr);
+ return new MdbcCallableStatement(conn.prepareCall(sql), this);
}
@Override
@@ -141,13 +147,23 @@ public class MdbcConnection implements Connection {
boolean b = conn.getAutoCommit();
if (b != autoCommit) {
if(progressKeeper!=null) progressKeeper.commitRequested(id);
- try {
- mgr.setAutoCommit(autoCommit,id,progressKeeper,partition);
- if(progressKeeper!=null)
- progressKeeper.setMusicDone(id);
- } catch (MDBCServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
- throw new SQLException("Failure commiting to MUSIC");
+ logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
+ if (b) {
+ // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
+ if(id == null || id.isEmpty()) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ throw new SQLException("tx id is null");
+ }
+ try {
+ mi.commitLog(partition, transactionDigest, id, progressKeeper);
+ } catch (MDBCServiceException e) {
+ // TODO Auto-generated catch block
+ logger.error("Cannot commit log to music" + e.getStackTrace());
+ throw new SQLException(e.getMessage());
+ }
+ }
+ if(progressKeeper!=null) {
+ progressKeeper.setMusicDone(id);
}
conn.setAutoCommit(autoCommit);
if(progressKeeper!=null) {
@@ -164,6 +180,11 @@ public class MdbcConnection implements Connection {
return conn.getAutoCommit();
}
+ /**
+ * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are performed now and copied into MUSIC.
+ * @throws SQLException
+ */
@Override
public void commit() throws SQLException {
if(progressKeeper.isComplete(id)) {
@@ -174,7 +195,9 @@ public class MdbcConnection implements Connection {
}
try {
- mgr.commit(id,progressKeeper,partition);
+ logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
+ // transaction was committed -- add all the updates into the REDO-Log in MUSIC
+ mi.commitLog(partition, transactionDigest, id, progressKeeper);
} catch (MDBCServiceException e) {
//If the commit fail, then a new commitId should be used
logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
@@ -196,19 +219,26 @@ public class MdbcConnection implements Connection {
}
}
+ /**
+ * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are discarded.
+ */
@Override
public void rollback() throws SQLException {
- mgr.rollback();
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");;
+ transactionDigest.clear();
conn.rollback();
progressKeeper.reinitializeTxProgress(id);
}
+ /**
+ * Close this MdbcConnection.
+ */
@Override
public void close() throws SQLException {
logger.debug("Closing mdbc connection with id:"+id);
- if (mgr != null) {
- logger.debug("Closing mdbc manager with id:"+id);
- mgr.close();
+ if (dbi != null) {
+ dbi.close();
}
if (conn != null && !conn.isClosed()) {
logger.debug("Closing jdbc from mdbc with id:"+id);
@@ -269,18 +299,18 @@ public class MdbcConnection implements Connection {
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
- return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency), mgr);
+ return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency), this);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
throws SQLException {
- return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, mgr);
+ return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, this);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency), mgr);
+ return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency), this);
}
@Override
@@ -326,34 +356,34 @@ public class MdbcConnection implements Connection {
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
throws SQLException {
- return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), mgr);
+ return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), this);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
- return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, mgr);
+ return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, this);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
- return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), mgr);
+ return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), this);
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
- return new MdbcPreparedStatement(conn.prepareStatement(sql, autoGeneratedKeys), sql, mgr);
+ return new MdbcPreparedStatement(conn.prepareStatement(sql, autoGeneratedKeys), sql, this);
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
- return new MdbcPreparedStatement(conn.prepareStatement(sql, columnIndexes), sql, mgr);
+ return new MdbcPreparedStatement(conn.prepareStatement(sql, columnIndexes), sql, this);
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
- return new MdbcPreparedStatement(conn.prepareStatement(sql, columnNames), sql, mgr);
+ return new MdbcPreparedStatement(conn.prepareStatement(sql, columnNames), sql, this);
}
@Override
@@ -435,4 +465,56 @@ public class MdbcConnection implements Connection {
public int getNetworkTimeout() throws SQLException {
return conn.getNetworkTimeout();
}
+
+
+ /**
+ * Code to be run within the DB driver before a SQL statement is executed. This is where tables
+ * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
+ * @param sql the SQL statement that is about to be executed
+ */
+ public void preStatementHook(String sql) {
+ dbi.preStatementHook(sql);
+ }
+
+ /**
+ * Code to be run within the DB driver after a SQL statement has been executed. This is where remote
+ * statement actions can be copied back to Cassandra/MUSIC.
+ * @param sql the SQL statement that was executed
+ */
+ public void postStatementHook(String sql) {
+ dbi.postStatementHook(sql, transactionDigest);
+ }
+
+ /**
+ * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
+ * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized
+ * in order to prevent multiple threads from running this code in parallel.
+ */
+ public void synchronizeTables() throws QueryException {
+ Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database
+ logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
+ for (String tableName : set1) {
+ // This map will be filled in if this table was previously discovered
+ tableName = tableName.toUpperCase();
+ if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
+ try {
+ TableInfo ti = dbi.getTableInfo(tableName);
+ mi.initializeMusicForTable(ti,tableName);
+ //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
+ ti = dbi.getTableInfo(tableName);
+ mi.createDirtyRowTable(ti,tableName);
+ dbi.createSQLTriggers(tableName);
+ table_set.add(tableName);
+ dbi.synchronizeData(tableName);
+ logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
+ table_set.size() + "/" + set1.size() + "tables uploaded");
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e);
+ throw new QueryException();
+ }
+ }
+ }
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
index fa08d3c..092aa94 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
@@ -62,13 +62,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
String[] params; // holds the parameters if prepared statement, indexing starts at 1
- public MdbcPreparedStatement(Statement stmt, MusicSqlManager m) {
- super(stmt, m);
+ public MdbcPreparedStatement(Statement stmt, MdbcConnection mConn) {
+ super(stmt, mConn);
this.sql = null;
}
- public MdbcPreparedStatement(Statement stmt, String sql, MusicSqlManager mgr) {
- super(stmt, sql, mgr);
+ public MdbcPreparedStatement(Statement stmt, String sql, MdbcConnection mConn) {
+ super(stmt, sql, mConn);
this.sql = sql;
//indexing starts at 1
params = new String[StringUtils.countMatches(sql, "?")+1];
@@ -89,9 +89,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql);
ResultSet r = null;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
r = stmt.executeQuery(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -108,9 +108,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -189,9 +189,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -301,9 +301,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -319,9 +319,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -337,9 +337,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -355,9 +355,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -373,9 +373,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -391,9 +391,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -439,9 +439,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql);
ResultSet r = null;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
r = ((PreparedStatement)stmt).executeQuery();;
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
e.printStackTrace();
@@ -458,9 +458,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = ((PreparedStatement)stmt).executeUpdate();
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
e.printStackTrace();
@@ -579,9 +579,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = ((PreparedStatement)stmt).execute();
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
e.printStackTrace();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
index cccea92..757b468 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
@@ -97,7 +97,7 @@ public class MdbcServerLogic extends JdbcMeta{
//\TODO: don't use connectionCache, use this.manager internal state
Connection conn = connectionCache.getIfPresent(id);
if (conn == null) {
- this.manager.CloseConnection(id);
+ this.manager.closeConnection(id);
logger.error(EELFLoggerDelegate.errorLogger,"Connection not found: invalid id, closed, or expired: "
+ id);
throw new RuntimeException(" Connection not found: invalid id, closed, or expired: " + id);
@@ -119,8 +119,8 @@ public class MdbcServerLogic extends JdbcMeta{
}
// Avoid global synchronization of connection opening
try {
- this.manager.OpenConnection(ch.id, info);
- Connection conn = this.manager.GetConnection(ch.id);
+ this.manager.openConnection(ch.id, info);
+ Connection conn = this.manager.getConnection(ch.id);
if(conn == null) {
logger.error(EELFLoggerDelegate.errorLogger, "Connection created was null");
throw new RuntimeException("Connection created was null for connection: " + ch.id);
@@ -130,7 +130,7 @@ public class MdbcServerLogic extends JdbcMeta{
// Race condition: someone beat us to storing the connection in the cache.
if (loadedConn != null) {
//\TODO check if we added an additional race condition for this
- this.manager.CloseConnection(ch.id);
+ this.manager.closeConnection(ch.id);
conn.close();
throw new RuntimeException("Connection already exists: " + ch.id);
}
@@ -156,7 +156,7 @@ public class MdbcServerLogic extends JdbcMeta{
throw new RuntimeException(e.getMessage());
} finally {
connectionCache.invalidate(ch.id);
- this.manager.CloseConnection(ch.id);
+ this.manager.closeConnection(ch.id);
logger.info("connection closed with id {}", ch.id);
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java
index 320531f..97f4be4 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java
@@ -42,18 +42,18 @@ public class MdbcStatement implements Statement {
private static final String DATASTAX_PREFIX = "com.datastax.driver";
final Statement stmt; // the Statement that we are proxying
- final MusicSqlManager mgr;
+ final MdbcConnection mConn;
//\TODO We may need to all pass the connection object to support autocommit
- public MdbcStatement(Statement s, MusicSqlManager m) {
+ public MdbcStatement(Statement s, MdbcConnection mConn) {
this.stmt = s;
- this.mgr = m;
+ this.mConn = mConn;
}
- public MdbcStatement(Statement stmt, String sql, MusicSqlManager mgr) {
+ public MdbcStatement(Statement stmt, String sql, MdbcConnection mConn) {
//\TODO why there is a constructor with a sql parameter in a not PreparedStatement
this.stmt = stmt;
- this.mgr = mgr;
+ this.mConn = mConn;
}
@Override
@@ -73,9 +73,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql);
ResultSet r = null;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
r = stmt.executeQuery(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -92,9 +92,9 @@ public class MdbcStatement implements Statement {
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -175,9 +175,9 @@ public class MdbcStatement implements Statement {
boolean b = false;
//\TODO Add the result of the postStatementHook to b
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -287,9 +287,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -305,9 +305,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -323,9 +323,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -341,9 +341,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -359,9 +359,9 @@ public class MdbcStatement implements Statement {
logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -378,9 +378,9 @@ public class MdbcStatement implements Statement {
//\TODO Idem to the other execute without columnNames
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -423,9 +423,9 @@ public class MdbcStatement implements Statement {
protected void synchronizeTables(String sql) {
if (sql == null || sql.trim().toLowerCase().startsWith("create")) {
- if (mgr != null) {
+ if (mConn != null) {
try {
- mgr.synchronizeTables();
+ mConn.synchronizeTables();
} catch (QueryException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java
deleted file mode 100755
index c2019cf..0000000
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * ============LICENSE_START====================================================
- * org.onap.music.mdbc
- * =============================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END======================================================
- */
-package org.onap.music.mdbc;
-
-import java.sql.Connection;
-import java.util.*;
-
-import org.json.JSONObject;
-
-import org.onap.music.mdbc.mixins.DBInterface;
-import org.onap.music.mdbc.mixins.MixinFactory;
-import org.onap.music.mdbc.mixins.MusicInterface;
-import org.onap.music.mdbc.mixins.Utils;
-import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.TxCommitProgress;
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.exceptions.QueryException;
-import org.onap.music.logging.*;
-import org.onap.music.logging.format.AppMessages;
-import org.onap.music.logging.format.ErrorSeverity;
-import org.onap.music.logging.format.ErrorTypes;
-
-/**
-* <p>
-* MUSIC SQL Manager - code that helps take data written to a SQL database and seamlessly integrates it
-* with <a href="https://github.com/att/music">MUSIC</a> that maintains data in a No-SQL data-store
-* (<a href="http://cassandra.apache.org/">Cassandra</a>) and protects access to it with a distributed
-* locking service (based on <a href="https://zookeeper.apache.org/">Zookeeper</a>).
-* </p>
-* <p>
-* This code will support transactions by taking note of the value of the autoCommit flag, and of calls
-* to <code>commit()</code> and <code>rollback()</code>. These calls should be made by the user's JDBC
-* client.
-* </p>
-*
-* @author Bharath Balasubramanian, Robert Eby
-*/
-public class MusicSqlManager {
-
- private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicSqlManager.class);
-
- private final DBInterface dbi;
- private final MusicInterface mi;
- private final Set<String> table_set;
- private final HashMap<Range,StagingTable> transactionDigest;
- private boolean autocommit; // a copy of the autocommit flag from the JDBC Connection
-
- /**
- * Build a MusicSqlManager for a DB connection. This construct may only be called by getMusicSqlManager(),
- * which will ensure that only one MusicSqlManager is created per URL.
- * This is the location where the appropriate mixins to use for the MusicSqlManager should be determined.
- * They should be picked based upon the URL and the properties passed to this constructor.
- * <p>
- * At the present time, we only support the use of the H2Mixin (for access to a local H2 database),
- * with the CassandraMixin (for direct access to a Cassandra noSQL DB as the persistence layer).
- * </p>
- *
- * @param url the JDBC URL which was used to connection to the database
- * @param conn the actual connection to the database
- * @param info properties passed from the initial JDBC connect() call
- * @throws MDBCServiceException
- */
- public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException {
- try {
- info.putAll(MDBCUtils.getMdbcProperties());
- String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
- this.dbi = MixinFactory.createDBInterface(mixinDb, this, url, conn, info);
- this.mi = mi;
- this.table_set = Collections.synchronizedSet(new HashSet<String>());
- this.autocommit = true;
- this.transactionDigest = new HashMap<Range,StagingTable>();
-
- }catch(Exception e) {
- throw new MDBCServiceException(e.getMessage());
- }
- }
-
- public void setAutoCommit(boolean b,String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
- if (b != autocommit) {
- autocommit = b;
- logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
- if (b) {
- // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
- if(txId == null || txId.isEmpty()) {
- logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- throw new MDBCServiceException("tx id is null");
- }
- commit(txId,progressKeeper,partition);
- }
- }
- }
-
- /**
- * Close this MusicSqlManager.
- */
- public void close() {
- if (dbi != null) {
- dbi.close();
- }
- }
-
- /**
- * Code to be run within the DB driver before a SQL statement is executed. This is where tables
- * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
- * @param sql the SQL statement that is about to be executed
- */
- public void preStatementHook(final String sql) {
- dbi.preStatementHook(sql);
- }
- /**
- * Code to be run within the DB driver after a SQL statement has been executed. This is where remote
- * statement actions can be copied back to Cassandra/MUSIC.
- * @param sql the SQL statement that was executed
- */
- public void postStatementHook(final String sql) {
- dbi.postStatementHook(sql,transactionDigest);
- }
- /**
- * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
- * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized
- * in order to prevent multiple threads from running this code in parallel.
- */
- public synchronized void synchronizeTables() throws QueryException {
- Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database
- logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
- for (String tableName : set1) {
- // This map will be filled in if this table was previously discovered
- tableName = tableName.toUpperCase();
- if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
- logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
- try {
- TableInfo ti = dbi.getTableInfo(tableName);
- mi.initializeMusicForTable(ti,tableName);
- //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
- ti = dbi.getTableInfo(tableName);
- mi.createDirtyRowTable(ti,tableName);
- dbi.createSQLTriggers(tableName);
- table_set.add(tableName);
- synchronizeTableData(tableName);
- logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
- table_set.size() + "/" + set1.size() + "tables uploaded");
- } catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e);
- throw new QueryException();
- }
- }
- }
-
-// Set<String> set2 = getMusicTableSet(music_ns);
- // not working - fix later
-// for (String tbl : set2) {
-// if (!set1.contains(tbl)) {
-// logger.debug("Old table dropped: "+tbl);
-// dropSQLTriggers(tbl, conn);
-// // ZZTODO drop camunda table ?
-// }
-// }
- }
-
- /**
- * On startup, copy dirty data from Cassandra to H2. May not be needed.
- * @param tableName
- */
- public void synchronizeTableData(String tableName) {
- // TODO - copy MUSIC -> H2
- dbi.synchronizeData(tableName);
- }
- /**
- * This method is called whenever there is a SELECT on a local SQL table, and should be called by the underlying databases
- * triggering mechanism. It first checks the local dirty bits table to see if there are any keys in Cassandra whose value
- * has not yet been sent to SQL. If there are, the appropriate values are copied from Cassandra to the local database.
- * Under normal execution, this function behaves as a NOP operation.
- * @param tableName This is the table on which the SELECT is being performed
- */
- public void readDirtyRowsAndUpdateDb(String tableName) {
- mi.readDirtyRowsAndUpdateDb(dbi,tableName);
- }
-
-
-
-
- /**
- * This method gets the primary key that the music interfaces uses by default.
- * If the front end uses a primary key, this will not match what is used in the MUSIC interface
- * @return
- */
- public String getMusicDefaultPrimaryKeyName() {
- return mi.getMusicDefaultPrimaryKeyName();
- }
-
- /**
- * Asks music interface to provide the function to create a primary key
- * e.g. uuid(), 1, "unique_aksd419fjc"
- * @return
- */
- public String generateUniqueKey() {
- //
- return mi.generateUniqueKey();
- }
-
-
- /**
- * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
- * they are performed now and copied into MUSIC.
- * @throws MDBCServiceException
- */
- public synchronized void commit(String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
- logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
- // transaction was committed -- add all the updates into the REDO-Log in MUSIC
- try {
- mi.commitLog(dbi, partition, transactionDigest, txId, progressKeeper);
- }catch(MDBCServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
- throw e;
- }
- }
-
- /**
- * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed,
- * they are discarded.
- */
- public synchronized void rollback() {
- // transaction was rolled back - discard the updates
- logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");;
- transactionDigest.clear();
- }
-
- /**
- * Get all
- * @param table
- * @param dbRow
- * @return
- */
- public String getMusicKeyFromRowWithoutPrimaryIndexes(String table, JSONObject dbRow) {
- TableInfo ti = dbi.getTableInfo(table);
- return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti,table, dbRow);
- }
-
- @Deprecated
- public String getMusicKeyFromRow(String table, JSONObject dbRow) {
- TableInfo ti = dbi.getTableInfo(table);
- return mi.getMusicKeyFromRow(ti,table, dbRow);
- }
-
- /**
- * Returns all keys that matches the current sql statement, and not in already updated keys.
- *
- * @param sql the query that we are getting keys for
- * @deprecated
- */
- public ArrayList<String> getMusicKeys(String sql) {
- ArrayList<String> musicKeys = new ArrayList<String>();
- //\TODO See if this is required
- /*
- try {
- net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql);
- if (stmt instanceof Insert) {
- Insert s = (Insert) stmt;
- String tbl = s.getTable().getName();
- musicKeys.add(generatePrimaryKey());
- } else {
- String tbl;
- String where = "";
- if (stmt instanceof Update){
- Update u = (Update) stmt;
- tbl = u.getTables().get(0).getName();
- where = u.getWhere().toString();
- } else if (stmt instanceof Delete) {
- Delete d = (Delete) stmt;
- tbl = d.getTable().getName();
- if (d.getWhere()!=null) {
- where = d.getWhere().toString();
- }
- } else {
- System.err.println("Not recognized sql type");
- tbl = "";
- }
- String dbiSelect = "SELECT * FROM " + tbl;
- if (!where.equals("")) {
- dbiSelect += "WHERE" + where;
- }
- ResultSet rs = dbi.executeSQLRead(dbiSelect);
- musicKeys.addAll(getMusicKeysWhere(tbl, Utils.parseResults(dbi.getTableInfo(tbl), rs)));
- rs.getStatement().close();
- }
- } catch (JSQLParserException | SQLException e) {
-
- e.printStackTrace();
- }
- System.err.print("MusicKeys:");
- for(String musicKey:musicKeys) {
- System.out.print(musicKey + ",");
- }
- */
- return musicKeys;
- }
-
- public void own(List<Range> ranges) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
- }
-
- public void appendRange(String rangeId, List<Range> ranges) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
- }
-
- public void relinquish(String ownerId, String rangeId) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
- }
-
-
-}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java
index 49b032e..3175f94 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java
@@ -60,11 +60,11 @@ public class ProxyStatement implements CallableStatement {
private static final String DATASTAX_PREFIX = "com.datastax.driver";
private final Statement stmt; // the Statement that we are proxying
- private final MusicSqlManager mgr;
+ private final MdbcConnection mConn;
- public ProxyStatement(Statement s, MusicSqlManager m) {
+ public ProxyStatement(Statement s, MdbcConnection mConn) {
this.stmt = s;
- this.mgr = m;
+ this.mConn = mConn;
}
@Override
@@ -82,9 +82,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("executeQuery: "+sql);
ResultSet r = null;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
r = stmt.executeQuery(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -100,9 +100,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -178,9 +178,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -289,9 +289,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -307,9 +307,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -325,9 +325,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("executeUpdate: "+sql);
int n = 0;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
n = stmt.executeUpdate(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -343,9 +343,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, autoGeneratedKeys);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -361,9 +361,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnIndexes);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -379,9 +379,9 @@ public class ProxyStatement implements CallableStatement {
logger.debug("execute: "+sql);
boolean b = false;
try {
- mgr.preStatementHook(sql);
+ mConn.preStatementHook(sql);
b = stmt.execute(sql, columnNames);
- mgr.postStatementHook(sql);
+ mConn.postStatementHook(sql);
synchronizeTables(sql);
} catch (Exception e) {
String nm = e.getClass().getName();
@@ -1268,9 +1268,9 @@ public class ProxyStatement implements CallableStatement {
private void synchronizeTables(String sql) {
if (sql == null || sql.trim().toLowerCase().startsWith("create")) {
- if (mgr != null) {
+ if (mConn != null) {
try {
- mgr.synchronizeTables();
+ mConn.synchronizeTables();
} catch (QueryException e) {
e.printStackTrace();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 4bb0c85..1f2ad91 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -27,7 +27,6 @@ import org.onap.music.logging.format.ErrorSeverity;
import org.onap.music.logging.format.ErrorTypes;
import org.onap.music.mdbc.mixins.MixinFactory;
import org.onap.music.mdbc.mixins.MusicInterface;
-import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -151,7 +150,7 @@ public class StateManager {
}
- public void CloseConnection(String connectionId){
+ public void closeConnection(String connectionId){
//\TODO check if there is a race condition
if(mdbcConnections.containsKey(connectionId)) {
transactionInfo.deleteTxProgress(connectionId);
@@ -166,7 +165,7 @@ public class StateManager {
}
}
- public void OpenConnection(String id, Properties information){
+ public void openConnection(String id, Properties information){
if(!mdbcConnections.containsKey(id)){
Connection sqlConnection;
MdbcConnection newConnection;
@@ -208,7 +207,7 @@ public class StateManager {
* @param id of the transaction, created using
* @return
*/
- public Connection GetConnection(String id) {
+ public Connection getConnection(String id) {
if(mdbcConnections.containsKey(id)) {
//\TODO: Verify if this make sense
// Intent: reinitialize transaction progress, when it already completed the previous tx for the same connection
@@ -253,7 +252,7 @@ public class StateManager {
return newConnection;
}
- public void InitializeSystem() {
+ public void initializeSystem() {
//\TODO Prefetch data to system using the data ranges as guide
throw new UnsupportedOperationException("Function initialize system needs to be implemented id MdbcStateManager");
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
index f3f5d22..3e3447d 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
@@ -24,7 +24,7 @@ import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.RedoRow;
-import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.tables.MusicTxDigest;
import com.google.gson.Gson;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
index eede9be..35293ef 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
@@ -22,7 +22,7 @@ package org.onap.music.mdbc.examples;
import java.sql.*;
import org.apache.calcite.avatica.remote.Driver;
-public class EtdbTestClient {
+public class MdbcTestClient {
public static class Hr {
public final Employee[] emps = {
@@ -145,7 +145,6 @@ public class EtdbTestClient {
}
try {
- connection.commit();
connection.close();
} catch (SQLException e) {
e.printStackTrace();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java
deleted file mode 100755
index 3b1651f..0000000
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java
+++ /dev/null
@@ -1,1502 +0,0 @@
-/*
- * ============LICENSE_START====================================================
- * org.onap.music.mdbc
- * =============================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END======================================================
- */
-package org.onap.music.mdbc.mixins;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.nio.ByteBuffer;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import org.onap.music.mdbc.*;
-import org.onap.music.mdbc.tables.MusicTxDigestId;
-import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.MriReference;
-import org.onap.music.mdbc.tables.MusicRangeInformationRow;
-import org.onap.music.mdbc.tables.MusicTxDigest;
-import org.onap.music.mdbc.tables.TxCommitProgress;
-
-import org.json.JSONObject;
-import org.onap.music.datastore.CassaLockStore;
-import org.onap.music.datastore.PreparedQueryObject;
-import org.onap.music.exceptions.MusicLockingException;
-import org.onap.music.exceptions.MusicQueryException;
-import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.main.MusicCore;
-import org.onap.music.main.MusicCore.Condition;
-import org.onap.music.main.ResultType;
-import org.onap.music.main.ReturnType;
-
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.logging.EELFLoggerDelegate;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.ColumnDefinitions;
-import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TupleValue;
-
-/**
- * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence
- * to calls to the user's DB. It does not do any table or row locking.
- *
- * <p>This code only supports the following limited list of H2 and Cassandra data types:</p>
- * <table summary="">
- * <tr><th>H2 Data Type</th><th>Mapped to Cassandra Data Type</th></tr>
- * <tr><td>BIGINT</td><td>BIGINT</td></tr>
- * <tr><td>BOOLEAN</td><td>BOOLEAN</td></tr>
- * <tr><td>CLOB</td><td>BLOB</td></tr>
- * <tr><td>DOUBLE</td><td>DOUBLE</td></tr>
- * <tr><td>INTEGER</td><td>INT</td></tr>
- * <tr><td>TIMESTAMP</td><td>TIMESTAMP</td></tr>
- * <tr><td>VARBINARY</td><td>BLOB</td></tr>
- * <tr><td>VARCHAR</td><td>VARCHAR</td></tr>
- * </table>
- *
- * @author Robert P. Eby
- */
-public class CassandraMixin implements MusicInterface {
- /** The property name to use to identify this replica to MusicSqlManager */
- public static final String KEY_MY_ID = "myid";
- /** The property name to use for the comma-separated list of replica IDs. */
- public static final String KEY_REPLICAS = "replica_ids";
- /** The property name to use to identify the IP address for Cassandra. */
- public static final String KEY_MUSIC_ADDRESS = "cassandra.host";
- /** The property name to use to provide the replication factor for Cassandra. */
- public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
- /** The property name to use to provide the replication factor for Cassandra. */
- public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
- /** Namespace for the tables in MUSIC (Cassandra) */
- public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
- /** The default property value to use for the Cassandra IP address. */
- public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
- /** The default property value to use for the Cassandra replication factor. */
- public static final int DEFAULT_MUSIC_RFACTOR = 1;
- /** The default primary string column, if none is provided. */
- public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
- /** Type of the primary key, if none is defined by the user */
- public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
-
-
- //\TODO Add logic to change the names when required and create the tables when necessary
- private String musicTxDigestTableName = "musictxdigest";
- private String musicRangeInformationTableName = "musicrangeinformation";
-
- private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassandraMixin.class);
-
- private static final Map<Integer, String> typemap = new HashMap<>();
- static {
- // We only support the following type mappings currently (from DB -> Cassandra).
- // Anything else will likely cause a NullPointerException
- typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY
- typemap.put(Types.BLOB, "VARCHAR");
- typemap.put(Types.BOOLEAN, "BOOLEAN");
- typemap.put(Types.CLOB, "BLOB");
- typemap.put(Types.DATE, "VARCHAR");
- typemap.put(Types.DOUBLE, "DOUBLE");
- typemap.put(Types.DECIMAL, "DECIMAL");
- typemap.put(Types.INTEGER, "INT");
- //typemap.put(Types.TIMESTAMP, "TIMESTAMP");
- typemap.put(Types.SMALLINT, "SMALLINT");
- typemap.put(Types.TIMESTAMP, "VARCHAR");
- typemap.put(Types.VARBINARY, "BLOB");
- typemap.put(Types.VARCHAR, "VARCHAR");
- typemap.put(Types.CHAR, "VARCHAR");
- //The "Hacks", these don't have a direct mapping
- //typemap.put(Types.DATE, "VARCHAR");
- //typemap.put(Types.DATE, "TIMESTAMP");
- }
-
- protected final String music_ns;
- protected final String myId;
- protected final String[] allReplicaIds;
- private final String musicAddress;
- private final int music_rfactor;
- private MusicConnector mCon = null;
- private Session musicSession = null;
- private boolean keyspace_created = false;
- private Map<String, PreparedStatement> ps_cache = new HashMap<>();
- private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
-
- public CassandraMixin() {
- //this.logger = null;
- this.musicAddress = null;
- this.music_ns = null;
- this.music_rfactor = 0;
- this.myId = null;
- this.allReplicaIds = null;
- }
-
- public CassandraMixin(String url, Properties info) throws MDBCServiceException {
- // Default values -- should be overridden in the Properties
- // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
- this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress);
-
- String s = info.getProperty(KEY_MUSIC_RFACTOR);
- this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s);
-
- this.myId = info.getProperty(KEY_MY_ID, getMyHostId());
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId);
-
-
- this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(",");
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId));
-
- this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
- createKeyspace();
- }
-
- /**
- * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
- * The keyspace name comes from the initialization properties passed to the JDBC driver.
- */
- @Override
- public void createKeyspace() throws MDBCServiceException {
-
- Map<String,Object> replicationInfo = new HashMap<>();
- replicationInfo.put("'class'", "'SimpleStrategy'");
- replicationInfo.put("'replication_factor'", music_rfactor);
-
- PreparedQueryObject queryObject = new PreparedQueryObject();
- queryObject.appendQueryString(
- "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns +
- " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
-
- try {
- MusicCore.nonKeyRelatedPut(queryObject, "eventual");
- } catch (MusicServiceException e) {
- if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) {
- throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage());
- }
- }
- }
-
- private String getMyHostId() {
- ResultSet rs = executeMusicRead("SELECT HOST_ID FROM SYSTEM.LOCAL");
- Row row = rs.one();
- return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
- }
- private String getAllHostIds() {
- ResultSet results = executeMusicRead("SELECT HOST_ID FROM SYSTEM.PEERS");
- StringBuilder sb = new StringBuilder(myId);
- for (Row row : results) {
- sb.append(",");
- sb.append(row.getUUID("HOST_ID").toString());
- }
- return sb.toString();
- }
-
- /**
- * Get the name of this MusicInterface mixin object.
- * @return the name
- */
- @Override
- public String getMixinName() {
- return "cassandra";
- }
- /**
- * Do what is needed to close down the MUSIC connection.
- */
- @Override
- public void close() {
- if (musicSession != null) {
- musicSession.close();
- musicSession = null;
- }
- }
- @Override
- public void initializeMetricDataStructures() throws MDBCServiceException {
- try {
- createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
- createMusicRangeInformationTable();
- }
- catch(MDBCServiceException e){
- logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC");
- }
- }
-
- /**
- * This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>.
- * @param tableName the table to initialize MUSIC for
- */
- @Override
- public void initializeMusicForTable(TableInfo ti, String tableName) {
- /**
- * This code creates two tables for every table in SQL:
- * (i) a table with the exact same name as the SQL table storing the SQL data.
- * (ii) a "dirty bits" table that stores the keys in the Cassandra table that are yet to be
- * updated in the SQL table (they were written by some other node).
- */
- StringBuilder fields = new StringBuilder();
- StringBuilder prikey = new StringBuilder();
- String pfx = "", pfx2 = "";
- for (int i = 0; i < ti.columns.size(); i++) {
- fields.append(pfx)
- .append(ti.columns.get(i))
- .append(" ")
- .append(typemap.get(ti.coltype.get(i)));
- if (ti.iskey.get(i)) {
- // Primary key column
- prikey.append(pfx2).append(ti.columns.get(i));
- pfx2 = ", ";
- }
- pfx = ", ";
- }
- if (prikey.length()==0) {
- fields.append(pfx).append(MDBC_PRIMARYKEY_NAME)
- .append(" ")
- .append(MDBC_PRIMARYKEY_TYPE);
- prikey.append(MDBC_PRIMARYKEY_NAME);
- }
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString());
- executeMusicWriteQuery(cql);
- }
-
- // **************************************************
- // Dirty Tables (in MUSIC) methods
- // **************************************************
-
- /**
- * Create a <i>dirty row</i> table for the real table <i>tableName</i>. The primary keys columns from the real table are recreated in
- * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC.
- * @param tableName the table to create a "dirty" table for
- */
- @Override
- public void createDirtyRowTable(TableInfo ti, String tableName) {
- // create dirtybitsTable at all replicas
-// for (String repl : allReplicaIds) {
-//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
-//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
-// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
-// executeMusicWriteQuery(cql);
-// }
- StringBuilder ddl = new StringBuilder("REPLICA__ TEXT");
- StringBuilder cols = new StringBuilder("REPLICA__");
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- // Only use the primary keys columns in the "Dirty" table
- ddl.append(", ")
- .append(ti.columns.get(i))
- .append(" ")
- .append(typemap.get(ti.coltype.get(i)));
- cols.append(", ").append(ti.columns.get(i));
- }
- }
- if(cols.length()==0) {
- //fixme
- System.err.println("Create dirty row table found no primary key");
- }
- ddl.append(", PRIMARY KEY(").append(cols).append(")");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s (%s);", music_ns, tableName, ddl.toString());
- executeMusicWriteQuery(cql);
- }
- /**
- * Drop the dirty row table for <i>tableName</i> from MUSIC.
- * @param tableName the table being dropped
- */
- @Override
- public void dropDirtyRowTable(String tableName) {
- String cql = String.format("DROP TABLE %s.DIRTY_%s;", music_ns, tableName);
- executeMusicWriteQuery(cql);
- }
- /**
- * Mark rows as "dirty" in the dirty rows table for <i>tableName</i>. Rows are marked for all replicas but
- * this one (this replica already has the up to date data).
- * @param tableName the table we are marking dirty
- * @param keys an ordered list of the values being put into the table. The values that correspond to the tables'
- * primary key are copied into the dirty row table.
- */
- @Override
- public void markDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
- Object[] keyObj = getObjects(ti,tableName, keys);
- StringBuilder cols = new StringBuilder("REPLICA__");
- PreparedQueryObject pQueryObject = null;
- StringBuilder vals = new StringBuilder("?");
- List<Object> vallist = new ArrayList<Object>();
- vallist.add(""); // placeholder for replica
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- cols.append(", ").append(ti.columns.get(i));
- vals.append(", ").append("?");
- vallist.add(keyObj[i]);
- }
- }
- if(cols.length()==0) {
- //FIXME
- System.err.println("markDIrtyRow need to fix primary key");
- }
- String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString());
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);*/
- String primaryKey;
- if(ti.hasKey()) {
- primaryKey = getMusicKeyFromRow(ti,tableName, keys);
- }
- else {
- primaryKey = getMusicKeyFromRowWithoutPrimaryIndexes(ti,tableName, keys);
- }
- System.out.println("markDirtyRow: PK value: "+primaryKey);
-
- Object pkObj = null;
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- pkObj = keyObj[i];
- }
- }
- for (String repl : allReplicaIds) {
- pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(tableName);
- pQueryObject.addValue(repl);
- pQueryObject.addValue(pkObj);
- updateMusicDB(tableName, primaryKey, pQueryObject);
- //if (!repl.equals(myId)) {
- /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
- vallist.set(0, repl);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(bound);
- }*/
- //}
-
- }
- }
- /**
- * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys
- * @param tableName the table we are removing dirty entries from
- * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row.
- */
- @Override
- public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
- Object[] keysObjects = getObjects(ti,tableName,keys);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- StringBuilder cols = new StringBuilder("REPLICA__=?");
- List<Object> vallist = new ArrayList<Object>();
- vallist.add(myId);
- int n = 0;
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- cols.append(" AND ").append(ti.columns.get(i)).append("=?");
- vallist.add(keysObjects[n++]);
- pQueryObject.addValue(keysObjects[n++]);
- }
- }
- String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString());
- logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
- pQueryObject.appendQueryString(cql);
- ReturnType rt = MusicCore.eventualPut(pQueryObject);
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- System.out.println("Failure while cleanDirtyRow..."+rt.getMessage());
- }
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(bound);
- }*/
- }
- /**
- * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica,
- * and consist of a Map of primary key column names and values.
- * @param tableName the table we are querying for
- * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row".
- */
- @Override
- public List<Map<String,Object>> getDirtyRows(TableInfo ti, String tableName) {
- String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName);
- ResultSet results = null;
- logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
-
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(new Object[] { myId });
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- results = sess.execute(bound);
- }*/
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- try {
- results = MusicCore.get(pQueryObject);
- } catch (MusicServiceException e) {
-
- e.printStackTrace();
- }
-
- ColumnDefinitions cdef = results.getColumnDefinitions();
- List<Map<String,Object>> list = new ArrayList<Map<String,Object>>();
- for (Row row : results) {
- Map<String,Object> objs = new HashMap<String,Object>();
- for (int i = 0; i < cdef.size(); i++) {
- String colname = cdef.getName(i).toUpperCase();
- String coltype = cdef.getType(i).getName().toString().toUpperCase();
- if (!colname.equals("REPLICA__")) {
- switch (coltype) {
- case "BIGINT":
- objs.put(colname, row.getLong(colname));
- break;
- case "BOOLEAN":
- objs.put(colname, row.getBool(colname));
- break;
- case "BLOB":
- objs.put(colname, row.getString(colname));
- break;
- case "DATE":
- objs.put(colname, row.getString(colname));
- break;
- case "DOUBLE":
- objs.put(colname, row.getDouble(colname));
- break;
- case "DECIMAL":
- objs.put(colname, row.getDecimal(colname));
- break;
- case "INT":
- objs.put(colname, row.getInt(colname));
- break;
- case "TIMESTAMP":
- objs.put(colname, row.getTimestamp(colname));
- break;
- case "VARCHAR":
- default:
- objs.put(colname, row.getString(colname));
- break;
- }
- }
- }
- list.add(objs);
- }
- return list;
- }
-
- /**
- * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first.
- * @param tableName This is the table that has been dropped
- */
- @Override
- public void clearMusicForTable(String tableName) {
- dropDirtyRowTable(tableName);
- String cql = String.format("DROP TABLE %s.%s;", music_ns, tableName);
- executeMusicWriteQuery(cql);
- }
- /**
- * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the
- * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates
- * it to the other replicas.
- *
- * @param tableName This is the table that has changed.
- * @param oldRow This is a copy of the old row being deleted
- */
- @Override
- public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) {
- Object[] objects = getObjects(ti,tableName,oldRow);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- if (ti.hasKey()) {
- assert(ti.columns.size() == objects.length);
- } else {
- assert(ti.columns.size()+1 == objects.length);
- }
-
- StringBuilder where = new StringBuilder();
- List<Object> vallist = new ArrayList<Object>();
- String pfx = "";
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- where.append(pfx)
- .append(ti.columns.get(i))
- .append("=?");
- vallist.add(objects[i]);
- pQueryObject.addValue(objects[i]);
- pfx = " AND ";
- }
- }
- if (!ti.hasKey()) {
- where.append(MDBC_PRIMARYKEY_NAME + "=?");
- //\FIXME this is wrong, old row is not going to contain the UUID, this needs to be fixed
- vallist.add(UUID.fromString((String) objects[0]));
- pQueryObject.addValue(UUID.fromString((String) objects[0]));
- }
-
- String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString());
- logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql);
- pQueryObject.appendQueryString(cql);
-
- /*PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- synchronized (sess) {
- sess.execute(bound);
- }*/
- String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow);
-
- updateMusicDB(tableName, primaryKey, pQueryObject);
-
- // Mark the dirty rows in music for all the replicas but us
- markDirtyRow(ti,tableName, oldRow);
- }
-
- public Set<String> getMusicTableSet(String ns) {
- Set<String> set = new TreeSet<String>();
- String cql = String.format("SELECT TABLE_NAME FROM SYSTEM_SCHEMA.TABLES WHERE KEYSPACE_NAME = '%s'", ns);
- ResultSet rs = executeMusicRead(cql);
- for (Row row : rs) {
- set.add(row.getString("TABLE_NAME").toUpperCase());
- }
- return set;
- }
- /**
- * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local
- * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL
- * @param tableName This is the table on which the select is being performed
- */
- @Override
- public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) {
- // Read dirty rows of this table from Music
- TableInfo ti = dbi.getTableInfo(tableName);
- List<Map<String,Object>> objlist = getDirtyRows(ti,tableName);
- PreparedQueryObject pQueryObject = null;
- String pre_cql = String.format("SELECT * FROM %s.%s WHERE ", music_ns, tableName);
- List<Object> vallist = new ArrayList<Object>();
- StringBuilder sb = new StringBuilder();
- //\TODO Perform a batch operation instead of each row at a time
- for (Map<String,Object> map : objlist) {
- pQueryObject = new PreparedQueryObject();
- sb.setLength(0);
- vallist.clear();
- String pfx = "";
- for (String key : map.keySet()) {
- sb.append(pfx).append(key).append("=?");
- vallist.add(map.get(key));
- pQueryObject.addValue(map.get(key));
- pfx = " AND ";
- }
-
- String cql = pre_cql + sb.toString();
- System.out.println("readDirtyRowsAndUpdateDb: cql: "+cql);
- pQueryObject.appendQueryString(cql);
- ResultSet dirtyRows = null;
- try {
- //\TODO Why is this an eventual put?, this should be an atomic
- dirtyRows = MusicCore.get(pQueryObject);
- } catch (MusicServiceException e) {
-
- e.printStackTrace();
- }
- /*
- Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- ResultSet dirtyRows = null;
- synchronized (sess) {
- dirtyRows = sess.execute(bound);
- }*/
- List<Row> rows = dirtyRows.all();
- if (rows.isEmpty()) {
- // No rows, the row must have been deleted
- deleteRowFromSqlDb(dbi,tableName, map);
- } else {
- for (Row row : rows) {
- writeMusicRowToSQLDb(dbi,tableName, row);
- }
- }
- }
- }
-
- private void deleteRowFromSqlDb(DBInterface dbi, String tableName, Map<String, Object> map) {
- dbi.deleteRowFromSqlDb(tableName, map);
- TableInfo ti = dbi.getTableInfo(tableName);
- List<Object> vallist = new ArrayList<Object>();
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- String col = ti.columns.get(i);
- Object val = map.get(col);
- vallist.add(val);
- }
- }
- cleanDirtyRow(ti, tableName, new JSONObject(vallist));
- }
- /**
- * This functions copies the contents of a row in Music into the corresponding row in the SQL table
- * @param tableName This is the name of the table in both Music and swl
- * @param musicRow This is the row in Music that is being copied into SQL
- */
- private void writeMusicRowToSQLDb(DBInterface dbi, String tableName, Row musicRow) {
- // First construct the map of columns and their values
- TableInfo ti = dbi.getTableInfo(tableName);
- Map<String, Object> map = new HashMap<String, Object>();
- List<Object> vallist = new ArrayList<Object>();
- String rowid = tableName;
- for (String col : ti.columns) {
- Object val = getValue(musicRow, col);
- map.put(col, val);
- if (ti.iskey(col)) {
- vallist.add(val);
- rowid += "_" + val.toString();
- }
- }
-
- logger.debug("Blocking rowid: "+rowid);
- in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE
-
- dbi.insertRowIntoSqlDb(tableName, map);
-
- logger.debug("Unblocking rowid: "+rowid);
- in_progress.remove(rowid); // Unblock propagation
-
-// try {
-// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
-// executeSQLWrite(sql);
-// } catch (SQLException e) {
-// logger.debug("Insert failed because row exists, do an update");
-// // TODO - rewrite this UPDATE command should not update key fields
-// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString());
-// try {
-// executeSQLWrite(sql);
-// } catch (SQLException e1) {
-// e1.printStackTrace();
-// }
-// }
-
- ti = dbi.getTableInfo(tableName);
- cleanDirtyRow(ti, tableName, new JSONObject(vallist));
-
-// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
-// java.sql.ResultSet rs = executeSQLRead(selectQuery);
-// String dbWriteQuery=null;
-// try {
-// if(rs.next()){//this entry is there, do an update
-// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";";
-// }else
-// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";";
-// executeSQLWrite(dbWriteQuery);
-// } catch (SQLException e) {
-// // ZZTODO Auto-generated catch block
-// e.printStackTrace();
-// }
-
- //clean the music dirty bits table
-// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
-// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
-// executeMusicWriteQuery(deleteQuery);
- }
- private Object getValue(Row musicRow, String colname) {
- ColumnDefinitions cdef = musicRow.getColumnDefinitions();
- DataType colType;
- try {
- colType= cdef.getType(colname);
- }
- catch(IllegalArgumentException e) {
- logger.warn("Colname is not part of table metadata: "+e);
- throw e;
- }
- String typeStr = colType.getName().toString().toUpperCase();
- switch (typeStr) {
- case "BIGINT":
- return musicRow.getLong(colname);
- case "BOOLEAN":
- return musicRow.getBool(colname);
- case "BLOB":
- return musicRow.getString(colname);
- case "DATE":
- return musicRow.getString(colname);
- case "DECIMAL":
- return musicRow.getDecimal(colname);
- case "DOUBLE":
- return musicRow.getDouble(colname);
- case "SMALLINT":
- case "INT":
- return musicRow.getInt(colname);
- case "TIMESTAMP":
- return musicRow.getTimestamp(colname);
- case "UUID":
- return musicRow.getUUID(colname);
- default:
- logger.error(EELFLoggerDelegate.errorLogger, "UNEXPECTED COLUMN TYPE: columname="+colname+", columntype="+typeStr);
- // fall thru
- case "VARCHAR":
- return musicRow.getString(colname);
- }
- }
-
- /**
- * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the
- * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates
- * it to the other replicas.
- *
- * @param tableName This is the table that has changed.
- * @param changedRow This is information about the row that has changed
- */
- @Override
- public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) {
- // Build the CQL command
- Object[] objects = getObjects(ti,tableName,changedRow);
- StringBuilder fields = new StringBuilder();
- StringBuilder values = new StringBuilder();
- String rowid = tableName;
- Object[] newrow = new Object[objects.length];
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- String pfx = "";
- int keyoffset=0;
- for (int i = 0; i < objects.length; i++) {
- if (!ti.hasKey() && i==0) {
- //We need to tack on cassandra's uid in place of a primary key
- fields.append(MDBC_PRIMARYKEY_NAME);
- values.append("?");
- newrow[i] = UUID.fromString((String) objects[i]);
- pQueryObject.addValue(newrow[i]);
- keyoffset=-1;
- pfx = ", ";
- continue;
- }
- fields.append(pfx).append(ti.columns.get(i+keyoffset));
- values.append(pfx).append("?");
- pfx = ", ";
- if (objects[i] instanceof byte[]) {
- // Cassandra doesn't seem to have a Codec to translate a byte[] to a ByteBuffer
- newrow[i] = ByteBuffer.wrap((byte[]) objects[i]);
- pQueryObject.addValue(newrow[i]);
- } else if (objects[i] instanceof Reader) {
- // Cassandra doesn't seem to have a Codec to translate a Reader to a ByteBuffer either...
- newrow[i] = ByteBuffer.wrap(readBytesFromReader((Reader) objects[i]));
- pQueryObject.addValue(newrow[i]);
- } else {
- newrow[i] = objects[i];
- pQueryObject.addValue(newrow[i]);
- }
- if (i+keyoffset>=0 && ti.iskey.get(i+keyoffset)) {
- rowid += "_" + newrow[i].toString();
- }
- }
-
- if (in_progress.contains(rowid)) {
- // This call to updateDirtyRowAndEntityTableInMusic() was called as a result of a Cassandra -> H2 update; ignore
- logger.debug(EELFLoggerDelegate.applicationLogger, "updateDirtyRowAndEntityTableInMusic: bypassing MUSIC update on "+rowid);
-
- } else {
- // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
- String cql = String.format("INSERT INTO %s.%s (%s) VALUES (%s);", music_ns, tableName, fields.toString(), values.toString());
-
- pQueryObject.appendQueryString(cql);
- String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow);
- updateMusicDB(tableName, primaryKey, pQueryObject);
-
- /*PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(newrow);
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- synchronized (sess) {
- sess.execute(bound);
- }*/
- // Mark the dirty rows in music for all the replicas but us
- markDirtyRow(ti,tableName, changedRow);
- }
- }
-
-
-
- private byte[] readBytesFromReader(Reader rdr) {
- StringBuilder sb = new StringBuilder();
- try {
- int ch;
- while ((ch = rdr.read()) >= 0) {
- sb.append((char)ch);
- }
- } catch (IOException e) {
- logger.warn("readBytesFromReader: "+e);
- }
- return sb.toString().getBytes();
- }
-
- protected PreparedStatement getPreparedStatementFromCache(String cql) {
- // Note: have to hope that the Session never changes!
- if (!ps_cache.containsKey(cql)) {
- Session sess = getMusicSession();
- PreparedStatement ps = sess.prepare(cql);
- ps_cache.put(cql, ps);
- }
- return ps_cache.get(cql);
- }
-
- /**
- * This method gets a connection to Music
- * @return the Cassandra Session to use
- */
- protected Session getMusicSession() {
- // create cassandra session
- if (musicSession == null) {
- logger.info(EELFLoggerDelegate.applicationLogger, "Creating New Music Session");
- mCon = new MusicConnector(musicAddress);
- musicSession = mCon.getSession();
- }
- return musicSession;
- }
-
- /**
- * This method executes a write query in Music
- * @param cql the CQL to be sent to Cassandra
- */
- protected void executeMusicWriteQuery(String cql) {
- logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- ReturnType rt = MusicCore.eventualPut(pQueryObject);
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
- }
- /*Session sess = getMusicSession();
- SimpleStatement s = new SimpleStatement(cql);
- s.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(s);
- }*/
- }
-
- /**
- * This method executes a read query in Music
- * @param cql the CQL to be sent to Cassandra
- * @return a ResultSet containing the rows returned from the query
- */
- protected ResultSet executeMusicRead(String cql) {
- logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- ResultSet results = null;
- try {
- results = MusicCore.get(pQueryObject);
- } catch (MusicServiceException e) {
-
- e.printStackTrace();
- }
- return results;
- /*Session sess = getMusicSession();
- synchronized (sess) {
- return sess.execute(cql);
- }*/
- }
-
- /**
- * Returns the default primary key name that this mixin uses
- */
- public String getMusicDefaultPrimaryKeyName() {
- return MDBC_PRIMARYKEY_NAME;
- }
-
- /**
- * Return the function for cassandra's primary key generation
- */
- public String generateUniqueKey() {
- return MDBCUtils.generateUniqueKey().toString();
- }
-
- @Override
- public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) {
- //\TODO this operation is super expensive to perform, both latency and BW
- // it is better to add additional where clauses, and have the primary key
- // to be composed of known columns of the table
- // Adding this primary indexes would be an additional burden to the developers, which spanner
- // also does, but otherwise performance is really bad
- // At least it should have a set of columns that are guaranteed to be unique
- StringBuilder cqlOperation = new StringBuilder();
- cqlOperation.append("SELECT * FROM ")
- .append(music_ns)
- .append(".")
- .append(table);
- ResultSet musicResults = executeMusicRead(cqlOperation.toString());
- Object[] dbRowObjects = getObjects(ti,table,dbRow);
- while (!musicResults.isExhausted()) {
- Row musicRow = musicResults.one();
- if (rowIs(ti, musicRow, dbRowObjects)) {
- return ((UUID)getValue(musicRow, MDBC_PRIMARYKEY_NAME)).toString();
- }
- }
- //should never reach here
- return null;
- }
-
- /**
- * Checks to see if this row is in list of database entries
- * @param ti
- * @param musicRow
- * @param dbRow
- * @return
- */
- private boolean rowIs(TableInfo ti, Row musicRow, Object[] dbRow) {
- //System.out.println("Comparing " + musicRow.toString());
- boolean sameRow=true;
- for (int i=0; i<ti.columns.size(); i++) {
- Object val = getValue(musicRow, ti.columns.get(i));
- if (!dbRow[i].equals(val)) {
- sameRow=false;
- break;
- }
- }
- return sameRow;
- }
-
- @Override
- public String getMusicKeyFromRow(TableInfo ti, String tableName, JSONObject row) {
- List<String> keyCols = ti.getKeyColumns();
- if(keyCols.isEmpty()){
- throw new IllegalArgumentException("Table doesn't have defined primary indexes ");
- }
- StringBuilder key = new StringBuilder();
- String pfx = "";
- for(String keyCol: keyCols) {
- key.append(pfx);
- key.append(row.get(keyCol));
- pfx = ",";
- }
- String keyStr = key.toString();
- return keyStr;
- }
-
- public void updateMusicDB(String tableName, String primaryKey, PreparedQueryObject pQObject) {
- if(MusicMixin.criticalTables.contains(tableName)) {
- ReturnType rt = null;
- try {
- rt = MusicCore.atomicPut(music_ns, tableName, primaryKey, pQObject, null);
- } catch (MusicLockingException e) {
- e.printStackTrace();
- } catch (MusicServiceException e) {
- e.printStackTrace();
- } catch (MusicQueryException e) {
- e.printStackTrace();
- }
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- System.out.println("Failure while critical put..."+rt.getMessage());
- }
- } else {
- ReturnType rt = MusicCore.eventualPut(pQObject);
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- System.out.println("Failure while critical put..."+rt.getMessage());
- }
- }
- }
-
-
- private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){
- PreparedQueryObject query = new PreparedQueryObject();
- StringBuilder appendBuilder = new StringBuilder();
- appendBuilder.append("UPDATE ")
- .append(music_ns)
- .append(".")
- .append(mriTable)
- .append(" SET txredolog = txredolog +[('")
- .append(table)
- .append("',")
- .append(redoUuid)
- .append(")] WHERE rangeid = ")
- .append(uuid)
- .append(";");
- query.appendQueryString(appendBuilder.toString());
- return query;
- }
-
- protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
- UUID mriIndex = partition.getMusicRangeInformationIndex();
- String lockId;
- lockId = MusicCore.createLockReference(fullyQualifiedKey);
- //\TODO Handle better failures to acquire locks
- ReturnType lockReturn;
- try {
- lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
- } catch (MusicLockingException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Lock was not acquire correctly for key "+fullyQualifiedKey);
- throw new MDBCServiceException("Lock was not acquire correctly for key "+fullyQualifiedKey);
- } catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey);
- throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey);
- } catch (MusicQueryException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey);
- throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey);
- }
- //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock
- if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
- try {
- MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId);
- CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle();
- CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
- this.musicRangeInformationTableName, mriIndex.toString());
- while(lockOwner.lockRef != lockId) {
- MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef);
- try {
- lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
- this.musicRangeInformationTableName, mriIndex.toString());
- } catch(NullPointerException e){
- //Ignore null pointer exception
- lockId = MusicCore.createLockReference(fullyQualifiedKey);
- break;
- }
- }
- lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
-
- } catch (MusicLockingException e) {
- throw new MDBCServiceException("Could not lock the corresponding lock");
- } catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey);
- throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey);
- } catch (MusicQueryException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey);
- throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey);
- }
- }
- if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
- throw new MDBCServiceException("Could not lock the corresponding lock");
- }
- //TODO: Java newbie here, verify that this lockId is actually assigned to the global DatabasePartition in the StateManager instance
- partition.setLockId(lockId);
- return lockId;
- }
-
-
-
- protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId);
- ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, MriIndex.toString(), appendQuery, lockId, null);
- if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
- logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
- throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
- }
- }
-
- @Override
- public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
- UUID mriIndex = partition.getMusicRangeInformationIndex();
- if(mriIndex==null) {
- //\TODO Fetch MriIndex from the Range Information Table
- throw new MDBCServiceException("TIT Index retrieval not yet implemented");
- }
- String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex.toString();
- //0. See if reference to lock was already created
- String lockId = partition.getLockId();
- if(lockId == null || lockId.isEmpty()) {
- lockId = createAndAssignLock(fullyQualifiedMriKey,partition);
- }
-
- UUID commitId;
- //Generate a local commit id
- if(progressKeeper.containsTx(txId)) {
- commitId = progressKeeper.getCommitId(txId);
- }
- else{
- logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress ");
- throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress ");
- }
- //Add creation type of transaction digest
-
- //1. Push new row to RRT and obtain its index
- String serializedTransactionDigest;
- try {
- serializedTransactionDigest = MDBCUtils.toString(transactionDigest);
- } catch (IOException e) {
- throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString());
- }
- MusicTxDigestId digestId = new MusicTxDigestId(commitId);
- addTxDigest(digestId, serializedTransactionDigest);
- //2. Save RRT index to RQ
- if(progressKeeper!= null) {
- progressKeeper.setRecordId(txId,digestId);
- }
- //3. Append RRT index into the corresponding TIT row array
- appendToRedoLog(mriIndex,partition,digestId);
- }
-
- /**
- * @param tableName
- * @param string
- * @param rowValues
- * @return
- */
- @SuppressWarnings("unused")
- private String getUid(String tableName, String string, Object[] rowValues) {
- //
- // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
- String cql = String.format("SELECT * FROM %s.%s;", music_ns, tableName);
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind();
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- ResultSet rs;
- synchronized (sess) {
- rs = sess.execute(bound);
- }
-
- //should never reach here
- logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key");
- return null;
- }
-
- @Override
- public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) {
- // \FIXME: we may need to add the primary key of the row if it was autogenerated by MUSIC
- List<String> cols = ti.columns;
- int size = cols.size();
- boolean hasDefault = false;
- if(row.has(getMusicDefaultPrimaryKeyName())) {
- size++;
- hasDefault = true;
- }
-
- Object[] objects = new Object[size];
- int idx = 0;
- if(hasDefault) {
- objects[idx++] = row.getString(getMusicDefaultPrimaryKeyName());
- }
- for(String col : ti.columns) {
- objects[idx]=row.get(col);
- }
- return objects;
- }
-
- @Override
- public List<UUID> getPartitionIndexes() {
- ArrayList<UUID> partitions = new ArrayList<UUID>();
- String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName);
- ResultSet rs = executeMusicRead(cql);
- for (Row r: rs) {
- partitions.add(r.getUUID("rangeid"));
- }
- return partitions;
- }
-
- @Override
- public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException {
- //TODO: verify that lock id is valid before calling the database operations function
- //UUID id = partition.getMusicRangeInformationIndex();
-
- String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(partitionIndex);
- Row newRow;
- try {
- newRow = executeMusicUnlockedQuorumGet(pQueryObject);
- } catch (MDBCServiceException e) {
- logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
-
- List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
- List<MusicTxDigestId> digestIds = new ArrayList<>();
- for(TupleValue t: log){
- //final String tableName = t.getString(0);
- final UUID index = t.getUUID(1);
- digestIds.add(new MusicTxDigestId(index));
- }
- List<Range> partitions = new ArrayList<>();
- Set<String> tables = newRow.getSet("keys",String.class);
- for (String table:tables){
- partitions.add(new Range(table));
- }
- return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""),
- digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"));
- }
-
-
- /**
- * This function creates the TransactionInformation table. It contain information related
- * to the transactions happening in a given partition.
- * * The schema of the table is
- * * Id, uiid.
- * * Partition, uuid id of the partition
- * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
- * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
- * * Redo: list of uiids associated to the Redo Records Table
- *
- */
- private void createMusicRangeInformationTable() throws MDBCServiceException {
- String tableName = this.musicRangeInformationTableName;
- String priKey = "rangeid";
- StringBuilder fields = new StringBuilder();
- fields.append("rangeid uuid, ");
- fields.append("keys set<text>, ");
- fields.append("ownerid text, ");
- fields.append("metricprocessid text, ");
- //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
- fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
- this.music_ns, tableName, fields, priKey);
- try {
- executeMusicWriteQuery(this.music_ns,tableName,cql);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to create transaction information table");
- throw(e);
- }
- }
-
-
- @Override
- public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException {
- DatabasePartition newPartition = info.getDBPartition();
- String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMusicRangeInformationIndex().toString();
- String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition);
- createEmptyMriRow(info.getMetricProcessId(),lockId,new ArrayList<Range>());
- throw new UnsupportedOperationException();
- }
-
- /**
- * Creates a new empty MRI row
- * @param processId id of the process that is going to own initially this.
- * @return uuid associated to the new row
- */
- private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges)
- throws MDBCServiceException {
- UUID id = MDBCUtils.generateUniqueKey();
- return createEmptyMriRow(id,processId,lockId,ranges);
- }
-
- /**
- * Creates a new empty MRI row
- * @param processId id of the process that is going to own initially this.
- * @return uuid associated to the new row
- */
- private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges)
- throws MDBCServiceException{
- StringBuilder insert = new StringBuilder("INSERT INTO ")
- .append(this.music_ns)
- .append('.')
- .append(this.musicRangeInformationTableName)
- .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
- .append("(")
- .append(id)
- .append(",{");
- boolean first=true;
- for(Range r: ranges){
- if(first){ first=false; }
- else {
- insert.append(',');
- }
- insert.append("'").append(r.toString()).append("'");
- }
- insert.append("},'")
- .append((lockId==null)?"":lockId)
- .append("','")
- .append(processId)
- .append("',[]);");
- PreparedQueryObject query = new PreparedQueryObject();
- query.appendQueryString(insert.toString());
- try {
- executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to add new row to transaction information");
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
- return id;
- }
-
- @Override
- public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException {
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId);
- ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null);
- if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
- logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
- throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
- }
- }
-
- public void createMusicTxDigest() throws MDBCServiceException {
- createMusicTxDigest(-1);
- }
-
-
- /**
- * This function creates the MusicTxDigest table. It contain information related to each transaction committed
- * * LeaseId: id associated with the lease, text
- * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
- * * TransactionDigest: text that contains all the changes in the transaction
- */
- private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
- String tableName = this.musicTxDigestTableName;
- if(musicTxDigestTableNumber >= 0) {
- tableName = tableName +
- "-" +
- Integer.toString(musicTxDigestTableNumber);
- }
- String priKey = "txid";
- StringBuilder fields = new StringBuilder();
- fields.append("txid uuid, ");
- fields.append("transactiondigest text ");//notice lack of ','
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
- try {
- executeMusicWriteQuery(this.music_ns,tableName,cql);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to create redo records table");
- throw(e);
- }
- }
-
-
- @Override
- public void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException {
- //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
- PreparedQueryObject query = new PreparedQueryObject();
- String cqlQuery = "INSERT INTO " +
- this.music_ns +
- '.' +
- this.musicTxDigestTableName +
- " (txid,transactiondigest) " +
- "VALUES (" +
- newId.txId + ",'" +
- transactionDigest +
- "');";
- query.appendQueryString(cqlQuery);
- //\TODO check if I am not shooting on my own foot
- try {
- MusicCore.nonKeyRelatedPut(query,"critical");
- } catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
- throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString());
- }
- }
-
-
-
- @Override
- public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
- String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(id.txId);
- Row newRow;
- try {
- newRow = executeMusicUnlockedQuorumGet(pQueryObject);
- } catch (MDBCServiceException e) {
- logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
- String digest = newRow.getString("transactiondigest");
- HashMap<Range,StagingTable> changes;
- try {
- changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
- } catch (IOException e) {
- logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId);
- throw new MDBCServiceException("Deserializng digest failed with ioexception");
- } catch (ClassNotFoundException e) {
- logger.error("Deserializng digest failed with an invalid class for id:"+id.txId);
- throw new MDBCServiceException("Deserializng digest failed with an invalid class");
- }
- return changes;
- }
-
- @Override
- public void own(List<Range> ranges){
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void appendRange(String rangeId, List<Range> ranges){
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void relinquish(String ownerId, String rangeId){
- throw new UnsupportedOperationException();
- }
-
- /**
- * This method executes a write query in Music
- * @param cql the CQL to be sent to Cassandra
- */
- private static void executeMusicWriteQuery(String keyspace, String table, String cql)
- throws MDBCServiceException {
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- ResultType rt = null;
- try {
- rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical");
- } catch (MusicServiceException e) {
- //\TODO: handle better, at least transform into an MDBCServiceException
- e.printStackTrace();
- }
- String result = rt.getResult();
- if (result==null || result.toLowerCase().equals("failure")) {
- throw new MDBCServiceException("Music eventual put failed");
- }
- }
-
- private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey,
- String lock)
- throws MDBCServiceException{
- ResultSet result;
- try {
- result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock);
- } catch(MusicServiceException e){
- //\TODO: handle better, at least transform into an MDBCServiceException
- e.printStackTrace();
- throw new MDBCServiceException("Error executing critical get");
- }
- if(result.isExhausted()){
- throw new MDBCServiceException("There is not a row that matches the id "+primaryKey);
- }
- return result.one();
- }
-
- private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
- throws MDBCServiceException{
- ResultSet result = MusicCore.quorumGet(cqlObject);
- //\TODO: handle better, at least transform into an MDBCServiceException
- if(result.isExhausted()){
- throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
- }
- return result.one();
- }
-
- private void executeMusicLockedPut(String namespace, String tableName,
- String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId,
- MusicCore.Condition conditionInfo) throws MDBCServiceException {
- ReturnType rt ;
- if(lockId==null) {
- try {
- rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo);
- } catch (MusicLockingException e) {
- logger.error("Music locked put failed");
- throw new MDBCServiceException("Music locked put failed");
- } catch (MusicServiceException e) {
- logger.error("Music service fail: Music locked put failed");
- throw new MDBCServiceException("Music service fail: Music locked put failed");
- } catch (MusicQueryException e) {
- logger.error("Music query fail: locked put failed");
- throw new MDBCServiceException("Music query fail: Music locked put failed");
- }
- }
- else {
- rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo);
- }
- if (rt.getResult().getResult().toLowerCase().equals("failure")) {
- throw new MDBCServiceException("Music locked put failed");
- }
- }
-}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
index 3d85a27..0f05734 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
@@ -25,8 +25,6 @@ import java.sql.Connection;
import java.util.Properties;
import org.onap.music.logging.EELFLoggerDelegate;
-import org.onap.music.mdbc.DatabasePartition;
-import org.onap.music.mdbc.MusicSqlManager;
/**
* This class is used to construct instances of Mixins that implement either the {@link org.onap.music.mdbc.mixins.DBInterface}
@@ -50,7 +48,7 @@ public class MixinFactory {
* @param info the Properties to use as an argument to the constructor
* @return the newly constructed DBInterface, or null if one cannot be found.
*/
- public static DBInterface createDBInterface(String name, MusicSqlManager msm, String url, Connection conn, Properties info) {
+ public static DBInterface createDBInterface(String name, MusicInterface mi, String url, Connection conn, Properties info) {
for (Class<?> cl : Utils.getClassesImplementing(DBInterface.class)) {
try {
Constructor<?> con = cl.getConstructor();
@@ -59,10 +57,10 @@ public class MixinFactory {
String miname = dbi.getMixinName();
logger.info(EELFLoggerDelegate.applicationLogger,"Checking "+miname);
if (miname.equalsIgnoreCase(name)) {
- con = cl.getConstructor(MusicSqlManager.class, String.class, Connection.class, Properties.class);
+ con = cl.getConstructor(MusicInterface.class, String.class, Connection.class, Properties.class);
if (con != null) {
logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname);
- return (DBInterface) con.newInstance(msm, url, conn, info);
+ return (DBInterface) con.newInstance(mi, url, conn, info);
}
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java
index 0732dc8..8181159 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java
@@ -44,21 +44,21 @@ import com.datastax.driver.core.Row;
/**
* This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence
* to calls to the user's DB. It stores dirty row references in one table (called DIRTY____) rather than one dirty
- * table per real table (as {@link org.onap.music.mdbc.mixins.CassandraMixin} does).
+ * table per real table (as {@link org.onap.music.mdbc.mixins.MusicMixin} does).
*
* @author Robert P. Eby
*/
-public class Cassandra2Mixin extends CassandraMixin {
+public class Music2Mixin extends MusicMixin {
private static final String DIRTY_TABLE = "DIRTY____"; // it seems Cassandra won't allow __DIRTY__
private boolean dirty_table_created = false;
- private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Cassandra2Mixin.class);
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Music2Mixin.class);
- public Cassandra2Mixin() {
+ public Music2Mixin() {
super();
}
- public Cassandra2Mixin(String url, Properties info) throws MDBCServiceException {
+ public Music2Mixin(String url, Properties info) throws MDBCServiceException {
super(url, info);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 52b3036..5b10a73 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -162,14 +162,13 @@ public interface MusicInterface {
/**
* Commits the corresponding REDO-log into MUSIC
*
- * @param dbi, the database interface use in the local SQL cache, where the music interface is being used
* @param partition
* @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to be a HashMap, because it is required to be serializable
* @param txId id associated with the log being send
* @param progressKeeper data structure that is used to handle to detect failures, and know what to do
* @throws MDBCServiceException
*/
- void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
+ void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index a7ea680..258ea4f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -20,211 +20,1488 @@
package org.onap.music.mdbc.mixins;
import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
-import org.onap.music.mdbc.LockId;
-import org.json.JSONObject;
-import org.onap.music.exceptions.MusicLockingException;
-
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.mdbc.DatabasePartition;
-import org.onap.music.mdbc.Range;
-import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.*;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.TxCommitProgress;
+import org.json.JSONObject;
+import org.onap.music.datastore.CassaLockStore;
+import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.exceptions.MusicLockingException;
+import org.onap.music.exceptions.MusicQueryException;
+import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.main.MusicCore;
+import org.onap.music.main.MusicCore.Condition;
+import org.onap.music.main.ResultType;
+import org.onap.music.main.ReturnType;
-/**
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TupleValue;
+/**
+ * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence
+ * to calls to the user's DB. It does not do any table or row locking.
+ *
+ * <p>This code only supports the following limited list of H2 and Cassandra data types:</p>
+ * <table summary="">
+ * <tr><th>H2 Data Type</th><th>Mapped to Cassandra Data Type</th></tr>
+ * <tr><td>BIGINT</td><td>BIGINT</td></tr>
+ * <tr><td>BOOLEAN</td><td>BOOLEAN</td></tr>
+ * <tr><td>CLOB</td><td>BLOB</td></tr>
+ * <tr><td>DOUBLE</td><td>DOUBLE</td></tr>
+ * <tr><td>INTEGER</td><td>INT</td></tr>
+ * <tr><td>TIMESTAMP</td><td>TIMESTAMP</td></tr>
+ * <tr><td>VARBINARY</td><td>BLOB</td></tr>
+ * <tr><td>VARCHAR</td><td>VARCHAR</td></tr>
+ * </table>
*
+ * @author Robert P. Eby
*/
public class MusicMixin implements MusicInterface {
+ /** The property name to use to identify this replica to MusicSqlManager */
+ public static final String KEY_MY_ID = "myid";
+ /** The property name to use for the comma-separated list of replica IDs. */
+ public static final String KEY_REPLICAS = "replica_ids";
+ /** The property name to use to identify the IP address for Cassandra. */
+ public static final String KEY_MUSIC_ADDRESS = "cassandra.host";
+ /** The property name to use to provide the replication factor for Cassandra. */
+ public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
+ /** The property name to use to provide the replication factor for Cassandra. */
+ public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
+ /** Namespace for the tables in MUSIC (Cassandra) */
+ public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
+ /** The default property value to use for the Cassandra IP address. */
+ public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
+ /** The default property value to use for the Cassandra replication factor. */
+ public static final int DEFAULT_MUSIC_RFACTOR = 1;
+ /** The default primary string column, if none is provided. */
+ public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
+ /** Type of the primary key, if none is defined by the user */
+ public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
+
+
+ //\TODO Add logic to change the names when required and create the tables when necessary
+ private String musicTxDigestTableName = "musictxdigest";
+ private String musicRangeInformationTableName = "musicrangeinformation";
- public static Map<Integer, Set<String>> currentLockMap = new HashMap<>();
- public static List<String> criticalTables = new ArrayList<>();
-
- @Override
- public String getMixinName() {
- //
- return null;
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class);
+
+ private static final Map<Integer, String> typemap = new HashMap<>();
+ static {
+ // We only support the following type mappings currently (from DB -> Cassandra).
+ // Anything else will likely cause a NullPointerException
+ typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY
+ typemap.put(Types.BLOB, "VARCHAR");
+ typemap.put(Types.BOOLEAN, "BOOLEAN");
+ typemap.put(Types.CLOB, "BLOB");
+ typemap.put(Types.DATE, "VARCHAR");
+ typemap.put(Types.DOUBLE, "DOUBLE");
+ typemap.put(Types.DECIMAL, "DECIMAL");
+ typemap.put(Types.INTEGER, "INT");
+ //typemap.put(Types.TIMESTAMP, "TIMESTAMP");
+ typemap.put(Types.SMALLINT, "SMALLINT");
+ typemap.put(Types.TIMESTAMP, "VARCHAR");
+ typemap.put(Types.VARBINARY, "BLOB");
+ typemap.put(Types.VARCHAR, "VARCHAR");
+ typemap.put(Types.CHAR, "VARCHAR");
+ //The "Hacks", these don't have a direct mapping
+ //typemap.put(Types.DATE, "VARCHAR");
+ //typemap.put(Types.DATE, "TIMESTAMP");
}
- @Override
- public String getMusicDefaultPrimaryKeyName() {
- //
- return null;
- }
+ protected final String music_ns;
+ protected final String myId;
+ protected final String[] allReplicaIds;
+ private final String musicAddress;
+ private final int music_rfactor;
+ private MusicConnector mCon = null;
+ private Session musicSession = null;
+ private boolean keyspace_created = false;
+ private Map<String, PreparedStatement> ps_cache = new HashMap<>();
+ private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
+ public MusicMixin() {
+ //this.logger = null;
+ this.musicAddress = null;
+ this.music_ns = null;
+ this.music_rfactor = 0;
+ this.myId = null;
+ this.allReplicaIds = null;
+ }
+
+ public MusicMixin(String url, Properties info) throws MDBCServiceException {
+ // Default values -- should be overridden in the Properties
+ // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
+ this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress);
+
+ String s = info.getProperty(KEY_MUSIC_RFACTOR);
+ this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s);
+
+ this.myId = info.getProperty(KEY_MY_ID, getMyHostId());
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId);
+
+
+ this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(",");
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId));
+
+ this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
+ createKeyspace();
+ }
+
+ /**
+ * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
+ * The keyspace name comes from the initialization properties passed to the JDBC driver.
+ */
@Override
- public String generateUniqueKey() {
- //
- return null;
+ public void createKeyspace() throws MDBCServiceException {
+
+ Map<String,Object> replicationInfo = new HashMap<>();
+ replicationInfo.put("'class'", "'SimpleStrategy'");
+ replicationInfo.put("'replication_factor'", music_rfactor);
+
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(
+ "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns +
+ " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
+
+ try {
+ MusicCore.nonKeyRelatedPut(queryObject, "eventual");
+ } catch (MusicServiceException e) {
+ if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) {
+ throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage());
+ }
+ }
+ }
+
+ private String getMyHostId() {
+ ResultSet rs = executeMusicRead("SELECT HOST_ID FROM SYSTEM.LOCAL");
+ Row row = rs.one();
+ return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
+ }
+ private String getAllHostIds() {
+ ResultSet results = executeMusicRead("SELECT HOST_ID FROM SYSTEM.PEERS");
+ StringBuilder sb = new StringBuilder(myId);
+ for (Row row : results) {
+ sb.append(",");
+ sb.append(row.getUUID("HOST_ID").toString());
+ }
+ return sb.toString();
}
+ /**
+ * Get the name of this MusicInterface mixin object.
+ * @return the name
+ */
@Override
- public String getMusicKeyFromRow(TableInfo ti, String table, JSONObject dbRow) {
- //
- return null;
+ public String getMixinName() {
+ return "cassandra";
}
-
+ /**
+ * Do what is needed to close down the MUSIC connection.
+ */
@Override
public void close() {
- //
-
+ if (musicSession != null) {
+ musicSession.close();
+ musicSession = null;
+ }
}
-
@Override
- public void createKeyspace() {
- //
-
+ public void initializeMetricDataStructures() throws MDBCServiceException {
+ try {
+ createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
+ createMusicRangeInformationTable();
+ }
+ catch(MDBCServiceException e){
+ logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC");
+ }
}
+ /**
+ * This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>.
+ * @param tableName the table to initialize MUSIC for
+ */
@Override
public void initializeMusicForTable(TableInfo ti, String tableName) {
- //
-
+ /**
+ * This code creates two tables for every table in SQL:
+ * (i) a table with the exact same name as the SQL table storing the SQL data.
+ * (ii) a "dirty bits" table that stores the keys in the Cassandra table that are yet to be
+ * updated in the SQL table (they were written by some other node).
+ */
+ StringBuilder fields = new StringBuilder();
+ StringBuilder prikey = new StringBuilder();
+ String pfx = "", pfx2 = "";
+ for (int i = 0; i < ti.columns.size(); i++) {
+ fields.append(pfx)
+ .append(ti.columns.get(i))
+ .append(" ")
+ .append(typemap.get(ti.coltype.get(i)));
+ if (ti.iskey.get(i)) {
+ // Primary key column
+ prikey.append(pfx2).append(ti.columns.get(i));
+ pfx2 = ", ";
+ }
+ pfx = ", ";
+ }
+ if (prikey.length()==0) {
+ fields.append(pfx).append(MDBC_PRIMARYKEY_NAME)
+ .append(" ")
+ .append(MDBC_PRIMARYKEY_TYPE);
+ prikey.append(MDBC_PRIMARYKEY_NAME);
+ }
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString());
+ executeMusicWriteQuery(cql);
}
+ // **************************************************
+ // Dirty Tables (in MUSIC) methods
+ // **************************************************
+
+ /**
+ * Create a <i>dirty row</i> table for the real table <i>tableName</i>. The primary keys columns from the real table are recreated in
+ * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC.
+ * @param tableName the table to create a "dirty" table for
+ */
@Override
public void createDirtyRowTable(TableInfo ti, String tableName) {
- //
-
+ // create dirtybitsTable at all replicas
+// for (String repl : allReplicaIds) {
+//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
+//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
+// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
+// executeMusicWriteQuery(cql);
+// }
+ StringBuilder ddl = new StringBuilder("REPLICA__ TEXT");
+ StringBuilder cols = new StringBuilder("REPLICA__");
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ // Only use the primary keys columns in the "Dirty" table
+ ddl.append(", ")
+ .append(ti.columns.get(i))
+ .append(" ")
+ .append(typemap.get(ti.coltype.get(i)));
+ cols.append(", ").append(ti.columns.get(i));
+ }
+ }
+ if(cols.length()==0) {
+ //fixme
+ System.err.println("Create dirty row table found no primary key");
+ }
+ ddl.append(", PRIMARY KEY(").append(cols).append(")");
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s (%s);", music_ns, tableName, ddl.toString());
+ executeMusicWriteQuery(cql);
}
-
+ /**
+ * Drop the dirty row table for <i>tableName</i> from MUSIC.
+ * @param tableName the table being dropped
+ */
@Override
public void dropDirtyRowTable(String tableName) {
- //
-
+ String cql = String.format("DROP TABLE %s.DIRTY_%s;", music_ns, tableName);
+ executeMusicWriteQuery(cql);
}
-
- @Override
- public void clearMusicForTable(String tableName) {
- //
-
- }
-
+ /**
+ * Mark rows as "dirty" in the dirty rows table for <i>tableName</i>. Rows are marked for all replicas but
+ * this one (this replica already has the up to date data).
+ * @param tableName the table we are marking dirty
+ * @param keys an ordered list of the values being put into the table. The values that correspond to the tables'
+ * primary key are copied into the dirty row table.
+ */
@Override
public void markDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
- //
-
+ Object[] keyObj = getObjects(ti,tableName, keys);
+ StringBuilder cols = new StringBuilder("REPLICA__");
+ PreparedQueryObject pQueryObject = null;
+ StringBuilder vals = new StringBuilder("?");
+ List<Object> vallist = new ArrayList<Object>();
+ vallist.add(""); // placeholder for replica
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ cols.append(", ").append(ti.columns.get(i));
+ vals.append(", ").append("?");
+ vallist.add(keyObj[i]);
+ }
+ }
+ if(cols.length()==0) {
+ //FIXME
+ System.err.println("markDIrtyRow need to fix primary key");
+ }
+ String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString());
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);*/
+ String primaryKey;
+ if(ti.hasKey()) {
+ primaryKey = getMusicKeyFromRow(ti,tableName, keys);
+ }
+ else {
+ primaryKey = getMusicKeyFromRowWithoutPrimaryIndexes(ti,tableName, keys);
+ }
+ System.out.println("markDirtyRow: PK value: "+primaryKey);
+
+ Object pkObj = null;
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ pkObj = keyObj[i];
+ }
+ }
+ for (String repl : allReplicaIds) {
+ pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(tableName);
+ pQueryObject.addValue(repl);
+ pQueryObject.addValue(pkObj);
+ updateMusicDB(tableName, primaryKey, pQueryObject);
+ //if (!repl.equals(myId)) {
+ /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+ vallist.set(0, repl);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
+ //}
+
+ }
}
-
+ /**
+ * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys
+ * @param tableName the table we are removing dirty entries from
+ * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row.
+ */
@Override
public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
- //
-
+ Object[] keysObjects = getObjects(ti,tableName,keys);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ StringBuilder cols = new StringBuilder("REPLICA__=?");
+ List<Object> vallist = new ArrayList<Object>();
+ vallist.add(myId);
+ int n = 0;
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ cols.append(" AND ").append(ti.columns.get(i)).append("=?");
+ vallist.add(keysObjects[n++]);
+ pQueryObject.addValue(keysObjects[n++]);
+ }
+ }
+ String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString());
+ logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+ pQueryObject.appendQueryString(cql);
+ ReturnType rt = MusicCore.eventualPut(pQueryObject);
+ if(rt.getResult().getResult().toLowerCase().equals("failure")) {
+ System.out.println("Failure while cleanDirtyRow..."+rt.getMessage());
+ }
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
}
-
+ /**
+ * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica,
+ * and consist of a Map of primary key column names and values.
+ * @param tableName the table we are querying for
+ * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row".
+ */
@Override
- public List<Map<String, Object>> getDirtyRows(TableInfo ti, String tableName) {
- //
- return null;
+ public List<Map<String,Object>> getDirtyRows(TableInfo ti, String tableName) {
+ String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName);
+ ResultSet results = null;
+ logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(new Object[] { myId });
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ results = sess.execute(bound);
+ }*/
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ try {
+ results = MusicCore.get(pQueryObject);
+ } catch (MusicServiceException e) {
+
+ e.printStackTrace();
+ }
+
+ ColumnDefinitions cdef = results.getColumnDefinitions();
+ List<Map<String,Object>> list = new ArrayList<Map<String,Object>>();
+ for (Row row : results) {
+ Map<String,Object> objs = new HashMap<String,Object>();
+ for (int i = 0; i < cdef.size(); i++) {
+ String colname = cdef.getName(i).toUpperCase();
+ String coltype = cdef.getType(i).getName().toString().toUpperCase();
+ if (!colname.equals("REPLICA__")) {
+ switch (coltype) {
+ case "BIGINT":
+ objs.put(colname, row.getLong(colname));
+ break;
+ case "BOOLEAN":
+ objs.put(colname, row.getBool(colname));
+ break;
+ case "BLOB":
+ objs.put(colname, row.getString(colname));
+ break;
+ case "DATE":
+ objs.put(colname, row.getString(colname));
+ break;
+ case "DOUBLE":
+ objs.put(colname, row.getDouble(colname));
+ break;
+ case "DECIMAL":
+ objs.put(colname, row.getDecimal(colname));
+ break;
+ case "INT":
+ objs.put(colname, row.getInt(colname));
+ break;
+ case "TIMESTAMP":
+ objs.put(colname, row.getTimestamp(colname));
+ break;
+ case "VARCHAR":
+ default:
+ objs.put(colname, row.getString(colname));
+ break;
+ }
+ }
+ }
+ list.add(objs);
+ }
+ return list;
}
+ /**
+ * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first.
+ * @param tableName This is the table that has been dropped
+ */
+ @Override
+ public void clearMusicForTable(String tableName) {
+ dropDirtyRowTable(tableName);
+ String cql = String.format("DROP TABLE %s.%s;", music_ns, tableName);
+ executeMusicWriteQuery(cql);
+ }
+ /**
+ * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the
+ * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates
+ * it to the other replicas.
+ *
+ * @param tableName This is the table that has changed.
+ * @param oldRow This is a copy of the old row being deleted
+ */
@Override
public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) {
- //
+ Object[] objects = getObjects(ti,tableName,oldRow);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ if (ti.hasKey()) {
+ assert(ti.columns.size() == objects.length);
+ } else {
+ assert(ti.columns.size()+1 == objects.length);
+ }
+
+ StringBuilder where = new StringBuilder();
+ List<Object> vallist = new ArrayList<Object>();
+ String pfx = "";
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ where.append(pfx)
+ .append(ti.columns.get(i))
+ .append("=?");
+ vallist.add(objects[i]);
+ pQueryObject.addValue(objects[i]);
+ pfx = " AND ";
+ }
+ }
+ if (!ti.hasKey()) {
+ where.append(MDBC_PRIMARYKEY_NAME + "=?");
+ //\FIXME this is wrong, old row is not going to contain the UUID, this needs to be fixed
+ vallist.add(UUID.fromString((String) objects[0]));
+ pQueryObject.addValue(UUID.fromString((String) objects[0]));
+ }
+
+ String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString());
+ logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql);
+ pQueryObject.appendQueryString(cql);
+
+ /*PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
+ String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow);
+ updateMusicDB(tableName, primaryKey, pQueryObject);
+
+ // Mark the dirty rows in music for all the replicas but us
+ markDirtyRow(ti,tableName, oldRow);
}
+ public Set<String> getMusicTableSet(String ns) {
+ Set<String> set = new TreeSet<String>();
+ String cql = String.format("SELECT TABLE_NAME FROM SYSTEM_SCHEMA.TABLES WHERE KEYSPACE_NAME = '%s'", ns);
+ ResultSet rs = executeMusicRead(cql);
+ for (Row row : rs) {
+ set.add(row.getString("TABLE_NAME").toUpperCase());
+ }
+ return set;
+ }
+ /**
+ * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local
+ * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL
+ * @param tableName This is the table on which the select is being performed
+ */
@Override
public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) {
- //
+ // Read dirty rows of this table from Music
+ TableInfo ti = dbi.getTableInfo(tableName);
+ List<Map<String,Object>> objlist = getDirtyRows(ti,tableName);
+ PreparedQueryObject pQueryObject = null;
+ String pre_cql = String.format("SELECT * FROM %s.%s WHERE ", music_ns, tableName);
+ List<Object> vallist = new ArrayList<Object>();
+ StringBuilder sb = new StringBuilder();
+ //\TODO Perform a batch operation instead of each row at a time
+ for (Map<String,Object> map : objlist) {
+ pQueryObject = new PreparedQueryObject();
+ sb.setLength(0);
+ vallist.clear();
+ String pfx = "";
+ for (String key : map.keySet()) {
+ sb.append(pfx).append(key).append("=?");
+ vallist.add(map.get(key));
+ pQueryObject.addValue(map.get(key));
+ pfx = " AND ";
+ }
+
+ String cql = pre_cql + sb.toString();
+ System.out.println("readDirtyRowsAndUpdateDb: cql: "+cql);
+ pQueryObject.appendQueryString(cql);
+ ResultSet dirtyRows = null;
+ try {
+ //\TODO Why is this an eventual put?, this should be an atomic
+ dirtyRows = MusicCore.get(pQueryObject);
+ } catch (MusicServiceException e) {
+
+ e.printStackTrace();
+ }
+ /*
+ Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ ResultSet dirtyRows = null;
+ synchronized (sess) {
+ dirtyRows = sess.execute(bound);
+ }*/
+ List<Row> rows = dirtyRows.all();
+ if (rows.isEmpty()) {
+ // No rows, the row must have been deleted
+ deleteRowFromSqlDb(dbi,tableName, map);
+ } else {
+ for (Row row : rows) {
+ writeMusicRowToSQLDb(dbi,tableName, row);
+ }
+ }
+ }
+ }
+
+ private void deleteRowFromSqlDb(DBInterface dbi, String tableName, Map<String, Object> map) {
+ dbi.deleteRowFromSqlDb(tableName, map);
+ TableInfo ti = dbi.getTableInfo(tableName);
+ List<Object> vallist = new ArrayList<Object>();
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ String col = ti.columns.get(i);
+ Object val = map.get(col);
+ vallist.add(val);
+ }
+ }
+ cleanDirtyRow(ti, tableName, new JSONObject(vallist));
+ }
+ /**
+ * This functions copies the contents of a row in Music into the corresponding row in the SQL table
+ * @param tableName This is the name of the table in both Music and swl
+ * @param musicRow This is the row in Music that is being copied into SQL
+ */
+ private void writeMusicRowToSQLDb(DBInterface dbi, String tableName, Row musicRow) {
+ // First construct the map of columns and their values
+ TableInfo ti = dbi.getTableInfo(tableName);
+ Map<String, Object> map = new HashMap<String, Object>();
+ List<Object> vallist = new ArrayList<Object>();
+ String rowid = tableName;
+ for (String col : ti.columns) {
+ Object val = getValue(musicRow, col);
+ map.put(col, val);
+ if (ti.iskey(col)) {
+ vallist.add(val);
+ rowid += "_" + val.toString();
+ }
+ }
+
+ logger.debug("Blocking rowid: "+rowid);
+ in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE
+
+ dbi.insertRowIntoSqlDb(tableName, map);
+
+ logger.debug("Unblocking rowid: "+rowid);
+ in_progress.remove(rowid); // Unblock propagation
+
+// try {
+// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
+// executeSQLWrite(sql);
+// } catch (SQLException e) {
+// logger.debug("Insert failed because row exists, do an update");
+// // TODO - rewrite this UPDATE command should not update key fields
+// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString());
+// try {
+// executeSQLWrite(sql);
+// } catch (SQLException e1) {
+// e1.printStackTrace();
+// }
+// }
+
+ ti = dbi.getTableInfo(tableName);
+ cleanDirtyRow(ti, tableName, new JSONObject(vallist));
+
+// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
+// java.sql.ResultSet rs = executeSQLRead(selectQuery);
+// String dbWriteQuery=null;
+// try {
+// if(rs.next()){//this entry is there, do an update
+// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";";
+// }else
+// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";";
+// executeSQLWrite(dbWriteQuery);
+// } catch (SQLException e) {
+// // ZZTODO Auto-generated catch block
+// e.printStackTrace();
+// }
+ //clean the music dirty bits table
+// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
+// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
+// executeMusicWriteQuery(deleteQuery);
+ }
+ private Object getValue(Row musicRow, String colname) {
+ ColumnDefinitions cdef = musicRow.getColumnDefinitions();
+ DataType colType;
+ try {
+ colType= cdef.getType(colname);
+ }
+ catch(IllegalArgumentException e) {
+ logger.warn("Colname is not part of table metadata: "+e);
+ throw e;
+ }
+ String typeStr = colType.getName().toString().toUpperCase();
+ switch (typeStr) {
+ case "BIGINT":
+ return musicRow.getLong(colname);
+ case "BOOLEAN":
+ return musicRow.getBool(colname);
+ case "BLOB":
+ return musicRow.getString(colname);
+ case "DATE":
+ return musicRow.getString(colname);
+ case "DECIMAL":
+ return musicRow.getDecimal(colname);
+ case "DOUBLE":
+ return musicRow.getDouble(colname);
+ case "SMALLINT":
+ case "INT":
+ return musicRow.getInt(colname);
+ case "TIMESTAMP":
+ return musicRow.getTimestamp(colname);
+ case "UUID":
+ return musicRow.getUUID(colname);
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "UNEXPECTED COLUMN TYPE: columname="+colname+", columntype="+typeStr);
+ // fall thru
+ case "VARCHAR":
+ return musicRow.getString(colname);
+ }
}
+ /**
+ * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the
+ * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates
+ * it to the other replicas.
+ *
+ * @param tableName This is the table that has changed.
+ * @param changedRow This is information about the row that has changed
+ */
@Override
public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) {
- updateDirtyRowAndEntityTableInMusic(tableName, changedRow, false);
+ // Build the CQL command
+ Object[] objects = getObjects(ti,tableName,changedRow);
+ StringBuilder fields = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+ String rowid = tableName;
+ Object[] newrow = new Object[objects.length];
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ String pfx = "";
+ int keyoffset=0;
+ for (int i = 0; i < objects.length; i++) {
+ if (!ti.hasKey() && i==0) {
+ //We need to tack on cassandra's uid in place of a primary key
+ fields.append(MDBC_PRIMARYKEY_NAME);
+ values.append("?");
+ newrow[i] = UUID.fromString((String) objects[i]);
+ pQueryObject.addValue(newrow[i]);
+ keyoffset=-1;
+ pfx = ", ";
+ continue;
+ }
+ fields.append(pfx).append(ti.columns.get(i+keyoffset));
+ values.append(pfx).append("?");
+ pfx = ", ";
+ if (objects[i] instanceof byte[]) {
+ // Cassandra doesn't seem to have a Codec to translate a byte[] to a ByteBuffer
+ newrow[i] = ByteBuffer.wrap((byte[]) objects[i]);
+ pQueryObject.addValue(newrow[i]);
+ } else if (objects[i] instanceof Reader) {
+ // Cassandra doesn't seem to have a Codec to translate a Reader to a ByteBuffer either...
+ newrow[i] = ByteBuffer.wrap(readBytesFromReader((Reader) objects[i]));
+ pQueryObject.addValue(newrow[i]);
+ } else {
+ newrow[i] = objects[i];
+ pQueryObject.addValue(newrow[i]);
+ }
+ if (i+keyoffset>=0 && ti.iskey.get(i+keyoffset)) {
+ rowid += "_" + newrow[i].toString();
+ }
+ }
+ if (in_progress.contains(rowid)) {
+ // This call to updateDirtyRowAndEntityTableInMusic() was called as a result of a Cassandra -> H2 update; ignore
+ logger.debug(EELFLoggerDelegate.applicationLogger, "updateDirtyRowAndEntityTableInMusic: bypassing MUSIC update on "+rowid);
+
+ } else {
+ // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
+ String cql = String.format("INSERT INTO %s.%s (%s) VALUES (%s);", music_ns, tableName, fields.toString(), values.toString());
+
+ pQueryObject.appendQueryString(cql);
+ String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow);
+ updateMusicDB(tableName, primaryKey, pQueryObject);
+
+ /*PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(newrow);
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
+ // Mark the dirty rows in music for all the replicas but us
+ markDirtyRow(ti,tableName, changedRow);
+ }
}
+
- public void updateDirtyRowAndEntityTableInMusic(String tableName, JSONObject changedRow, boolean isCritical) {
- }
- public static void releaseZKLocks(Set<LockId> lockIds) {
- for (LockId lockId : lockIds) {
- System.out.println("Releasing lock: " + lockId);
- try {
- MusicCore.voluntaryReleaseLock(lockId.getFullyQualifiedLockKey(), lockId.getLockReference());
- MusicCore.destroyLockRef(lockId.getFullyQualifiedLockKey(), lockId.getLockReference());
- } catch (MusicLockingException e) {
- e.printStackTrace();
+ private byte[] readBytesFromReader(Reader rdr) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ int ch;
+ while ((ch = rdr.read()) >= 0) {
+ sb.append((char)ch);
}
+ } catch (IOException e) {
+ logger.warn("readBytesFromReader: "+e);
}
+ return sb.toString().getBytes();
}
- @Override
- public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String tableName, JSONObject changedRow) {
- //
- return null;
+ protected PreparedStatement getPreparedStatementFromCache(String cql) {
+ // Note: have to hope that the Session never changes!
+ if (!ps_cache.containsKey(cql)) {
+ Session sess = getMusicSession();
+ PreparedStatement ps = sess.prepare(cql);
+ ps_cache.put(cql, ps);
+ }
+ return ps_cache.get(cql);
}
- @Override
- public void initializeMetricDataStructures() {
- //
+ /**
+ * This method gets a connection to Music
+ * @return the Cassandra Session to use
+ */
+ protected Session getMusicSession() {
+ // create cassandra session
+ if (musicSession == null) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "Creating New Music Session");
+ mCon = new MusicConnector(musicAddress);
+ musicSession = mCon.getSession();
+ }
+ return musicSession;
+ }
+
+ /**
+ * This method executes a write query in Music
+ * @param cql the CQL to be sent to Cassandra
+ */
+ protected void executeMusicWriteQuery(String cql) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ ReturnType rt = MusicCore.eventualPut(pQueryObject);
+ if(rt.getResult().getResult().toLowerCase().equals("failure")) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
+ }
+ /*Session sess = getMusicSession();
+ SimpleStatement s = new SimpleStatement(cql);
+ s.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ sess.execute(s);
+ }*/
+ }
+
+ /**
+ * This method executes a read query in Music
+ * @param cql the CQL to be sent to Cassandra
+ * @return a ResultSet containing the rows returned from the query
+ */
+ protected ResultSet executeMusicRead(String cql) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ ResultSet results = null;
+ try {
+ results = MusicCore.get(pQueryObject);
+ } catch (MusicServiceException e) {
+
+ e.printStackTrace();
+ }
+ return results;
+ /*Session sess = getMusicSession();
+ synchronized (sess) {
+ return sess.execute(cql);
+ }*/
+ }
+ /**
+ * Returns the default primary key name that this mixin uses
+ */
+ public String getMusicDefaultPrimaryKeyName() {
+ return MDBC_PRIMARYKEY_NAME;
}
+ /**
+ * Return the function for cassandra's primary key generation
+ */
+ public String generateUniqueKey() {
+ return MDBCUtils.generateUniqueKey().toString();
+ }
+
@Override
- public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) {
+ public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) {
+ //\TODO this operation is super expensive to perform, both latency and BW
+ // it is better to add additional where clauses, and have the primary key
+ // to be composed of known columns of the table
+ // Adding this primary indexes would be an additional burden to the developers, which spanner
+ // also does, but otherwise performance is really bad
+ // At least it should have a set of columns that are guaranteed to be unique
+ StringBuilder cqlOperation = new StringBuilder();
+ cqlOperation.append("SELECT * FROM ")
+ .append(music_ns)
+ .append(".")
+ .append(table);
+ ResultSet musicResults = executeMusicRead(cqlOperation.toString());
+ Object[] dbRowObjects = getObjects(ti,table,dbRow);
+ while (!musicResults.isExhausted()) {
+ Row musicRow = musicResults.one();
+ if (rowIs(ti, musicRow, dbRowObjects)) {
+ return ((UUID)getValue(musicRow, MDBC_PRIMARYKEY_NAME)).toString();
+ }
+ }
+ //should never reach here
return null;
}
+
+ /**
+ * Checks to see if this row is in list of database entries
+ * @param ti
+ * @param musicRow
+ * @param dbRow
+ * @return
+ */
+ private boolean rowIs(TableInfo ti, Row musicRow, Object[] dbRow) {
+ //System.out.println("Comparing " + musicRow.toString());
+ boolean sameRow=true;
+ for (int i=0; i<ti.columns.size(); i++) {
+ Object val = getValue(musicRow, ti.columns.get(i));
+ if (!dbRow[i].equals(val)) {
+ sameRow=false;
+ break;
+ }
+ }
+ return sameRow;
+ }
@Override
- public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range, StagingTable> transactionDigest, String txId, TxCommitProgress progressKeeper)
- throws MDBCServiceException {
- // TODO Auto-generated method stub
+ public String getMusicKeyFromRow(TableInfo ti, String tableName, JSONObject row) {
+ List<String> keyCols = ti.getKeyColumns();
+ if(keyCols.isEmpty()){
+ throw new IllegalArgumentException("Table doesn't have defined primary indexes ");
+ }
+ StringBuilder key = new StringBuilder();
+ String pfx = "";
+ for(String keyCol: keyCols) {
+ key.append(pfx);
+ key.append(row.get(keyCol));
+ pfx = ",";
+ }
+ String keyStr = key.toString();
+ return keyStr;
}
- @Override
- public HashMap<Range, StagingTable> getTxDigest(MusicTxDigestId id) {
- return null;
+ public void updateMusicDB(String tableName, String primaryKey, PreparedQueryObject pQObject) {
+ ReturnType rt = MusicCore.eventualPut(pQObject);
+ if(rt.getResult().getResult().toLowerCase().equals("failure")) {
+ System.out.println("Failure while critical put..."+rt.getMessage());
+ }
}
+ /**
+ * Build a preparedQueryObject that appends a transaction to the mriTable
+ * @param mriTable
+ * @param uuid
+ * @param table
+ * @param redoUuid
+ * @return
+ */
+ private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){
+ PreparedQueryObject query = new PreparedQueryObject();
+ StringBuilder appendBuilder = new StringBuilder();
+ appendBuilder.append("UPDATE ")
+ .append(music_ns)
+ .append(".")
+ .append(mriTable)
+ .append(" SET txredolog = txredolog +[('")
+ .append(table)
+ .append("',")
+ .append(redoUuid)
+ .append(")] WHERE rangeid = ")
+ .append(uuid)
+ .append(";");
+ query.appendQueryString(appendBuilder.toString());
+ return query;
+ }
+
+ protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
+ UUID mriIndex = partition.getMusicRangeInformationIndex();
+ String lockId;
+ lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ //\TODO Handle better failures to acquire locks
+ ReturnType lockReturn;
+ try {
+ lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Lock was not acquire correctly for key "+fullyQualifiedKey);
+ throw new MDBCServiceException("Lock was not acquire correctly for key "+fullyQualifiedKey);
+ } catch (MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey);
+ throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey);
+ } catch (MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey);
+ throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey);
+ }
+ //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock
+ if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
+ try {
+ MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId);
+ CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle();
+ CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
+ this.musicRangeInformationTableName, mriIndex.toString());
+ while(lockOwner.lockRef != lockId) {
+ MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef);
+ try {
+ lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
+ this.musicRangeInformationTableName, mriIndex.toString());
+ } catch(NullPointerException e){
+ //Ignore null pointer exception
+ lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ break;
+ }
+ }
+ lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
+
+ } catch (MusicLockingException e) {
+ throw new MDBCServiceException("Could not lock the corresponding lock");
+ } catch (MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey);
+ throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey);
+ } catch (MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey);
+ throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey);
+ }
+ }
+ if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
+ throw new MDBCServiceException("Could not lock the corresponding lock");
+ }
+ //TODO: Java newbie here, verify that this lockId is actually assigned to the global DatabasePartition in the StateManager instance
+ partition.setLockId(lockId);
+ return lockId;
+ }
+
+
+ protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId);
+ ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, MriIndex.toString(), appendQuery, lockId, null);
+ if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
+ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
+ throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
+ }
+ }
+
+ /**
+ * Writes the transaction information to metric's txDigest and musicRangeInformation table
+ * This officially commits the transaction globally
+ */
@Override
- public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) {
+ public void commitLog(DatabasePartition partition,
+ HashMap<Range,StagingTable> transactionDigest, String txId ,
+ TxCommitProgress progressKeeper) throws MDBCServiceException {
+ if (transactionDigest==null || transactionDigest.size()==0) {
+ return;
+ }
+ logger.info("Commiting lock for " + partition.getMusicRangeInformationIndex() + ", txID=" + txId);
+ UUID mriIndex = partition.getMusicRangeInformationIndex();
+ if(mriIndex==null) {
+ //\TODO Fetch MriIndex from the Range Information Table
+ throw new MDBCServiceException("TIT Index retrieval not yet implemented");
+ }
+ String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex.toString();
+ //0. See if reference to lock was already created
+ String lockId = partition.getLockId();
+ if(lockId == null || lockId.isEmpty()) {
+ lockId = createAndAssignLock(fullyQualifiedMriKey,partition);
+ }
+
+ UUID commitId;
+ //Generate a local commit id
+ if(progressKeeper.containsTx(txId)) {
+ commitId = progressKeeper.getCommitId(txId);
+ }
+ else{
+ logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress ");
+ throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress ");
+ }
+ //Add creation type of transaction digest
+
+ //1. Push new row to RRT and obtain its index
+ String serializedTransactionDigest;
+ try {
+ serializedTransactionDigest = MDBCUtils.toString(transactionDigest);
+ } catch (IOException e) {
+ throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString());
+ }
+ MusicTxDigestId digestId = new MusicTxDigestId(commitId);
+ addTxDigest(digestId, serializedTransactionDigest);
+ //2. Save RRT index to RQ
+ if(progressKeeper!= null) {
+ progressKeeper.setRecordId(txId,digestId);
+ }
+ //3. Append RRT index into the corresponding TIT row array
+ appendToRedoLog(mriIndex,partition,digestId);
+ }
+
+ /**
+ * @param tableName
+ * @param string
+ * @param rowValues
+ * @return
+ */
+ @SuppressWarnings("unused")
+ private String getUid(String tableName, String string, Object[] rowValues) {
+ //
+ // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
+ String cql = String.format("SELECT * FROM %s.%s;", music_ns, tableName);
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind();
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ ResultSet rs;
+ synchronized (sess) {
+ rs = sess.execute(bound);
+ }
+
+ //should never reach here
+ logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key");
return null;
}
@Override
- public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) {
- }
+ public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) {
+ // \FIXME: we may need to add the primary key of the row if it was autogenerated by MUSIC
+ List<String> cols = ti.columns;
+ int size = cols.size();
+ boolean hasDefault = false;
+ if(row.has(getMusicDefaultPrimaryKeyName())) {
+ size++;
+ hasDefault = true;
+ }
- @Override
- public void addTxDigest(MusicTxDigestId newId, String transactionDigest) {
+ Object[] objects = new Object[size];
+ int idx = 0;
+ if(hasDefault) {
+ objects[idx++] = row.getString(getMusicDefaultPrimaryKeyName());
+ }
+ for(String col : ti.columns) {
+ objects[idx]=row.get(col);
+ }
+ return objects;
}
-
+
@Override
- public void own(List<Range> ranges) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
+ public List<UUID> getPartitionIndexes() {
+ ArrayList<UUID> partitions = new ArrayList<UUID>();
+ String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName);
+ ResultSet rs = executeMusicRead(cql);
+ for (Row r: rs) {
+ partitions.add(r.getUUID("rangeid"));
+ }
+ return partitions;
+ }
+
+ @Override
+ public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException {
+ //TODO: verify that lock id is valid before calling the database operations function
+ //UUID id = partition.getMusicRangeInformationIndex();
+
+ String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(partitionIndex);
+ Row newRow;
+ try {
+ newRow = executeMusicUnlockedQuorumGet(pQueryObject);
+ } catch (MDBCServiceException e) {
+ logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+
+ List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
+ List<MusicTxDigestId> digestIds = new ArrayList<>();
+ for(TupleValue t: log){
+ //final String tableName = t.getString(0);
+ final UUID index = t.getUUID(1);
+ digestIds.add(new MusicTxDigestId(index));
+ }
+ List<Range> partitions = new ArrayList<>();
+ Set<String> tables = newRow.getSet("keys",String.class);
+ for (String table:tables){
+ partitions.add(new Range(table));
+ }
+ return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""),
+ digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"));
+ }
+
+
+ /**
+ * This function creates the TransactionInformation table. It contain information related
+ * to the transactions happening in a given partition.
+ * * The schema of the table is
+ * * Id, uiid.
+ * * Partition, uuid id of the partition
+ * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
+ * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
+ * * Redo: list of uiids associated to the Redo Records Table
+ *
+ */
+ private void createMusicRangeInformationTable() throws MDBCServiceException {
+ String tableName = this.musicRangeInformationTableName;
+ String priKey = "rangeid";
+ StringBuilder fields = new StringBuilder();
+ fields.append("rangeid uuid, ");
+ fields.append("keys set<text>, ");
+ fields.append("ownerid text, ");
+ fields.append("metricprocessid text, ");
+ //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
+ fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
+ this.music_ns, tableName, fields, priKey);
+ try {
+ executeMusicWriteQuery(this.music_ns,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create transaction information table");
+ throw(e);
+ }
+ }
+
+
+ @Override
+ public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException {
+ DatabasePartition newPartition = info.getDBPartition();
+ String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMusicRangeInformationIndex().toString();
+ String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition);
+ createEmptyMriRow(info.getMetricProcessId(),lockId,new ArrayList<Range>());
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Creates a new empty MRI row
+ * @param processId id of the process that is going to own initially this.
+ * @return uuid associated to the new row
+ */
+ private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges)
+ throws MDBCServiceException {
+ UUID id = MDBCUtils.generateUniqueKey();
+ return createEmptyMriRow(id,processId,lockId,ranges);
+ }
+
+ /**
+ * Creates a new empty MRI row
+ * @param processId id of the process that is going to own initially this.
+ * @return uuid associated to the new row
+ */
+ private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges)
+ throws MDBCServiceException{
+ StringBuilder insert = new StringBuilder("INSERT INTO ")
+ .append(this.music_ns)
+ .append('.')
+ .append(this.musicRangeInformationTableName)
+ .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
+ .append("(")
+ .append(id)
+ .append(",{");
+ boolean first=true;
+ for(Range r: ranges){
+ if(first){ first=false; }
+ else {
+ insert.append(',');
+ }
+ insert.append("'").append(r.toString()).append("'");
+ }
+ insert.append("},'")
+ .append((lockId==null)?"":lockId)
+ .append("','")
+ .append(processId)
+ .append("',[]);");
+ PreparedQueryObject query = new PreparedQueryObject();
+ query.appendQueryString(insert.toString());
+ try {
+ executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to add new row to transaction information");
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+ return id;
+ }
+
+ @Override
+ public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException {
+ logger.info("Appending to redo log for partition " + partition.getMusicRangeInformationIndex() + " txId=" + newRecord.txId);
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId);
+ ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null);
+ if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
+ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
+ throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
+ }
+ }
+
+ public void createMusicTxDigest() throws MDBCServiceException {
+ createMusicTxDigest(-1);
+ }
+
+
+ /**
+ * This function creates the MusicTxDigest table. It contain information related to each transaction committed
+ * * LeaseId: id associated with the lease, text
+ * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+ * * TransactionDigest: text that contains all the changes in the transaction
+ */
+ private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
+ String tableName = this.musicTxDigestTableName;
+ if(musicTxDigestTableNumber >= 0) {
+ tableName = tableName +
+ "-" +
+ Integer.toString(musicTxDigestTableNumber);
+ }
+ String priKey = "txid";
+ StringBuilder fields = new StringBuilder();
+ fields.append("txid uuid, ");
+ fields.append("transactiondigest text ");//notice lack of ','
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
+ try {
+ executeMusicWriteQuery(this.music_ns,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create redo records table");
+ throw(e);
+ }
}
+
+
+ /**
+ * Writes the transaction history to the txDigest
+ */
+ @Override
+ public void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException {
+ //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
+ PreparedQueryObject query = new PreparedQueryObject();
+ String cqlQuery = "INSERT INTO " +
+ this.music_ns +
+ '.' +
+ this.musicTxDigestTableName +
+ " (txid,transactiondigest) " +
+ "VALUES (" +
+ newId.txId + ",'" +
+ transactionDigest +
+ "');";
+ query.appendQueryString(cqlQuery);
+ //\TODO check if I am not shooting on my own foot
+ try {
+ MusicCore.nonKeyRelatedPut(query,"critical");
+ } catch (MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
+ throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString());
+ }
+ }
+
- @Override
- public void appendRange(String rangeId, List<Range> ranges) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
+
+ @Override
+ public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
+ String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(id.txId);
+ Row newRow;
+ try {
+ newRow = executeMusicUnlockedQuorumGet(pQueryObject);
+ } catch (MDBCServiceException e) {
+ logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+ String digest = newRow.getString("transactiondigest");
+ HashMap<Range,StagingTable> changes;
+ try {
+ changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
+ } catch (IOException e) {
+ logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId);
+ throw new MDBCServiceException("Deserializng digest failed with ioexception");
+ } catch (ClassNotFoundException e) {
+ logger.error("Deserializng digest failed with an invalid class for id:"+id.txId);
+ throw new MDBCServiceException("Deserializng digest failed with an invalid class");
+ }
+ return changes;
+ }
+
+ @Override
+ public void own(List<Range> ranges){
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendRange(String rangeId, List<Range> ranges){
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void relinquish(String ownerId, String rangeId){
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * This method executes a write query in Music
+ * @param cql the CQL to be sent to Cassandra
+ */
+ private static void executeMusicWriteQuery(String keyspace, String table, String cql)
+ throws MDBCServiceException {
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ ResultType rt = null;
+ try {
+ rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical");
+ } catch (MusicServiceException e) {
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ e.printStackTrace();
+ }
+ String result = rt.getResult();
+ if (result==null || result.toLowerCase().equals("failure")) {
+ throw new MDBCServiceException("Music eventual put failed");
+ }
}
- @Override
- public void relinquish(String ownerId, String rangeId) {
- throw new java.lang.UnsupportedOperationException("function not implemented yet");
+ private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey,
+ String lock)
+ throws MDBCServiceException{
+ ResultSet result;
+ try {
+ result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock);
+ } catch(MusicServiceException e){
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ e.printStackTrace();
+ throw new MDBCServiceException("Error executing critical get");
+ }
+ if(result.isExhausted()){
+ throw new MDBCServiceException("There is not a row that matches the id "+primaryKey);
+ }
+ return result.one();
}
- @Override
- public List<UUID> getPartitionIndexes() {
- // TODO Auto-generated method stub
- return null;
+ private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
+ throws MDBCServiceException{
+ ResultSet result = MusicCore.quorumGet(cqlObject);
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ if(result.isExhausted()){
+ throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
+ }
+ return result.one();
}
- @Override
- public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException {
- // TODO Auto-generated method stub
- return null;
+ private void executeMusicLockedPut(String namespace, String tableName,
+ String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId,
+ MusicCore.Condition conditionInfo) throws MDBCServiceException {
+ ReturnType rt ;
+ if(lockId==null) {
+ try {
+ rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo);
+ } catch (MusicLockingException e) {
+ logger.error("Music locked put failed");
+ throw new MDBCServiceException("Music locked put failed");
+ } catch (MusicServiceException e) {
+ logger.error("Music service fail: Music locked put failed");
+ throw new MDBCServiceException("Music service fail: Music locked put failed");
+ } catch (MusicQueryException e) {
+ logger.error("Music query fail: locked put failed");
+ throw new MDBCServiceException("Music query fail: Music locked put failed");
+ }
+ }
+ else {
+ rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo);
+ }
+ if (rt.getResult().getResult().toLowerCase().equals("failure")) {
+ throw new MDBCServiceException("Music locked put failed");
+ }
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 5943b34..383d522 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -37,7 +37,6 @@ import org.json.JSONObject;
import org.json.JSONTokener;
import org.onap.music.logging.EELFLoggerDelegate;
-import org.onap.music.mdbc.MusicSqlManager;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.tables.OperationType;
@@ -71,7 +70,7 @@ public class MySQLMixin implements DBInterface {
"CREATE TABLE IF NOT EXISTS "+TRANS_TBL+
" (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255), NEWROWDATA VARCHAR(1024), KEYDATA VARCHAR(1024), CONNECTION_ID INT,PRIMARY KEY (IX))";
- private final MusicSqlManager msm;
+ private final MusicInterface mi;
private final int connId;
private final String dbName;
private final Connection dbConnection;
@@ -79,14 +78,14 @@ public class MySQLMixin implements DBInterface {
private boolean server_tbl_created = false;
public MySQLMixin() {
- this.msm = null;
+ this.mi = null;
this.connId = 0;
this.dbName = null;
this.dbConnection = null;
this.tables = null;
}
- public MySQLMixin(MusicSqlManager msm, String url, Connection conn, Properties info) {
- this.msm = msm;
+ public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) {
+ this.mi = mi;
this.connId = generateConnID(conn);
this.dbName = getDBName(conn);
this.dbConnection = conn;
@@ -585,13 +584,13 @@ NEW.field refers to the new value
// the actual columns, otherwise performance when doing range queries are going
// to be even worse (see the else bracket down)
//
- String musicKey = msm.generateUniqueKey();
+ String musicKey = mi.generateUniqueKey();
/*} else {
//get key from data
musicKey = msm.getMusicKeyFromRowWithoutPrimaryIndexes(tbl,newRow);
}*/
- newRow.put(msm.getMusicDefaultPrimaryKeyName(), musicKey);
- keydataStr.put(msm.getMusicDefaultPrimaryKeyName(), musicKey);
+ newRow.put(mi.getMusicDefaultPrimaryKeyName(), musicKey);
+ keydataStr.put(mi.getMusicDefaultPrimaryKeyName(), musicKey);
}
/*else {
//Use the keys
@@ -646,8 +645,8 @@ NEW.field refers to the new value
JSONObject jo = new JSONObject();
if (!getTableInfo(tableName).hasKey()) {
- String musicKey = msm.generateUniqueKey();
- jo.put(msm.getMusicDefaultPrimaryKeyName(), musicKey);
+ String musicKey = mi.generateUniqueKey();
+ jo.put(mi.getMusicDefaultPrimaryKeyName(), musicKey);
}
for (String col : ti.columns) {
@@ -655,7 +654,7 @@ NEW.field refers to the new value
}
@SuppressWarnings("unused")
- Object[] row = Utils.jsonToRow(ti,tableName, jo,msm.getMusicDefaultPrimaryKeyName());
+ Object[] row = Utils.jsonToRow(ti,tableName, jo, mi.getMusicDefaultPrimaryKeyName());
//\FIXME this is wrong now, update of the dirty row and entity is now handled by the archival process
//msm.updateDirtyRowAndEntityTableInMusic(ti,tableName, jo);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
index 0a5bd60..cb5d28e 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -40,7 +40,7 @@ import org.onap.music.mdbc.MdbcServerLogic;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.configurations.NodeConfiguration;
-import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.mixins.MusicInterface;
import com.datastax.driver.core.Row;
@@ -88,7 +88,7 @@ public class MusicTxDigest {
while(colIterator.hasNext()) {
String col = colIterator.next();
//FIXME: should not explicitly refer to cassandramixin
- if (col.equals(CassandraMixin.MDBC_PRIMARYKEY_NAME)) {
+ if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
//reserved name
continue;
}
@@ -140,6 +140,8 @@ public class MusicTxDigest {
case SELECT:
//no update happened, do nothing
break;
+ default:
+ logger.error(op.getOperationType() + "not implemented for replay");
}
}
diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties
index d3feee2..73e8f77 100755
--- a/mdbc-server/src/main/resources/mdbc.properties
+++ b/mdbc-server/src/main/resources/mdbc.properties
@@ -2,11 +2,9 @@
# A list of all Mixins that should be checked by MDBC
#
MIXINS= \
- org.onap.music.mdbc.mixins.H2Mixin \
- org.onap.music.mdbc.mixins.H2ServerMixin \
org.onap.music.mdbc.mixins.MySQLMixin \
- org.onap.music.mdbc.mixins.CassandraMixin \
- org.onap.music.mdbc.mixins.Cassandra2Mixin
+ org.onap.music.mdbc.mixins.MusicMixin \
+ org.onap.music.mdbc.mixins.Music2Mixin
DEFAULT_DRIVERS=\
org.h2.Driver \
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java
index eab38d3..388f8b8 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java
@@ -38,5 +38,13 @@ public class MusicTxDigestTest {
HashMap<Range, StagingTable> digest = (HashMap<Range, StagingTable>) MDBCUtils.fromString(t1);
txDigest.replayTxDigest(digest);
}
+
+ @Test
+ public void testEmptyDigest() throws Exception {
+ String t1 = "rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAB3CAAAABAAAAAAeA==";
+ MusicTxDigest txDigest = new MusicTxDigest(null);
+ HashMap<Range, StagingTable> digest = (HashMap<Range, StagingTable>) MDBCUtils.fromString(t1);
+ txDigest.replayTxDigest(digest);
+ }
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java b/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java
index c618f3b..7e7dc88 100755
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java
@@ -24,7 +24,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
-import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.mixins.MusicMixin;
public class TestCommon {
public static final String DB_DRIVER = "avatica.Driver";
@@ -34,9 +34,9 @@ public class TestCommon {
public Connection getDBConnection(String url, String keyspace, String id) throws SQLException, ClassNotFoundException {
Class.forName(DB_DRIVER);
Properties driver_info = new Properties();
- driver_info.put(CassandraMixin.KEY_MY_ID, id);
- driver_info.put(CassandraMixin.KEY_REPLICAS, "0,1,2");
- driver_info.put(CassandraMixin.KEY_MUSIC_ADDRESS, "localhost");
+ driver_info.put(MusicMixin.KEY_MY_ID, id);
+ driver_info.put(MusicMixin.KEY_REPLICAS, "0,1,2");
+ driver_info.put(MusicMixin.KEY_MUSIC_ADDRESS, "localhost");
driver_info.put("user", DB_USER);
driver_info.put("password", DB_PASSWORD);
return DriverManager.getConnection(url, driver_info);