From 1359f1023594c201b91fff73b2baa3f5d5cf6fd6 Mon Sep 17 00:00:00 2001 From: "Tschaen, Brendan" Date: Tue, 20 Nov 2018 15:29:12 -0500 Subject: MusicSqlManager Removal, othe code refactors MusicSqlManager not required in current setup Current code condensed and absorbed into MdbcConnection and MusicMixin Modify deprecated names "CassandraMixin" to "MusicMixin" Change-Id: If68f8937a385c19d901c56e5795ddc0acf45d399 Issue-ID: MUSIC-198 Signed-off-by: Tschaen, Brendan --- .../main/java/org/onap/music/mdbc/MDBCUtils.java | 2 +- .../org/onap/music/mdbc/MdbcCallableStatement.java | 8 +- .../java/org/onap/music/mdbc/MdbcConnection.java | 162 ++- .../org/onap/music/mdbc/MdbcPreparedStatement.java | 56 +- .../java/org/onap/music/mdbc/MdbcServerLogic.java | 10 +- .../java/org/onap/music/mdbc/MdbcStatement.java | 50 +- .../java/org/onap/music/mdbc/MusicSqlManager.java | 329 ----- .../java/org/onap/music/mdbc/ProxyStatement.java | 46 +- .../java/org/onap/music/mdbc/StateManager.java | 9 +- .../mdbc/configurations/TablesConfiguration.java | 2 +- .../onap/music/mdbc/examples/EtdbTestClient.java | 156 -- .../onap/music/mdbc/examples/MdbcTestClient.java | 155 ++ .../onap/music/mdbc/mixins/Cassandra2Mixin.java | 308 ---- .../org/onap/music/mdbc/mixins/CassandraMixin.java | 1502 -------------------- .../org/onap/music/mdbc/mixins/MixinFactory.java | 8 +- .../org/onap/music/mdbc/mixins/Music2Mixin.java | 308 ++++ .../org/onap/music/mdbc/mixins/MusicInterface.java | 3 +- .../org/onap/music/mdbc/mixins/MusicMixin.java | 1493 +++++++++++++++++-- .../org/onap/music/mdbc/mixins/MySQLMixin.java | 21 +- .../org/onap/music/mdbc/tables/MusicTxDigest.java | 6 +- mdbc-server/src/main/resources/mdbc.properties | 6 +- .../org/onap/music/mdbc/MusicTxDigestTest.java | 8 + .../java/org/onap/music/mdbc/test/TestCommon.java | 8 +- 23 files changed, 2093 insertions(+), 2563 deletions(-) delete mode 100755 mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java delete mode 100755 mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java create mode 100755 mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java delete mode 100755 mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java delete mode 100755 mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java create mode 100755 mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java index 2442af1..2723490 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java @@ -34,7 +34,7 @@ import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; import org.onap.music.logging.format.ErrorSeverity; import org.onap.music.logging.format.ErrorTypes; -import org.onap.music.mdbc.mixins.CassandraMixin; +import org.onap.music.mdbc.mixins.MusicMixin; import org.onap.music.mdbc.mixins.Utils; import org.onap.music.mdbc.tables.Operation; import org.onap.music.mdbc.tables.StagingTable; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java index b820686..d4d20ad 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcCallableStatement.java @@ -53,12 +53,12 @@ public class MdbcCallableStatement extends MdbcPreparedStatement implements Call @SuppressWarnings("unused") private static final String DATASTAX_PREFIX = "com.datastax.driver"; - public MdbcCallableStatement(Statement stmt, MusicSqlManager m) { - super(stmt, m); + public MdbcCallableStatement(Statement stmt, MdbcConnection mConn) { + super(stmt, mConn); } - public MdbcCallableStatement(Statement stmt, String sql, MusicSqlManager mgr) { - super(stmt, sql, mgr); + public MdbcCallableStatement(Statement stmt, String sql, MdbcConnection mConn) { + super(stmt, sql, mConn); } @Override diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 352eacf..cca14c0 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -34,8 +34,12 @@ import java.sql.SQLXML; import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.Executor; import org.onap.music.exceptions.MDBCServiceException; @@ -44,7 +48,10 @@ import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; import org.onap.music.logging.format.ErrorSeverity; import org.onap.music.logging.format.ErrorTypes; +import org.onap.music.mdbc.mixins.DBInterface; +import org.onap.music.mdbc.mixins.MixinFactory; import org.onap.music.mdbc.mixins.MusicInterface; +import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -61,25 +68,28 @@ public class MdbcConnection implements Connection { private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused private final Connection conn; // the JDBC Connection to the actual underlying database - private final MusicSqlManager mgr; // there should be one MusicSqlManager in use per Connection + private final MusicInterface mi; private final TxCommitProgress progressKeeper; private final DatabasePartition partition; + private final DBInterface dbi; + private final HashMap transactionDigest; + private final Set table_set; public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException { this.id = id; + this.table_set = Collections.synchronizedSet(new HashSet()); + this.transactionDigest = new HashMap(); + if (c == null) { throw new MDBCServiceException("Connection is null"); } this.conn = c; + info.putAll(MDBCUtils.getMdbcProperties()); + String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT); + this.dbi = MixinFactory.createDBInterface(mixinDb, mi, url, conn, info); + this.mi = mi; try { - this.mgr = new MusicSqlManager(url, c, info, mi); - } catch (MDBCServiceException e) { - logger.error("Failure in creating Music SQL Manager"); - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); - throw e; - } - try { - this.mgr.setAutoCommit(c.getAutoCommit(),null,null,null); + this.setAutoCommit(c.getAutoCommit()); } catch (SQLException e) { logger.error("Failure in autocommit"); logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); @@ -87,19 +97,15 @@ public class MdbcConnection implements Connection { // Verify the tables in MUSIC match the tables in the database // and create triggers on any tables that need them - //mgr.synchronizeTableData(); - if ( mgr != null ) try { - mgr.synchronizeTables(); + try { + this.synchronizeTables(); } catch (QueryException e) { logger.error("Error syncrhonizing tables"); logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); } - else { - logger.error(EELFLoggerDelegate.errorLogger, "MusicSqlManager was not correctly created", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); - throw new MDBCServiceException("Music SQL Manager object is null or invalid"); - } this.progressKeeper = progressKeeper; this.partition = partition; + logger.debug("Mdbc connection created with id: "+id); } @@ -117,18 +123,18 @@ public class MdbcConnection implements Connection { @Override public Statement createStatement() throws SQLException { - return new MdbcCallableStatement(conn.createStatement(), mgr); + return new MdbcCallableStatement(conn.createStatement(), this); } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { //TODO: grab the sql call from here and all the other preparestatement calls - return new MdbcPreparedStatement(conn.prepareStatement(sql), sql, mgr); + return new MdbcPreparedStatement(conn.prepareStatement(sql), sql, this); } @Override public CallableStatement prepareCall(String sql) throws SQLException { - return new MdbcCallableStatement(conn.prepareCall(sql), mgr); + return new MdbcCallableStatement(conn.prepareCall(sql), this); } @Override @@ -141,13 +147,23 @@ public class MdbcConnection implements Connection { boolean b = conn.getAutoCommit(); if (b != autoCommit) { if(progressKeeper!=null) progressKeeper.commitRequested(id); - try { - mgr.setAutoCommit(autoCommit,id,progressKeeper,partition); - if(progressKeeper!=null) - progressKeeper.setMusicDone(id); - } catch (MDBCServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); - throw new SQLException("Failure commiting to MUSIC"); + logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b); + if (b) { + // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction + if(id == null || id.isEmpty()) { + logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + throw new SQLException("tx id is null"); + } + try { + mi.commitLog(partition, transactionDigest, id, progressKeeper); + } catch (MDBCServiceException e) { + // TODO Auto-generated catch block + logger.error("Cannot commit log to music" + e.getStackTrace()); + throw new SQLException(e.getMessage()); + } + } + if(progressKeeper!=null) { + progressKeeper.setMusicDone(id); } conn.setAutoCommit(autoCommit); if(progressKeeper!=null) { @@ -164,6 +180,11 @@ public class MdbcConnection implements Connection { return conn.getAutoCommit(); } + /** + * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, + * they are performed now and copied into MUSIC. + * @throws SQLException + */ @Override public void commit() throws SQLException { if(progressKeeper.isComplete(id)) { @@ -174,7 +195,9 @@ public class MdbcConnection implements Connection { } try { - mgr.commit(id,progressKeeper,partition); + logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); + // transaction was committed -- add all the updates into the REDO-Log in MUSIC + mi.commitLog(partition, transactionDigest, id, progressKeeper); } catch (MDBCServiceException e) { //If the commit fail, then a new commitId should be used logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); @@ -196,19 +219,26 @@ public class MdbcConnection implements Connection { } } + /** + * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed, + * they are discarded. + */ @Override public void rollback() throws SQLException { - mgr.rollback(); + logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");; + transactionDigest.clear(); conn.rollback(); progressKeeper.reinitializeTxProgress(id); } + /** + * Close this MdbcConnection. + */ @Override public void close() throws SQLException { logger.debug("Closing mdbc connection with id:"+id); - if (mgr != null) { - logger.debug("Closing mdbc manager with id:"+id); - mgr.close(); + if (dbi != null) { + dbi.close(); } if (conn != null && !conn.isClosed()) { logger.debug("Closing jdbc from mdbc with id:"+id); @@ -269,18 +299,18 @@ public class MdbcConnection implements Connection { @Override public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { - return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency), mgr); + return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency), this); } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { - return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, mgr); + return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, this); } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { - return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency), mgr); + return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency), this); } @Override @@ -326,34 +356,34 @@ public class MdbcConnection implements Connection { @Override public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { - return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), mgr); + return new MdbcCallableStatement(conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), this); } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { - return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, mgr); + return new MdbcCallableStatement(conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, this); } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { - return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), mgr); + return new MdbcCallableStatement(conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), this); } @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { - return new MdbcPreparedStatement(conn.prepareStatement(sql, autoGeneratedKeys), sql, mgr); + return new MdbcPreparedStatement(conn.prepareStatement(sql, autoGeneratedKeys), sql, this); } @Override public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { - return new MdbcPreparedStatement(conn.prepareStatement(sql, columnIndexes), sql, mgr); + return new MdbcPreparedStatement(conn.prepareStatement(sql, columnIndexes), sql, this); } @Override public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { - return new MdbcPreparedStatement(conn.prepareStatement(sql, columnNames), sql, mgr); + return new MdbcPreparedStatement(conn.prepareStatement(sql, columnNames), sql, this); } @Override @@ -435,4 +465,56 @@ public class MdbcConnection implements Connection { public int getNetworkTimeout() throws SQLException { return conn.getNetworkTimeout(); } + + + /** + * Code to be run within the DB driver before a SQL statement is executed. This is where tables + * can be synchronized before a SELECT, for those databases that do not support SELECT triggers. + * @param sql the SQL statement that is about to be executed + */ + public void preStatementHook(String sql) { + dbi.preStatementHook(sql); + } + + /** + * Code to be run within the DB driver after a SQL statement has been executed. This is where remote + * statement actions can be copied back to Cassandra/MUSIC. + * @param sql the SQL statement that was executed + */ + public void postStatementHook(String sql) { + dbi.postStatementHook(sql, transactionDigest); + } + + /** + * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the + * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized + * in order to prevent multiple threads from running this code in parallel. + */ + public void synchronizeTables() throws QueryException { + Set set1 = dbi.getSQLTableSet(); // set of tables in the database + logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); + for (String tableName : set1) { + // This map will be filled in if this table was previously discovered + tableName = tableName.toUpperCase(); + if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) { + logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName); + try { + TableInfo ti = dbi.getTableInfo(tableName); + mi.initializeMusicForTable(ti,tableName); + //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted + ti = dbi.getTableInfo(tableName); + mi.createDirtyRowTable(ti,tableName); + dbi.createSQLTriggers(tableName); + table_set.add(tableName); + dbi.synchronizeData(tableName); + logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" + + table_set.size() + "/" + set1.size() + "tables uploaded"); + } catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e); + throw new QueryException(); + } + } + } + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java index fa08d3c..092aa94 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java @@ -62,13 +62,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat String[] params; // holds the parameters if prepared statement, indexing starts at 1 - public MdbcPreparedStatement(Statement stmt, MusicSqlManager m) { - super(stmt, m); + public MdbcPreparedStatement(Statement stmt, MdbcConnection mConn) { + super(stmt, mConn); this.sql = null; } - public MdbcPreparedStatement(Statement stmt, String sql, MusicSqlManager mgr) { - super(stmt, sql, mgr); + public MdbcPreparedStatement(Statement stmt, String sql, MdbcConnection mConn) { + super(stmt, sql, mConn); this.sql = sql; //indexing starts at 1 params = new String[StringUtils.countMatches(sql, "?")+1]; @@ -89,9 +89,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql); ResultSet r = null; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); r = stmt.executeQuery(sql); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -108,9 +108,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -189,9 +189,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -301,9 +301,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql, autoGeneratedKeys); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -319,9 +319,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql, columnIndexes); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -337,9 +337,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql, columnNames); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -355,9 +355,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql, autoGeneratedKeys); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -373,9 +373,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql, columnIndexes); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -391,9 +391,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql, columnNames); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -439,9 +439,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql); ResultSet r = null; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); r = ((PreparedStatement)stmt).executeQuery();; - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { e.printStackTrace(); @@ -458,9 +458,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = ((PreparedStatement)stmt).executeUpdate(); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { e.printStackTrace(); @@ -579,9 +579,9 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = ((PreparedStatement)stmt).execute(); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { e.printStackTrace(); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java index cccea92..757b468 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java @@ -97,7 +97,7 @@ public class MdbcServerLogic extends JdbcMeta{ //\TODO: don't use connectionCache, use this.manager internal state Connection conn = connectionCache.getIfPresent(id); if (conn == null) { - this.manager.CloseConnection(id); + this.manager.closeConnection(id); logger.error(EELFLoggerDelegate.errorLogger,"Connection not found: invalid id, closed, or expired: " + id); throw new RuntimeException(" Connection not found: invalid id, closed, or expired: " + id); @@ -119,8 +119,8 @@ public class MdbcServerLogic extends JdbcMeta{ } // Avoid global synchronization of connection opening try { - this.manager.OpenConnection(ch.id, info); - Connection conn = this.manager.GetConnection(ch.id); + this.manager.openConnection(ch.id, info); + Connection conn = this.manager.getConnection(ch.id); if(conn == null) { logger.error(EELFLoggerDelegate.errorLogger, "Connection created was null"); throw new RuntimeException("Connection created was null for connection: " + ch.id); @@ -130,7 +130,7 @@ public class MdbcServerLogic extends JdbcMeta{ // Race condition: someone beat us to storing the connection in the cache. if (loadedConn != null) { //\TODO check if we added an additional race condition for this - this.manager.CloseConnection(ch.id); + this.manager.closeConnection(ch.id); conn.close(); throw new RuntimeException("Connection already exists: " + ch.id); } @@ -156,7 +156,7 @@ public class MdbcServerLogic extends JdbcMeta{ throw new RuntimeException(e.getMessage()); } finally { connectionCache.invalidate(ch.id); - this.manager.CloseConnection(ch.id); + this.manager.closeConnection(ch.id); logger.info("connection closed with id {}", ch.id); } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java index 320531f..97f4be4 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java @@ -42,18 +42,18 @@ public class MdbcStatement implements Statement { private static final String DATASTAX_PREFIX = "com.datastax.driver"; final Statement stmt; // the Statement that we are proxying - final MusicSqlManager mgr; + final MdbcConnection mConn; //\TODO We may need to all pass the connection object to support autocommit - public MdbcStatement(Statement s, MusicSqlManager m) { + public MdbcStatement(Statement s, MdbcConnection mConn) { this.stmt = s; - this.mgr = m; + this.mConn = mConn; } - public MdbcStatement(Statement stmt, String sql, MusicSqlManager mgr) { + public MdbcStatement(Statement stmt, String sql, MdbcConnection mConn) { //\TODO why there is a constructor with a sql parameter in a not PreparedStatement this.stmt = stmt; - this.mgr = mgr; + this.mConn = mConn; } @Override @@ -73,9 +73,9 @@ public class MdbcStatement implements Statement { logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql); ResultSet r = null; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); r = stmt.executeQuery(sql); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -92,9 +92,9 @@ public class MdbcStatement implements Statement { int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -175,9 +175,9 @@ public class MdbcStatement implements Statement { boolean b = false; //\TODO Add the result of the postStatementHook to b try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -287,9 +287,9 @@ public class MdbcStatement implements Statement { logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql, autoGeneratedKeys); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -305,9 +305,9 @@ public class MdbcStatement implements Statement { logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql, columnIndexes); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -323,9 +323,9 @@ public class MdbcStatement implements Statement { logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql, columnNames); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -341,9 +341,9 @@ public class MdbcStatement implements Statement { logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql, autoGeneratedKeys); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -359,9 +359,9 @@ public class MdbcStatement implements Statement { logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql, columnIndexes); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -378,9 +378,9 @@ public class MdbcStatement implements Statement { //\TODO Idem to the other execute without columnNames boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql, columnNames); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -423,9 +423,9 @@ public class MdbcStatement implements Statement { protected void synchronizeTables(String sql) { if (sql == null || sql.trim().toLowerCase().startsWith("create")) { - if (mgr != null) { + if (mConn != null) { try { - mgr.synchronizeTables(); + mConn.synchronizeTables(); } catch (QueryException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java deleted file mode 100755 index c2019cf..0000000 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java +++ /dev/null @@ -1,329 +0,0 @@ -/* - * ============LICENSE_START==================================================== - * org.onap.music.mdbc - * ============================================================================= - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END====================================================== - */ -package org.onap.music.mdbc; - -import java.sql.Connection; -import java.util.*; - -import org.json.JSONObject; - -import org.onap.music.mdbc.mixins.DBInterface; -import org.onap.music.mdbc.mixins.MixinFactory; -import org.onap.music.mdbc.mixins.MusicInterface; -import org.onap.music.mdbc.mixins.Utils; -import org.onap.music.mdbc.tables.StagingTable; -import org.onap.music.mdbc.tables.TxCommitProgress; -import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.exceptions.QueryException; -import org.onap.music.logging.*; -import org.onap.music.logging.format.AppMessages; -import org.onap.music.logging.format.ErrorSeverity; -import org.onap.music.logging.format.ErrorTypes; - -/** -*

-* MUSIC SQL Manager - code that helps take data written to a SQL database and seamlessly integrates it -* with MUSIC that maintains data in a No-SQL data-store -* (Cassandra) and protects access to it with a distributed -* locking service (based on Zookeeper). -*

-*

-* This code will support transactions by taking note of the value of the autoCommit flag, and of calls -* to commit() and rollback(). These calls should be made by the user's JDBC -* client. -*

-* -* @author Bharath Balasubramanian, Robert Eby -*/ -public class MusicSqlManager { - - private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicSqlManager.class); - - private final DBInterface dbi; - private final MusicInterface mi; - private final Set table_set; - private final HashMap transactionDigest; - private boolean autocommit; // a copy of the autocommit flag from the JDBC Connection - - /** - * Build a MusicSqlManager for a DB connection. This construct may only be called by getMusicSqlManager(), - * which will ensure that only one MusicSqlManager is created per URL. - * This is the location where the appropriate mixins to use for the MusicSqlManager should be determined. - * They should be picked based upon the URL and the properties passed to this constructor. - *

- * At the present time, we only support the use of the H2Mixin (for access to a local H2 database), - * with the CassandraMixin (for direct access to a Cassandra noSQL DB as the persistence layer). - *

- * - * @param url the JDBC URL which was used to connection to the database - * @param conn the actual connection to the database - * @param info properties passed from the initial JDBC connect() call - * @throws MDBCServiceException - */ - public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException { - try { - info.putAll(MDBCUtils.getMdbcProperties()); - String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT); - this.dbi = MixinFactory.createDBInterface(mixinDb, this, url, conn, info); - this.mi = mi; - this.table_set = Collections.synchronizedSet(new HashSet()); - this.autocommit = true; - this.transactionDigest = new HashMap(); - - }catch(Exception e) { - throw new MDBCServiceException(e.getMessage()); - } - } - - public void setAutoCommit(boolean b,String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException { - if (b != autocommit) { - autocommit = b; - logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b); - if (b) { - // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction - if(txId == null || txId.isEmpty()) { - logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - throw new MDBCServiceException("tx id is null"); - } - commit(txId,progressKeeper,partition); - } - } - } - - /** - * Close this MusicSqlManager. - */ - public void close() { - if (dbi != null) { - dbi.close(); - } - } - - /** - * Code to be run within the DB driver before a SQL statement is executed. This is where tables - * can be synchronized before a SELECT, for those databases that do not support SELECT triggers. - * @param sql the SQL statement that is about to be executed - */ - public void preStatementHook(final String sql) { - dbi.preStatementHook(sql); - } - /** - * Code to be run within the DB driver after a SQL statement has been executed. This is where remote - * statement actions can be copied back to Cassandra/MUSIC. - * @param sql the SQL statement that was executed - */ - public void postStatementHook(final String sql) { - dbi.postStatementHook(sql,transactionDigest); - } - /** - * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the - * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized - * in order to prevent multiple threads from running this code in parallel. - */ - public synchronized void synchronizeTables() throws QueryException { - Set set1 = dbi.getSQLTableSet(); // set of tables in the database - logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); - for (String tableName : set1) { - // This map will be filled in if this table was previously discovered - tableName = tableName.toUpperCase(); - if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) { - logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName); - try { - TableInfo ti = dbi.getTableInfo(tableName); - mi.initializeMusicForTable(ti,tableName); - //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted - ti = dbi.getTableInfo(tableName); - mi.createDirtyRowTable(ti,tableName); - dbi.createSQLTriggers(tableName); - table_set.add(tableName); - synchronizeTableData(tableName); - logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" + - table_set.size() + "/" + set1.size() + "tables uploaded"); - } catch (Exception e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e); - throw new QueryException(); - } - } - } - -// Set set2 = getMusicTableSet(music_ns); - // not working - fix later -// for (String tbl : set2) { -// if (!set1.contains(tbl)) { -// logger.debug("Old table dropped: "+tbl); -// dropSQLTriggers(tbl, conn); -// // ZZTODO drop camunda table ? -// } -// } - } - - /** - * On startup, copy dirty data from Cassandra to H2. May not be needed. - * @param tableName - */ - public void synchronizeTableData(String tableName) { - // TODO - copy MUSIC -> H2 - dbi.synchronizeData(tableName); - } - /** - * This method is called whenever there is a SELECT on a local SQL table, and should be called by the underlying databases - * triggering mechanism. It first checks the local dirty bits table to see if there are any keys in Cassandra whose value - * has not yet been sent to SQL. If there are, the appropriate values are copied from Cassandra to the local database. - * Under normal execution, this function behaves as a NOP operation. - * @param tableName This is the table on which the SELECT is being performed - */ - public void readDirtyRowsAndUpdateDb(String tableName) { - mi.readDirtyRowsAndUpdateDb(dbi,tableName); - } - - - - - /** - * This method gets the primary key that the music interfaces uses by default. - * If the front end uses a primary key, this will not match what is used in the MUSIC interface - * @return - */ - public String getMusicDefaultPrimaryKeyName() { - return mi.getMusicDefaultPrimaryKeyName(); - } - - /** - * Asks music interface to provide the function to create a primary key - * e.g. uuid(), 1, "unique_aksd419fjc" - * @return - */ - public String generateUniqueKey() { - // - return mi.generateUniqueKey(); - } - - - /** - * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, - * they are performed now and copied into MUSIC. - * @throws MDBCServiceException - */ - public synchronized void commit(String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException { - logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); - // transaction was committed -- add all the updates into the REDO-Log in MUSIC - try { - mi.commitLog(dbi, partition, transactionDigest, txId, progressKeeper); - }catch(MDBCServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); - throw e; - } - } - - /** - * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed, - * they are discarded. - */ - public synchronized void rollback() { - // transaction was rolled back - discard the updates - logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");; - transactionDigest.clear(); - } - - /** - * Get all - * @param table - * @param dbRow - * @return - */ - public String getMusicKeyFromRowWithoutPrimaryIndexes(String table, JSONObject dbRow) { - TableInfo ti = dbi.getTableInfo(table); - return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti,table, dbRow); - } - - @Deprecated - public String getMusicKeyFromRow(String table, JSONObject dbRow) { - TableInfo ti = dbi.getTableInfo(table); - return mi.getMusicKeyFromRow(ti,table, dbRow); - } - - /** - * Returns all keys that matches the current sql statement, and not in already updated keys. - * - * @param sql the query that we are getting keys for - * @deprecated - */ - public ArrayList getMusicKeys(String sql) { - ArrayList musicKeys = new ArrayList(); - //\TODO See if this is required - /* - try { - net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); - if (stmt instanceof Insert) { - Insert s = (Insert) stmt; - String tbl = s.getTable().getName(); - musicKeys.add(generatePrimaryKey()); - } else { - String tbl; - String where = ""; - if (stmt instanceof Update){ - Update u = (Update) stmt; - tbl = u.getTables().get(0).getName(); - where = u.getWhere().toString(); - } else if (stmt instanceof Delete) { - Delete d = (Delete) stmt; - tbl = d.getTable().getName(); - if (d.getWhere()!=null) { - where = d.getWhere().toString(); - } - } else { - System.err.println("Not recognized sql type"); - tbl = ""; - } - String dbiSelect = "SELECT * FROM " + tbl; - if (!where.equals("")) { - dbiSelect += "WHERE" + where; - } - ResultSet rs = dbi.executeSQLRead(dbiSelect); - musicKeys.addAll(getMusicKeysWhere(tbl, Utils.parseResults(dbi.getTableInfo(tbl), rs))); - rs.getStatement().close(); - } - } catch (JSQLParserException | SQLException e) { - - e.printStackTrace(); - } - System.err.print("MusicKeys:"); - for(String musicKey:musicKeys) { - System.out.print(musicKey + ","); - } - */ - return musicKeys; - } - - public void own(List ranges) { - throw new java.lang.UnsupportedOperationException("function not implemented yet"); - } - - public void appendRange(String rangeId, List ranges) { - throw new java.lang.UnsupportedOperationException("function not implemented yet"); - } - - public void relinquish(String ownerId, String rangeId) { - throw new java.lang.UnsupportedOperationException("function not implemented yet"); - } - - -} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java index 49b032e..3175f94 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java @@ -60,11 +60,11 @@ public class ProxyStatement implements CallableStatement { private static final String DATASTAX_PREFIX = "com.datastax.driver"; private final Statement stmt; // the Statement that we are proxying - private final MusicSqlManager mgr; + private final MdbcConnection mConn; - public ProxyStatement(Statement s, MusicSqlManager m) { + public ProxyStatement(Statement s, MdbcConnection mConn) { this.stmt = s; - this.mgr = m; + this.mConn = mConn; } @Override @@ -82,9 +82,9 @@ public class ProxyStatement implements CallableStatement { logger.debug("executeQuery: "+sql); ResultSet r = null; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); r = stmt.executeQuery(sql); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -100,9 +100,9 @@ public class ProxyStatement implements CallableStatement { logger.debug("executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -178,9 +178,9 @@ public class ProxyStatement implements CallableStatement { logger.debug("execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -289,9 +289,9 @@ public class ProxyStatement implements CallableStatement { logger.debug("executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql, autoGeneratedKeys); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -307,9 +307,9 @@ public class ProxyStatement implements CallableStatement { logger.debug("executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql, columnIndexes); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -325,9 +325,9 @@ public class ProxyStatement implements CallableStatement { logger.debug("executeUpdate: "+sql); int n = 0; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); n = stmt.executeUpdate(sql, columnNames); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -343,9 +343,9 @@ public class ProxyStatement implements CallableStatement { logger.debug("execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql, autoGeneratedKeys); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -361,9 +361,9 @@ public class ProxyStatement implements CallableStatement { logger.debug("execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql, columnIndexes); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -379,9 +379,9 @@ public class ProxyStatement implements CallableStatement { logger.debug("execute: "+sql); boolean b = false; try { - mgr.preStatementHook(sql); + mConn.preStatementHook(sql); b = stmt.execute(sql, columnNames); - mgr.postStatementHook(sql); + mConn.postStatementHook(sql); synchronizeTables(sql); } catch (Exception e) { String nm = e.getClass().getName(); @@ -1268,9 +1268,9 @@ public class ProxyStatement implements CallableStatement { private void synchronizeTables(String sql) { if (sql == null || sql.trim().toLowerCase().startsWith("create")) { - if (mgr != null) { + if (mConn != null) { try { - mgr.synchronizeTables(); + mConn.synchronizeTables(); } catch (QueryException e) { e.printStackTrace(); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 4bb0c85..1f2ad91 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -27,7 +27,6 @@ import org.onap.music.logging.format.ErrorSeverity; import org.onap.music.logging.format.ErrorTypes; import org.onap.music.mdbc.mixins.MixinFactory; import org.onap.music.mdbc.mixins.MusicInterface; -import org.onap.music.mdbc.mixins.MusicMixin; import org.onap.music.mdbc.tables.MusicTxDigest; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -151,7 +150,7 @@ public class StateManager { } - public void CloseConnection(String connectionId){ + public void closeConnection(String connectionId){ //\TODO check if there is a race condition if(mdbcConnections.containsKey(connectionId)) { transactionInfo.deleteTxProgress(connectionId); @@ -166,7 +165,7 @@ public class StateManager { } } - public void OpenConnection(String id, Properties information){ + public void openConnection(String id, Properties information){ if(!mdbcConnections.containsKey(id)){ Connection sqlConnection; MdbcConnection newConnection; @@ -208,7 +207,7 @@ public class StateManager { * @param id of the transaction, created using * @return */ - public Connection GetConnection(String id) { + public Connection getConnection(String id) { if(mdbcConnections.containsKey(id)) { //\TODO: Verify if this make sense // Intent: reinitialize transaction progress, when it already completed the previous tx for the same connection @@ -253,7 +252,7 @@ public class StateManager { return newConnection; } - public void InitializeSystem() { + public void initializeSystem() { //\TODO Prefetch data to system using the data ranges as guide throw new UnsupportedOperationException("Function initialize system needs to be implemented id MdbcStateManager"); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java index f3f5d22..3e3447d 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java @@ -24,7 +24,7 @@ import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.RedoRow; -import org.onap.music.mdbc.mixins.CassandraMixin; +import org.onap.music.mdbc.mixins.MusicMixin; import org.onap.music.mdbc.tables.MusicTxDigest; import com.google.gson.Gson; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java deleted file mode 100755 index eede9be..0000000 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * ============LICENSE_START==================================================== - * org.onap.music.mdbc - * ============================================================================= - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END====================================================== - */ -package org.onap.music.mdbc.examples; - -import java.sql.*; -import org.apache.calcite.avatica.remote.Driver; - -public class EtdbTestClient { - - public static class Hr { - public final Employee[] emps = { - new Employee(100, "Bill"), - new Employee(200, "Eric"), - new Employee(150, "Sebastian"), - }; - } - - public static class Employee { - public final int empid; - public final String name; - - public Employee(int empid, String name) { - this.empid = empid; - this.name = name; - } - } - - public static void main(String[] args){ - try { - Class.forName("org.apache.calcite.avatica.remote.Driver"); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - System.exit(1); - } - Connection connection; - try { - connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf"); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - try { - connection.setAutoCommit(false); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - - final String sql = "CREATE TABLE IF NOT EXISTS Persons (\n" + - " PersonID int,\n" + - " LastName varchar(255),\n" + - " FirstName varchar(255),\n" + - " Address varchar(255),\n" + - " City varchar(255),\n" + - " PRIMARY KEY (PersonID,LastName)" + - ");"; - Statement stmt; - try { - stmt = connection.createStatement(); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - boolean execute; - try { - execute = stmt.execute(sql); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - if (execute) { - try { - connection.commit(); - } catch (SQLException e) { - e.printStackTrace(); - } - } - - try { - stmt.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - - final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');"; - final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;"; - final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');"; - final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';"; - final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';"; - - - Statement insertStmt; - try { - insertStmt = connection.createStatement(); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - try { - execute = insertStmt.execute(insertSQL); - execute = insertStmt.execute(insertSQL1); - execute = insertStmt.execute(insertSQL2); - execute = insertStmt.execute(insertSQL3); - execute = insertStmt.execute(insertSQL4); - - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - try { - connection.commit(); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - try { - stmt.close(); - insertStmt.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - - try { - connection.commit(); - connection.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - - - } -} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java new file mode 100755 index 0000000..35293ef --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java @@ -0,0 +1,155 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.examples; + +import java.sql.*; +import org.apache.calcite.avatica.remote.Driver; + +public class MdbcTestClient { + + public static class Hr { + public final Employee[] emps = { + new Employee(100, "Bill"), + new Employee(200, "Eric"), + new Employee(150, "Sebastian"), + }; + } + + public static class Employee { + public final int empid; + public final String name; + + public Employee(int empid, String name) { + this.empid = empid; + this.name = name; + } + } + + public static void main(String[] args){ + try { + Class.forName("org.apache.calcite.avatica.remote.Driver"); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + System.exit(1); + } + Connection connection; + try { + connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf"); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + connection.setAutoCommit(false); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + + final String sql = "CREATE TABLE IF NOT EXISTS Persons (\n" + + " PersonID int,\n" + + " LastName varchar(255),\n" + + " FirstName varchar(255),\n" + + " Address varchar(255),\n" + + " City varchar(255),\n" + + " PRIMARY KEY (PersonID,LastName)" + + ");"; + Statement stmt; + try { + stmt = connection.createStatement(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + boolean execute; + try { + execute = stmt.execute(sql); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + if (execute) { + try { + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + try { + stmt.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');"; + final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;"; + final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');"; + final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';"; + final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';"; + + + Statement insertStmt; + try { + insertStmt = connection.createStatement(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + execute = insertStmt.execute(insertSQL); + execute = insertStmt.execute(insertSQL1); + execute = insertStmt.execute(insertSQL2); + execute = insertStmt.execute(insertSQL3); + execute = insertStmt.execute(insertSQL4); + + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + stmt.close(); + insertStmt.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + try { + connection.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java deleted file mode 100755 index 0732dc8..0000000 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * ============LICENSE_START==================================================== - * org.onap.music.mdbc - * ============================================================================= - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END====================================================== - */ -package org.onap.music.mdbc.mixins; - -import java.sql.Types; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.json.JSONObject; -import org.json.JSONTokener; -import org.onap.music.datastore.PreparedQueryObject; -import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.exceptions.MusicServiceException; -import org.onap.music.main.MusicCore; -import org.onap.music.main.ReturnType; - -import org.onap.music.logging.EELFLoggerDelegate; -import org.onap.music.mdbc.DatabasePartition; -import org.onap.music.mdbc.TableInfo; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; - -/** - * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence - * to calls to the user's DB. It stores dirty row references in one table (called DIRTY____) rather than one dirty - * table per real table (as {@link org.onap.music.mdbc.mixins.CassandraMixin} does). - * - * @author Robert P. Eby - */ -public class Cassandra2Mixin extends CassandraMixin { - private static final String DIRTY_TABLE = "DIRTY____"; // it seems Cassandra won't allow __DIRTY__ - private boolean dirty_table_created = false; - - private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Cassandra2Mixin.class); - - public Cassandra2Mixin() { - super(); - } - - public Cassandra2Mixin(String url, Properties info) throws MDBCServiceException { - super(url, info); - } - - /** - * Get the name of this MusicInterface mixin object. - * @return the name - */ - @Override - public String getMixinName() { - return "cassandra2"; - } - /** - * Do what is needed to close down the MUSIC connection. - */ - @Override - public void close() { - super.close(); - } - - /** - * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. - * The keyspace name comes from the initialization properties passed to the JDBC driver. - * @throws MusicServiceException - */ - @Override - public void createKeyspace() throws MDBCServiceException { - super.createKeyspace(); - } - - /** - * This method performs all necessary initialization in Music/Cassandra to store the table tableName. - * @param tableName the table to initialize MUSIC for - */ - @Override - public void initializeMusicForTable(TableInfo ti, String tableName) { - super.initializeMusicForTable(ti, tableName); - } - - /** - * Create a dirty row table for the real table tableName. The primary keys columns from the real table are recreated in - * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC. - * @param tableName the table to create a "dirty" table for - */ - @Override - public void createDirtyRowTable(TableInfo ti, String tableName) { - if (!dirty_table_created) { - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (tablename TEXT, replica TEXT, keyset TEXT, PRIMARY KEY(tablename, replica, keyset));", music_ns, DIRTY_TABLE); - executeMusicWriteQuery(cql); - dirty_table_created = true; - } - } - /** - * Drop the dirty row table for tableName from MUSIC. - * @param tableName the table being dropped - */ - @Override - public void dropDirtyRowTable(String tableName) { - // no-op - } - - private String buildJSON(TableInfo ti, String tableName, Object[] keys) { - // Build JSON string representing this keyset - JSONObject jo = new JSONObject(); - int j = 0; - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - jo.put(ti.columns.get(i), keys[j++]); - } - } - return jo.toString(); - } - /** - * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys - * @param tableName the table we are removing dirty entries from - * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row. - */ - @Override - public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) { - String cql = String.format("DELETE FROM %s.%s WHERE tablename = ? AND replica = ? AND keyset = ?;", music_ns, DIRTY_TABLE); - //Session sess = getMusicSession(); - //PreparedStatement ps = getPreparedStatementFromCache(cql); - Object[] values = new Object[] { tableName, myId, keys }; - logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql + " with values " + values[0] + " " + values[1] + " " + values[2]); - - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(tableName); - pQueryObject.addValue(myId); - pQueryObject.addValue(keys); - ReturnType rt = MusicCore.eventualPut(pQueryObject); - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage()); - } - /*BoundStatement bound = ps.bind(values); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - sess.execute(bound); - }*/ - } - /** - * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica, - * and consist of a Map of primary key column names and values. - * @param tableName the table we are querying for - * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row". - */ - @SuppressWarnings("deprecation") - @Override - public List> getDirtyRows(TableInfo ti, String tableName) { - String cql = String.format("SELECT keyset FROM %s.%s WHERE tablename = ? AND replica = ?;", music_ns, DIRTY_TABLE); - logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql + " with values " + tableName + " " + myId); - - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(tableName); - pQueryObject.addValue(myId); - ResultSet results = null; - try { - results = MusicCore.get(pQueryObject); - } catch (MusicServiceException e) { - e.printStackTrace(); - } - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(new Object[] { tableName, myId }); - bound.setReadTimeoutMillis(60000); - ResultSet results = null; - synchronized (sess) { - results = sess.execute(bound); - }*/ - List> list = new ArrayList>(); - for (Row row : results) { - String json = row.getString("keyset"); - JSONObject jo = new JSONObject(new JSONTokener(json)); - Map objs = new HashMap(); - for (String colname : jo.keySet()) { - int coltype = ti.getColType(colname); - switch (coltype) { - case Types.BIGINT: - objs.put(colname, jo.getLong(colname)); - break; - case Types.BOOLEAN: - objs.put(colname, jo.getBoolean(colname)); - break; - case Types.BLOB: - logger.error(EELFLoggerDelegate.errorLogger,"WE DO NOT SUPPORT BLOBS AS PRIMARY KEYS!! COLUMN NAME="+colname); - // throw an exception here??? - break; - case Types.DOUBLE: - objs.put(colname, jo.getDouble(colname)); - break; - case Types.INTEGER: - objs.put(colname, jo.getInt(colname)); - break; - case Types.TIMESTAMP: - objs.put(colname, new Date(jo.getString(colname))); - break; - case Types.VARCHAR: - default: - objs.put(colname, jo.getString(colname)); - break; - } - } - list.add(objs); - } - return list; - } - - /** - * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first. - * @param tableName This is the table that has been dropped - */ - @Override - public void clearMusicForTable(String tableName) { - super.clearMusicForTable(tableName); - } - /** - * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the - * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates - * it to the other replicas. - * - * @param tableName This is the table that has changed. - * @param oldRow This is a copy of the old row being deleted - */ - public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) { - super.deleteFromEntityTableInMusic(ti, tableName, oldRow); - } - /** - * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local - * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL - * @param tableName This is the table on which the select is being performed - */ - @Override - public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) { - super.readDirtyRowsAndUpdateDb(dbi, tableName); - } - - /** - * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the - * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates - * it to the other replicas. - * - * @param tableName This is the table that has changed. - * @param changedRow This is information about the row that has changed - */ - @Override - public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) { - super.updateDirtyRowAndEntityTableInMusic(ti, tableName, changedRow); - } - - /** - * Mark rows as "dirty" in the dirty rows table for tableName. Rows are marked for all replicas but - * this one (this replica already has the up to date data). - * @param tableName the table we are marking dirty - * @param keys an ordered list of the values being put into the table. The values that correspond to the tables' - * primary key are copied into the dirty row table. - */ - @Deprecated - public void markDirtyRow(TableInfo ti, String tableName, Object[] keys) { - String cql = String.format("INSERT INTO %s.%s (tablename, replica, keyset) VALUES (?, ?, ?);", music_ns, DIRTY_TABLE); - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql);*/ - @SuppressWarnings("unused") - Object[] values = new Object[] { tableName, "", buildJSON(ti, tableName, keys) }; - PreparedQueryObject pQueryObject = null; - for (String repl : allReplicaIds) { - /*if (!repl.equals(myId)) { - values[1] = repl; - logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql + " with values " + values[0] + " " + values[1] + " " + values[2]); - - BoundStatement bound = ps.bind(values); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - sess.execute(bound); - } - }*/ - pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(tableName); - pQueryObject.addValue(repl); - pQueryObject.addValue(buildJSON(ti, tableName, keys)); - ReturnType rt = MusicCore.eventualPut(pQueryObject); - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - System.out.println("Failure while critical put..."+rt.getMessage()); - } - } - } -} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java deleted file mode 100755 index 3b1651f..0000000 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java +++ /dev/null @@ -1,1502 +0,0 @@ -/* - * ============LICENSE_START==================================================== - * org.onap.music.mdbc - * ============================================================================= - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END====================================================== - */ -package org.onap.music.mdbc.mixins; - -import java.io.IOException; -import java.io.Reader; -import java.nio.ByteBuffer; -import java.sql.Types; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; - -import org.onap.music.mdbc.*; -import org.onap.music.mdbc.tables.MusicTxDigestId; -import org.onap.music.mdbc.tables.StagingTable; -import org.onap.music.mdbc.tables.MriReference; -import org.onap.music.mdbc.tables.MusicRangeInformationRow; -import org.onap.music.mdbc.tables.MusicTxDigest; -import org.onap.music.mdbc.tables.TxCommitProgress; - -import org.json.JSONObject; -import org.onap.music.datastore.CassaLockStore; -import org.onap.music.datastore.PreparedQueryObject; -import org.onap.music.exceptions.MusicLockingException; -import org.onap.music.exceptions.MusicQueryException; -import org.onap.music.exceptions.MusicServiceException; -import org.onap.music.main.MusicCore; -import org.onap.music.main.MusicCore.Condition; -import org.onap.music.main.ResultType; -import org.onap.music.main.ReturnType; - -import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.logging.EELFLoggerDelegate; -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.ColumnDefinitions; -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.TupleValue; - -/** - * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence - * to calls to the user's DB. It does not do any table or row locking. - * - *

This code only supports the following limited list of H2 and Cassandra data types:

- * - * - * - * - * - * - * - * - * - * - *
H2 Data TypeMapped to Cassandra Data Type
BIGINTBIGINT
BOOLEANBOOLEAN
CLOBBLOB
DOUBLEDOUBLE
INTEGERINT
TIMESTAMPTIMESTAMP
VARBINARYBLOB
VARCHARVARCHAR
- * - * @author Robert P. Eby - */ -public class CassandraMixin implements MusicInterface { - /** The property name to use to identify this replica to MusicSqlManager */ - public static final String KEY_MY_ID = "myid"; - /** The property name to use for the comma-separated list of replica IDs. */ - public static final String KEY_REPLICAS = "replica_ids"; - /** The property name to use to identify the IP address for Cassandra. */ - public static final String KEY_MUSIC_ADDRESS = "cassandra.host"; - /** The property name to use to provide the replication factor for Cassandra. */ - public static final String KEY_MUSIC_RFACTOR = "music_rfactor"; - /** The property name to use to provide the replication factor for Cassandra. */ - public static final String KEY_MUSIC_NAMESPACE = "music_namespace"; - /** Namespace for the tables in MUSIC (Cassandra) */ - public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; - /** The default property value to use for the Cassandra IP address. */ - public static final String DEFAULT_MUSIC_ADDRESS = "localhost"; - /** The default property value to use for the Cassandra replication factor. */ - public static final int DEFAULT_MUSIC_RFACTOR = 1; - /** The default primary string column, if none is provided. */ - public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid"; - /** Type of the primary key, if none is defined by the user */ - public static final String MDBC_PRIMARYKEY_TYPE = "uuid"; - - - //\TODO Add logic to change the names when required and create the tables when necessary - private String musicTxDigestTableName = "musictxdigest"; - private String musicRangeInformationTableName = "musicrangeinformation"; - - private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassandraMixin.class); - - private static final Map typemap = new HashMap<>(); - static { - // We only support the following type mappings currently (from DB -> Cassandra). - // Anything else will likely cause a NullPointerException - typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY - typemap.put(Types.BLOB, "VARCHAR"); - typemap.put(Types.BOOLEAN, "BOOLEAN"); - typemap.put(Types.CLOB, "BLOB"); - typemap.put(Types.DATE, "VARCHAR"); - typemap.put(Types.DOUBLE, "DOUBLE"); - typemap.put(Types.DECIMAL, "DECIMAL"); - typemap.put(Types.INTEGER, "INT"); - //typemap.put(Types.TIMESTAMP, "TIMESTAMP"); - typemap.put(Types.SMALLINT, "SMALLINT"); - typemap.put(Types.TIMESTAMP, "VARCHAR"); - typemap.put(Types.VARBINARY, "BLOB"); - typemap.put(Types.VARCHAR, "VARCHAR"); - typemap.put(Types.CHAR, "VARCHAR"); - //The "Hacks", these don't have a direct mapping - //typemap.put(Types.DATE, "VARCHAR"); - //typemap.put(Types.DATE, "TIMESTAMP"); - } - - protected final String music_ns; - protected final String myId; - protected final String[] allReplicaIds; - private final String musicAddress; - private final int music_rfactor; - private MusicConnector mCon = null; - private Session musicSession = null; - private boolean keyspace_created = false; - private Map ps_cache = new HashMap<>(); - private Set in_progress = Collections.synchronizedSet(new HashSet()); - - public CassandraMixin() { - //this.logger = null; - this.musicAddress = null; - this.music_ns = null; - this.music_rfactor = 0; - this.myId = null; - this.allReplicaIds = null; - } - - public CassandraMixin(String url, Properties info) throws MDBCServiceException { - // Default values -- should be overridden in the Properties - // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred) - this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS); - logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress); - - String s = info.getProperty(KEY_MUSIC_RFACTOR); - this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s); - - this.myId = info.getProperty(KEY_MY_ID, getMyHostId()); - logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId); - - - this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(","); - logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId)); - - this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE); - logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns); - createKeyspace(); - } - - /** - * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. - * The keyspace name comes from the initialization properties passed to the JDBC driver. - */ - @Override - public void createKeyspace() throws MDBCServiceException { - - Map replicationInfo = new HashMap<>(); - replicationInfo.put("'class'", "'SimpleStrategy'"); - replicationInfo.put("'replication_factor'", music_rfactor); - - PreparedQueryObject queryObject = new PreparedQueryObject(); - queryObject.appendQueryString( - "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns + - " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":")); - - try { - MusicCore.nonKeyRelatedPut(queryObject, "eventual"); - } catch (MusicServiceException e) { - if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) { - throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage()); - } - } - } - - private String getMyHostId() { - ResultSet rs = executeMusicRead("SELECT HOST_ID FROM SYSTEM.LOCAL"); - Row row = rs.one(); - return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString(); - } - private String getAllHostIds() { - ResultSet results = executeMusicRead("SELECT HOST_ID FROM SYSTEM.PEERS"); - StringBuilder sb = new StringBuilder(myId); - for (Row row : results) { - sb.append(","); - sb.append(row.getUUID("HOST_ID").toString()); - } - return sb.toString(); - } - - /** - * Get the name of this MusicInterface mixin object. - * @return the name - */ - @Override - public String getMixinName() { - return "cassandra"; - } - /** - * Do what is needed to close down the MUSIC connection. - */ - @Override - public void close() { - if (musicSession != null) { - musicSession.close(); - musicSession = null; - } - } - @Override - public void initializeMetricDataStructures() throws MDBCServiceException { - try { - createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number - createMusicRangeInformationTable(); - } - catch(MDBCServiceException e){ - logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC"); - } - } - - /** - * This method performs all necessary initialization in Music/Cassandra to store the table tableName. - * @param tableName the table to initialize MUSIC for - */ - @Override - public void initializeMusicForTable(TableInfo ti, String tableName) { - /** - * This code creates two tables for every table in SQL: - * (i) a table with the exact same name as the SQL table storing the SQL data. - * (ii) a "dirty bits" table that stores the keys in the Cassandra table that are yet to be - * updated in the SQL table (they were written by some other node). - */ - StringBuilder fields = new StringBuilder(); - StringBuilder prikey = new StringBuilder(); - String pfx = "", pfx2 = ""; - for (int i = 0; i < ti.columns.size(); i++) { - fields.append(pfx) - .append(ti.columns.get(i)) - .append(" ") - .append(typemap.get(ti.coltype.get(i))); - if (ti.iskey.get(i)) { - // Primary key column - prikey.append(pfx2).append(ti.columns.get(i)); - pfx2 = ", "; - } - pfx = ", "; - } - if (prikey.length()==0) { - fields.append(pfx).append(MDBC_PRIMARYKEY_NAME) - .append(" ") - .append(MDBC_PRIMARYKEY_TYPE); - prikey.append(MDBC_PRIMARYKEY_NAME); - } - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString()); - executeMusicWriteQuery(cql); - } - - // ************************************************** - // Dirty Tables (in MUSIC) methods - // ************************************************** - - /** - * Create a dirty row table for the real table tableName. The primary keys columns from the real table are recreated in - * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC. - * @param tableName the table to create a "dirty" table for - */ - @Override - public void createDirtyRowTable(TableInfo ti, String tableName) { - // create dirtybitsTable at all replicas -// for (String repl : allReplicaIds) { -//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i]; -//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);"; -// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl); -// executeMusicWriteQuery(cql); -// } - StringBuilder ddl = new StringBuilder("REPLICA__ TEXT"); - StringBuilder cols = new StringBuilder("REPLICA__"); - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - // Only use the primary keys columns in the "Dirty" table - ddl.append(", ") - .append(ti.columns.get(i)) - .append(" ") - .append(typemap.get(ti.coltype.get(i))); - cols.append(", ").append(ti.columns.get(i)); - } - } - if(cols.length()==0) { - //fixme - System.err.println("Create dirty row table found no primary key"); - } - ddl.append(", PRIMARY KEY(").append(cols).append(")"); - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s (%s);", music_ns, tableName, ddl.toString()); - executeMusicWriteQuery(cql); - } - /** - * Drop the dirty row table for tableName from MUSIC. - * @param tableName the table being dropped - */ - @Override - public void dropDirtyRowTable(String tableName) { - String cql = String.format("DROP TABLE %s.DIRTY_%s;", music_ns, tableName); - executeMusicWriteQuery(cql); - } - /** - * Mark rows as "dirty" in the dirty rows table for tableName. Rows are marked for all replicas but - * this one (this replica already has the up to date data). - * @param tableName the table we are marking dirty - * @param keys an ordered list of the values being put into the table. The values that correspond to the tables' - * primary key are copied into the dirty row table. - */ - @Override - public void markDirtyRow(TableInfo ti, String tableName, JSONObject keys) { - Object[] keyObj = getObjects(ti,tableName, keys); - StringBuilder cols = new StringBuilder("REPLICA__"); - PreparedQueryObject pQueryObject = null; - StringBuilder vals = new StringBuilder("?"); - List vallist = new ArrayList(); - vallist.add(""); // placeholder for replica - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - cols.append(", ").append(ti.columns.get(i)); - vals.append(", ").append("?"); - vallist.add(keyObj[i]); - } - } - if(cols.length()==0) { - //FIXME - System.err.println("markDIrtyRow need to fix primary key"); - } - String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString()); - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql);*/ - String primaryKey; - if(ti.hasKey()) { - primaryKey = getMusicKeyFromRow(ti,tableName, keys); - } - else { - primaryKey = getMusicKeyFromRowWithoutPrimaryIndexes(ti,tableName, keys); - } - System.out.println("markDirtyRow: PK value: "+primaryKey); - - Object pkObj = null; - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - pkObj = keyObj[i]; - } - } - for (String repl : allReplicaIds) { - pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(tableName); - pQueryObject.addValue(repl); - pQueryObject.addValue(pkObj); - updateMusicDB(tableName, primaryKey, pQueryObject); - //if (!repl.equals(myId)) { - /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); - vallist.set(0, repl); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - sess.execute(bound); - }*/ - //} - - } - } - /** - * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys - * @param tableName the table we are removing dirty entries from - * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row. - */ - @Override - public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) { - Object[] keysObjects = getObjects(ti,tableName,keys); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - StringBuilder cols = new StringBuilder("REPLICA__=?"); - List vallist = new ArrayList(); - vallist.add(myId); - int n = 0; - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - cols.append(" AND ").append(ti.columns.get(i)).append("=?"); - vallist.add(keysObjects[n++]); - pQueryObject.addValue(keysObjects[n++]); - } - } - String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString()); - logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); - pQueryObject.appendQueryString(cql); - ReturnType rt = MusicCore.eventualPut(pQueryObject); - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - System.out.println("Failure while cleanDirtyRow..."+rt.getMessage()); - } - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - sess.execute(bound); - }*/ - } - /** - * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica, - * and consist of a Map of primary key column names and values. - * @param tableName the table we are querying for - * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row". - */ - @Override - public List> getDirtyRows(TableInfo ti, String tableName) { - String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName); - ResultSet results = null; - logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); - - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(new Object[] { myId }); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - results = sess.execute(bound); - }*/ - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - try { - results = MusicCore.get(pQueryObject); - } catch (MusicServiceException e) { - - e.printStackTrace(); - } - - ColumnDefinitions cdef = results.getColumnDefinitions(); - List> list = new ArrayList>(); - for (Row row : results) { - Map objs = new HashMap(); - for (int i = 0; i < cdef.size(); i++) { - String colname = cdef.getName(i).toUpperCase(); - String coltype = cdef.getType(i).getName().toString().toUpperCase(); - if (!colname.equals("REPLICA__")) { - switch (coltype) { - case "BIGINT": - objs.put(colname, row.getLong(colname)); - break; - case "BOOLEAN": - objs.put(colname, row.getBool(colname)); - break; - case "BLOB": - objs.put(colname, row.getString(colname)); - break; - case "DATE": - objs.put(colname, row.getString(colname)); - break; - case "DOUBLE": - objs.put(colname, row.getDouble(colname)); - break; - case "DECIMAL": - objs.put(colname, row.getDecimal(colname)); - break; - case "INT": - objs.put(colname, row.getInt(colname)); - break; - case "TIMESTAMP": - objs.put(colname, row.getTimestamp(colname)); - break; - case "VARCHAR": - default: - objs.put(colname, row.getString(colname)); - break; - } - } - } - list.add(objs); - } - return list; - } - - /** - * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first. - * @param tableName This is the table that has been dropped - */ - @Override - public void clearMusicForTable(String tableName) { - dropDirtyRowTable(tableName); - String cql = String.format("DROP TABLE %s.%s;", music_ns, tableName); - executeMusicWriteQuery(cql); - } - /** - * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the - * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates - * it to the other replicas. - * - * @param tableName This is the table that has changed. - * @param oldRow This is a copy of the old row being deleted - */ - @Override - public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) { - Object[] objects = getObjects(ti,tableName,oldRow); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - if (ti.hasKey()) { - assert(ti.columns.size() == objects.length); - } else { - assert(ti.columns.size()+1 == objects.length); - } - - StringBuilder where = new StringBuilder(); - List vallist = new ArrayList(); - String pfx = ""; - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - where.append(pfx) - .append(ti.columns.get(i)) - .append("=?"); - vallist.add(objects[i]); - pQueryObject.addValue(objects[i]); - pfx = " AND "; - } - } - if (!ti.hasKey()) { - where.append(MDBC_PRIMARYKEY_NAME + "=?"); - //\FIXME this is wrong, old row is not going to contain the UUID, this needs to be fixed - vallist.add(UUID.fromString((String) objects[0])); - pQueryObject.addValue(UUID.fromString((String) objects[0])); - } - - String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString()); - logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql); - pQueryObject.appendQueryString(cql); - - /*PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - Session sess = getMusicSession(); - synchronized (sess) { - sess.execute(bound); - }*/ - String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow); - - updateMusicDB(tableName, primaryKey, pQueryObject); - - // Mark the dirty rows in music for all the replicas but us - markDirtyRow(ti,tableName, oldRow); - } - - public Set getMusicTableSet(String ns) { - Set set = new TreeSet(); - String cql = String.format("SELECT TABLE_NAME FROM SYSTEM_SCHEMA.TABLES WHERE KEYSPACE_NAME = '%s'", ns); - ResultSet rs = executeMusicRead(cql); - for (Row row : rs) { - set.add(row.getString("TABLE_NAME").toUpperCase()); - } - return set; - } - /** - * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local - * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL - * @param tableName This is the table on which the select is being performed - */ - @Override - public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) { - // Read dirty rows of this table from Music - TableInfo ti = dbi.getTableInfo(tableName); - List> objlist = getDirtyRows(ti,tableName); - PreparedQueryObject pQueryObject = null; - String pre_cql = String.format("SELECT * FROM %s.%s WHERE ", music_ns, tableName); - List vallist = new ArrayList(); - StringBuilder sb = new StringBuilder(); - //\TODO Perform a batch operation instead of each row at a time - for (Map map : objlist) { - pQueryObject = new PreparedQueryObject(); - sb.setLength(0); - vallist.clear(); - String pfx = ""; - for (String key : map.keySet()) { - sb.append(pfx).append(key).append("=?"); - vallist.add(map.get(key)); - pQueryObject.addValue(map.get(key)); - pfx = " AND "; - } - - String cql = pre_cql + sb.toString(); - System.out.println("readDirtyRowsAndUpdateDb: cql: "+cql); - pQueryObject.appendQueryString(cql); - ResultSet dirtyRows = null; - try { - //\TODO Why is this an eventual put?, this should be an atomic - dirtyRows = MusicCore.get(pQueryObject); - } catch (MusicServiceException e) { - - e.printStackTrace(); - } - /* - Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - ResultSet dirtyRows = null; - synchronized (sess) { - dirtyRows = sess.execute(bound); - }*/ - List rows = dirtyRows.all(); - if (rows.isEmpty()) { - // No rows, the row must have been deleted - deleteRowFromSqlDb(dbi,tableName, map); - } else { - for (Row row : rows) { - writeMusicRowToSQLDb(dbi,tableName, row); - } - } - } - } - - private void deleteRowFromSqlDb(DBInterface dbi, String tableName, Map map) { - dbi.deleteRowFromSqlDb(tableName, map); - TableInfo ti = dbi.getTableInfo(tableName); - List vallist = new ArrayList(); - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - String col = ti.columns.get(i); - Object val = map.get(col); - vallist.add(val); - } - } - cleanDirtyRow(ti, tableName, new JSONObject(vallist)); - } - /** - * This functions copies the contents of a row in Music into the corresponding row in the SQL table - * @param tableName This is the name of the table in both Music and swl - * @param musicRow This is the row in Music that is being copied into SQL - */ - private void writeMusicRowToSQLDb(DBInterface dbi, String tableName, Row musicRow) { - // First construct the map of columns and their values - TableInfo ti = dbi.getTableInfo(tableName); - Map map = new HashMap(); - List vallist = new ArrayList(); - String rowid = tableName; - for (String col : ti.columns) { - Object val = getValue(musicRow, col); - map.put(col, val); - if (ti.iskey(col)) { - vallist.add(val); - rowid += "_" + val.toString(); - } - } - - logger.debug("Blocking rowid: "+rowid); - in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE - - dbi.insertRowIntoSqlDb(tableName, map); - - logger.debug("Unblocking rowid: "+rowid); - in_progress.remove(rowid); // Unblock propagation - -// try { -// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); -// executeSQLWrite(sql); -// } catch (SQLException e) { -// logger.debug("Insert failed because row exists, do an update"); -// // TODO - rewrite this UPDATE command should not update key fields -// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString()); -// try { -// executeSQLWrite(sql); -// } catch (SQLException e1) { -// e1.printStackTrace(); -// } -// } - - ti = dbi.getTableInfo(tableName); - cleanDirtyRow(ti, tableName, new JSONObject(vallist)); - -// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";"; -// java.sql.ResultSet rs = executeSQLRead(selectQuery); -// String dbWriteQuery=null; -// try { -// if(rs.next()){//this entry is there, do an update -// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";"; -// }else -// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";"; -// executeSQLWrite(dbWriteQuery); -// } catch (SQLException e) { -// // ZZTODO Auto-generated catch block -// e.printStackTrace(); -// } - - //clean the music dirty bits table -// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId; -// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;"; -// executeMusicWriteQuery(deleteQuery); - } - private Object getValue(Row musicRow, String colname) { - ColumnDefinitions cdef = musicRow.getColumnDefinitions(); - DataType colType; - try { - colType= cdef.getType(colname); - } - catch(IllegalArgumentException e) { - logger.warn("Colname is not part of table metadata: "+e); - throw e; - } - String typeStr = colType.getName().toString().toUpperCase(); - switch (typeStr) { - case "BIGINT": - return musicRow.getLong(colname); - case "BOOLEAN": - return musicRow.getBool(colname); - case "BLOB": - return musicRow.getString(colname); - case "DATE": - return musicRow.getString(colname); - case "DECIMAL": - return musicRow.getDecimal(colname); - case "DOUBLE": - return musicRow.getDouble(colname); - case "SMALLINT": - case "INT": - return musicRow.getInt(colname); - case "TIMESTAMP": - return musicRow.getTimestamp(colname); - case "UUID": - return musicRow.getUUID(colname); - default: - logger.error(EELFLoggerDelegate.errorLogger, "UNEXPECTED COLUMN TYPE: columname="+colname+", columntype="+typeStr); - // fall thru - case "VARCHAR": - return musicRow.getString(colname); - } - } - - /** - * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the - * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates - * it to the other replicas. - * - * @param tableName This is the table that has changed. - * @param changedRow This is information about the row that has changed - */ - @Override - public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) { - // Build the CQL command - Object[] objects = getObjects(ti,tableName,changedRow); - StringBuilder fields = new StringBuilder(); - StringBuilder values = new StringBuilder(); - String rowid = tableName; - Object[] newrow = new Object[objects.length]; - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - String pfx = ""; - int keyoffset=0; - for (int i = 0; i < objects.length; i++) { - if (!ti.hasKey() && i==0) { - //We need to tack on cassandra's uid in place of a primary key - fields.append(MDBC_PRIMARYKEY_NAME); - values.append("?"); - newrow[i] = UUID.fromString((String) objects[i]); - pQueryObject.addValue(newrow[i]); - keyoffset=-1; - pfx = ", "; - continue; - } - fields.append(pfx).append(ti.columns.get(i+keyoffset)); - values.append(pfx).append("?"); - pfx = ", "; - if (objects[i] instanceof byte[]) { - // Cassandra doesn't seem to have a Codec to translate a byte[] to a ByteBuffer - newrow[i] = ByteBuffer.wrap((byte[]) objects[i]); - pQueryObject.addValue(newrow[i]); - } else if (objects[i] instanceof Reader) { - // Cassandra doesn't seem to have a Codec to translate a Reader to a ByteBuffer either... - newrow[i] = ByteBuffer.wrap(readBytesFromReader((Reader) objects[i])); - pQueryObject.addValue(newrow[i]); - } else { - newrow[i] = objects[i]; - pQueryObject.addValue(newrow[i]); - } - if (i+keyoffset>=0 && ti.iskey.get(i+keyoffset)) { - rowid += "_" + newrow[i].toString(); - } - } - - if (in_progress.contains(rowid)) { - // This call to updateDirtyRowAndEntityTableInMusic() was called as a result of a Cassandra -> H2 update; ignore - logger.debug(EELFLoggerDelegate.applicationLogger, "updateDirtyRowAndEntityTableInMusic: bypassing MUSIC update on "+rowid); - - } else { - // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update - String cql = String.format("INSERT INTO %s.%s (%s) VALUES (%s);", music_ns, tableName, fields.toString(), values.toString()); - - pQueryObject.appendQueryString(cql); - String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow); - updateMusicDB(tableName, primaryKey, pQueryObject); - - /*PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(newrow); - bound.setReadTimeoutMillis(60000); - Session sess = getMusicSession(); - synchronized (sess) { - sess.execute(bound); - }*/ - // Mark the dirty rows in music for all the replicas but us - markDirtyRow(ti,tableName, changedRow); - } - } - - - - private byte[] readBytesFromReader(Reader rdr) { - StringBuilder sb = new StringBuilder(); - try { - int ch; - while ((ch = rdr.read()) >= 0) { - sb.append((char)ch); - } - } catch (IOException e) { - logger.warn("readBytesFromReader: "+e); - } - return sb.toString().getBytes(); - } - - protected PreparedStatement getPreparedStatementFromCache(String cql) { - // Note: have to hope that the Session never changes! - if (!ps_cache.containsKey(cql)) { - Session sess = getMusicSession(); - PreparedStatement ps = sess.prepare(cql); - ps_cache.put(cql, ps); - } - return ps_cache.get(cql); - } - - /** - * This method gets a connection to Music - * @return the Cassandra Session to use - */ - protected Session getMusicSession() { - // create cassandra session - if (musicSession == null) { - logger.info(EELFLoggerDelegate.applicationLogger, "Creating New Music Session"); - mCon = new MusicConnector(musicAddress); - musicSession = mCon.getSession(); - } - return musicSession; - } - - /** - * This method executes a write query in Music - * @param cql the CQL to be sent to Cassandra - */ - protected void executeMusicWriteQuery(String cql) { - logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - ReturnType rt = MusicCore.eventualPut(pQueryObject); - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage()); - } - /*Session sess = getMusicSession(); - SimpleStatement s = new SimpleStatement(cql); - s.setReadTimeoutMillis(60000); - synchronized (sess) { - sess.execute(s); - }*/ - } - - /** - * This method executes a read query in Music - * @param cql the CQL to be sent to Cassandra - * @return a ResultSet containing the rows returned from the query - */ - protected ResultSet executeMusicRead(String cql) { - logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - ResultSet results = null; - try { - results = MusicCore.get(pQueryObject); - } catch (MusicServiceException e) { - - e.printStackTrace(); - } - return results; - /*Session sess = getMusicSession(); - synchronized (sess) { - return sess.execute(cql); - }*/ - } - - /** - * Returns the default primary key name that this mixin uses - */ - public String getMusicDefaultPrimaryKeyName() { - return MDBC_PRIMARYKEY_NAME; - } - - /** - * Return the function for cassandra's primary key generation - */ - public String generateUniqueKey() { - return MDBCUtils.generateUniqueKey().toString(); - } - - @Override - public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) { - //\TODO this operation is super expensive to perform, both latency and BW - // it is better to add additional where clauses, and have the primary key - // to be composed of known columns of the table - // Adding this primary indexes would be an additional burden to the developers, which spanner - // also does, but otherwise performance is really bad - // At least it should have a set of columns that are guaranteed to be unique - StringBuilder cqlOperation = new StringBuilder(); - cqlOperation.append("SELECT * FROM ") - .append(music_ns) - .append(".") - .append(table); - ResultSet musicResults = executeMusicRead(cqlOperation.toString()); - Object[] dbRowObjects = getObjects(ti,table,dbRow); - while (!musicResults.isExhausted()) { - Row musicRow = musicResults.one(); - if (rowIs(ti, musicRow, dbRowObjects)) { - return ((UUID)getValue(musicRow, MDBC_PRIMARYKEY_NAME)).toString(); - } - } - //should never reach here - return null; - } - - /** - * Checks to see if this row is in list of database entries - * @param ti - * @param musicRow - * @param dbRow - * @return - */ - private boolean rowIs(TableInfo ti, Row musicRow, Object[] dbRow) { - //System.out.println("Comparing " + musicRow.toString()); - boolean sameRow=true; - for (int i=0; i keyCols = ti.getKeyColumns(); - if(keyCols.isEmpty()){ - throw new IllegalArgumentException("Table doesn't have defined primary indexes "); - } - StringBuilder key = new StringBuilder(); - String pfx = ""; - for(String keyCol: keyCols) { - key.append(pfx); - key.append(row.get(keyCol)); - pfx = ","; - } - String keyStr = key.toString(); - return keyStr; - } - - public void updateMusicDB(String tableName, String primaryKey, PreparedQueryObject pQObject) { - if(MusicMixin.criticalTables.contains(tableName)) { - ReturnType rt = null; - try { - rt = MusicCore.atomicPut(music_ns, tableName, primaryKey, pQObject, null); - } catch (MusicLockingException e) { - e.printStackTrace(); - } catch (MusicServiceException e) { - e.printStackTrace(); - } catch (MusicQueryException e) { - e.printStackTrace(); - } - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - System.out.println("Failure while critical put..."+rt.getMessage()); - } - } else { - ReturnType rt = MusicCore.eventualPut(pQObject); - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - System.out.println("Failure while critical put..."+rt.getMessage()); - } - } - } - - - private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){ - PreparedQueryObject query = new PreparedQueryObject(); - StringBuilder appendBuilder = new StringBuilder(); - appendBuilder.append("UPDATE ") - .append(music_ns) - .append(".") - .append(mriTable) - .append(" SET txredolog = txredolog +[('") - .append(table) - .append("',") - .append(redoUuid) - .append(")] WHERE rangeid = ") - .append(uuid) - .append(";"); - query.appendQueryString(appendBuilder.toString()); - return query; - } - - protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { - UUID mriIndex = partition.getMusicRangeInformationIndex(); - String lockId; - lockId = MusicCore.createLockReference(fullyQualifiedKey); - //\TODO Handle better failures to acquire locks - ReturnType lockReturn; - try { - lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId); - } catch (MusicLockingException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Lock was not acquire correctly for key "+fullyQualifiedKey); - throw new MDBCServiceException("Lock was not acquire correctly for key "+fullyQualifiedKey); - } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey); - throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey); - } catch (MusicQueryException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey); - throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey); - } - //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock - if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { - try { - MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId); - CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle(); - CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns, - this.musicRangeInformationTableName, mriIndex.toString()); - while(lockOwner.lockRef != lockId) { - MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef); - try { - lockOwner = lockingServiceHandle.peekLockQueue(music_ns, - this.musicRangeInformationTableName, mriIndex.toString()); - } catch(NullPointerException e){ - //Ignore null pointer exception - lockId = MusicCore.createLockReference(fullyQualifiedKey); - break; - } - } - lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId); - - } catch (MusicLockingException e) { - throw new MDBCServiceException("Could not lock the corresponding lock"); - } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey); - throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey); - } catch (MusicQueryException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey); - throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey); - } - } - if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { - throw new MDBCServiceException("Could not lock the corresponding lock"); - } - //TODO: Java newbie here, verify that this lockId is actually assigned to the global DatabasePartition in the StateManager instance - partition.setLockId(lockId); - return lockId; - } - - - - protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{ - PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId); - ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, MriIndex.toString(), appendQuery, lockId, null); - if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ - logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); - throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); - } - } - - @Override - public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ - UUID mriIndex = partition.getMusicRangeInformationIndex(); - if(mriIndex==null) { - //\TODO Fetch MriIndex from the Range Information Table - throw new MDBCServiceException("TIT Index retrieval not yet implemented"); - } - String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex.toString(); - //0. See if reference to lock was already created - String lockId = partition.getLockId(); - if(lockId == null || lockId.isEmpty()) { - lockId = createAndAssignLock(fullyQualifiedMriKey,partition); - } - - UUID commitId; - //Generate a local commit id - if(progressKeeper.containsTx(txId)) { - commitId = progressKeeper.getCommitId(txId); - } - else{ - logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress "); - throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress "); - } - //Add creation type of transaction digest - - //1. Push new row to RRT and obtain its index - String serializedTransactionDigest; - try { - serializedTransactionDigest = MDBCUtils.toString(transactionDigest); - } catch (IOException e) { - throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString()); - } - MusicTxDigestId digestId = new MusicTxDigestId(commitId); - addTxDigest(digestId, serializedTransactionDigest); - //2. Save RRT index to RQ - if(progressKeeper!= null) { - progressKeeper.setRecordId(txId,digestId); - } - //3. Append RRT index into the corresponding TIT row array - appendToRedoLog(mriIndex,partition,digestId); - } - - /** - * @param tableName - * @param string - * @param rowValues - * @return - */ - @SuppressWarnings("unused") - private String getUid(String tableName, String string, Object[] rowValues) { - // - // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update - String cql = String.format("SELECT * FROM %s.%s;", music_ns, tableName); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(); - bound.setReadTimeoutMillis(60000); - Session sess = getMusicSession(); - ResultSet rs; - synchronized (sess) { - rs = sess.execute(bound); - } - - //should never reach here - logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key"); - return null; - } - - @Override - public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) { - // \FIXME: we may need to add the primary key of the row if it was autogenerated by MUSIC - List cols = ti.columns; - int size = cols.size(); - boolean hasDefault = false; - if(row.has(getMusicDefaultPrimaryKeyName())) { - size++; - hasDefault = true; - } - - Object[] objects = new Object[size]; - int idx = 0; - if(hasDefault) { - objects[idx++] = row.getString(getMusicDefaultPrimaryKeyName()); - } - for(String col : ti.columns) { - objects[idx]=row.get(col); - } - return objects; - } - - @Override - public List getPartitionIndexes() { - ArrayList partitions = new ArrayList(); - String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName); - ResultSet rs = executeMusicRead(cql); - for (Row r: rs) { - partitions.add(r.getUUID("rangeid")); - } - return partitions; - } - - @Override - public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException { - //TODO: verify that lock id is valid before calling the database operations function - //UUID id = partition.getMusicRangeInformationIndex(); - - String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(partitionIndex); - Row newRow; - try { - newRow = executeMusicUnlockedQuorumGet(pQueryObject); - } catch (MDBCServiceException e) { - logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); - } - - List log = newRow.getList("txredolog",TupleValue.class); - List digestIds = new ArrayList<>(); - for(TupleValue t: log){ - //final String tableName = t.getString(0); - final UUID index = t.getUUID(1); - digestIds.add(new MusicTxDigestId(index)); - } - List partitions = new ArrayList<>(); - Set tables = newRow.getSet("keys",String.class); - for (String table:tables){ - partitions.add(new Range(table)); - } - return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""), - digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid")); - } - - - /** - * This function creates the TransactionInformation table. It contain information related - * to the transactions happening in a given partition. - * * The schema of the table is - * * Id, uiid. - * * Partition, uuid id of the partition - * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables - * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables - * * Redo: list of uiids associated to the Redo Records Table - * - */ - private void createMusicRangeInformationTable() throws MDBCServiceException { - String tableName = this.musicRangeInformationTableName; - String priKey = "rangeid"; - StringBuilder fields = new StringBuilder(); - fields.append("rangeid uuid, "); - fields.append("keys set, "); - fields.append("ownerid text, "); - fields.append("metricprocessid text, "); - //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly - fields.append("txredolog list>> "); - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", - this.music_ns, tableName, fields, priKey); - try { - executeMusicWriteQuery(this.music_ns,tableName,cql); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create transaction information table"); - throw(e); - } - } - - - @Override - public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException { - DatabasePartition newPartition = info.getDBPartition(); - String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMusicRangeInformationIndex().toString(); - String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition); - createEmptyMriRow(info.getMetricProcessId(),lockId,new ArrayList()); - throw new UnsupportedOperationException(); - } - - /** - * Creates a new empty MRI row - * @param processId id of the process that is going to own initially this. - * @return uuid associated to the new row - */ - private UUID createEmptyMriRow(String processId, String lockId, List ranges) - throws MDBCServiceException { - UUID id = MDBCUtils.generateUniqueKey(); - return createEmptyMriRow(id,processId,lockId,ranges); - } - - /** - * Creates a new empty MRI row - * @param processId id of the process that is going to own initially this. - * @return uuid associated to the new row - */ - private UUID createEmptyMriRow(UUID id, String processId, String lockId, List ranges) - throws MDBCServiceException{ - StringBuilder insert = new StringBuilder("INSERT INTO ") - .append(this.music_ns) - .append('.') - .append(this.musicRangeInformationTableName) - .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") - .append("(") - .append(id) - .append(",{"); - boolean first=true; - for(Range r: ranges){ - if(first){ first=false; } - else { - insert.append(','); - } - insert.append("'").append(r.toString()).append("'"); - } - insert.append("},'") - .append((lockId==null)?"":lockId) - .append("','") - .append(processId) - .append("',[]);"); - PreparedQueryObject query = new PreparedQueryObject(); - query.appendQueryString(insert.toString()); - try { - executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to add new row to transaction information"); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); - } - return id; - } - - @Override - public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException { - PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId); - ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null); - if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ - logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); - throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); - } - } - - public void createMusicTxDigest() throws MDBCServiceException { - createMusicTxDigest(-1); - } - - - /** - * This function creates the MusicTxDigest table. It contain information related to each transaction committed - * * LeaseId: id associated with the lease, text - * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later - * * TransactionDigest: text that contains all the changes in the transaction - */ - private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { - String tableName = this.musicTxDigestTableName; - if(musicTxDigestTableNumber >= 0) { - tableName = tableName + - "-" + - Integer.toString(musicTxDigestTableNumber); - } - String priKey = "txid"; - StringBuilder fields = new StringBuilder(); - fields.append("txid uuid, "); - fields.append("transactiondigest text ");//notice lack of ',' - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey); - try { - executeMusicWriteQuery(this.music_ns,tableName,cql); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create redo records table"); - throw(e); - } - } - - - @Override - public void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException { - //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); - PreparedQueryObject query = new PreparedQueryObject(); - String cqlQuery = "INSERT INTO " + - this.music_ns + - '.' + - this.musicTxDigestTableName + - " (txid,transactiondigest) " + - "VALUES (" + - newId.txId + ",'" + - transactionDigest + - "');"; - query.appendQueryString(cqlQuery); - //\TODO check if I am not shooting on my own foot - try { - MusicCore.nonKeyRelatedPut(query,"critical"); - } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage()); - throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString()); - } - } - - - - @Override - public HashMap getTxDigest(MusicTxDigestId id) throws MDBCServiceException { - String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(id.txId); - Row newRow; - try { - newRow = executeMusicUnlockedQuorumGet(pQueryObject); - } catch (MDBCServiceException e) { - logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); - } - String digest = newRow.getString("transactiondigest"); - HashMap changes; - try { - changes = (HashMap) MDBCUtils.fromString(digest); - } catch (IOException e) { - logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId); - throw new MDBCServiceException("Deserializng digest failed with ioexception"); - } catch (ClassNotFoundException e) { - logger.error("Deserializng digest failed with an invalid class for id:"+id.txId); - throw new MDBCServiceException("Deserializng digest failed with an invalid class"); - } - return changes; - } - - @Override - public void own(List ranges){ - throw new UnsupportedOperationException(); - } - - @Override - public void appendRange(String rangeId, List ranges){ - throw new UnsupportedOperationException(); - } - - @Override - public void relinquish(String ownerId, String rangeId){ - throw new UnsupportedOperationException(); - } - - /** - * This method executes a write query in Music - * @param cql the CQL to be sent to Cassandra - */ - private static void executeMusicWriteQuery(String keyspace, String table, String cql) - throws MDBCServiceException { - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - ResultType rt = null; - try { - rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical"); - } catch (MusicServiceException e) { - //\TODO: handle better, at least transform into an MDBCServiceException - e.printStackTrace(); - } - String result = rt.getResult(); - if (result==null || result.toLowerCase().equals("failure")) { - throw new MDBCServiceException("Music eventual put failed"); - } - } - - private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey, - String lock) - throws MDBCServiceException{ - ResultSet result; - try { - result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock); - } catch(MusicServiceException e){ - //\TODO: handle better, at least transform into an MDBCServiceException - e.printStackTrace(); - throw new MDBCServiceException("Error executing critical get"); - } - if(result.isExhausted()){ - throw new MDBCServiceException("There is not a row that matches the id "+primaryKey); - } - return result.one(); - } - - private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject) - throws MDBCServiceException{ - ResultSet result = MusicCore.quorumGet(cqlObject); - //\TODO: handle better, at least transform into an MDBCServiceException - if(result.isExhausted()){ - throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]"); - } - return result.one(); - } - - private void executeMusicLockedPut(String namespace, String tableName, - String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId, - MusicCore.Condition conditionInfo) throws MDBCServiceException { - ReturnType rt ; - if(lockId==null) { - try { - rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo); - } catch (MusicLockingException e) { - logger.error("Music locked put failed"); - throw new MDBCServiceException("Music locked put failed"); - } catch (MusicServiceException e) { - logger.error("Music service fail: Music locked put failed"); - throw new MDBCServiceException("Music service fail: Music locked put failed"); - } catch (MusicQueryException e) { - logger.error("Music query fail: locked put failed"); - throw new MDBCServiceException("Music query fail: Music locked put failed"); - } - } - else { - rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo); - } - if (rt.getResult().getResult().toLowerCase().equals("failure")) { - throw new MDBCServiceException("Music locked put failed"); - } - } -} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java index 3d85a27..0f05734 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java @@ -25,8 +25,6 @@ import java.sql.Connection; import java.util.Properties; import org.onap.music.logging.EELFLoggerDelegate; -import org.onap.music.mdbc.DatabasePartition; -import org.onap.music.mdbc.MusicSqlManager; /** * This class is used to construct instances of Mixins that implement either the {@link org.onap.music.mdbc.mixins.DBInterface} @@ -50,7 +48,7 @@ public class MixinFactory { * @param info the Properties to use as an argument to the constructor * @return the newly constructed DBInterface, or null if one cannot be found. */ - public static DBInterface createDBInterface(String name, MusicSqlManager msm, String url, Connection conn, Properties info) { + public static DBInterface createDBInterface(String name, MusicInterface mi, String url, Connection conn, Properties info) { for (Class cl : Utils.getClassesImplementing(DBInterface.class)) { try { Constructor con = cl.getConstructor(); @@ -59,10 +57,10 @@ public class MixinFactory { String miname = dbi.getMixinName(); logger.info(EELFLoggerDelegate.applicationLogger,"Checking "+miname); if (miname.equalsIgnoreCase(name)) { - con = cl.getConstructor(MusicSqlManager.class, String.class, Connection.class, Properties.class); + con = cl.getConstructor(MusicInterface.class, String.class, Connection.class, Properties.class); if (con != null) { logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname); - return (DBInterface) con.newInstance(msm, url, conn, info); + return (DBInterface) con.newInstance(mi, url, conn, info); } } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java new file mode 100755 index 0000000..8181159 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java @@ -0,0 +1,308 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.mixins; + +import java.sql.Types; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.json.JSONObject; +import org.json.JSONTokener; +import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.main.MusicCore; +import org.onap.music.main.ReturnType; + +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.TableInfo; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; + +/** + * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence + * to calls to the user's DB. It stores dirty row references in one table (called DIRTY____) rather than one dirty + * table per real table (as {@link org.onap.music.mdbc.mixins.MusicMixin} does). + * + * @author Robert P. Eby + */ +public class Music2Mixin extends MusicMixin { + private static final String DIRTY_TABLE = "DIRTY____"; // it seems Cassandra won't allow __DIRTY__ + private boolean dirty_table_created = false; + + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Music2Mixin.class); + + public Music2Mixin() { + super(); + } + + public Music2Mixin(String url, Properties info) throws MDBCServiceException { + super(url, info); + } + + /** + * Get the name of this MusicInterface mixin object. + * @return the name + */ + @Override + public String getMixinName() { + return "cassandra2"; + } + /** + * Do what is needed to close down the MUSIC connection. + */ + @Override + public void close() { + super.close(); + } + + /** + * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. + * The keyspace name comes from the initialization properties passed to the JDBC driver. + * @throws MusicServiceException + */ + @Override + public void createKeyspace() throws MDBCServiceException { + super.createKeyspace(); + } + + /** + * This method performs all necessary initialization in Music/Cassandra to store the table tableName. + * @param tableName the table to initialize MUSIC for + */ + @Override + public void initializeMusicForTable(TableInfo ti, String tableName) { + super.initializeMusicForTable(ti, tableName); + } + + /** + * Create a dirty row table for the real table tableName. The primary keys columns from the real table are recreated in + * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC. + * @param tableName the table to create a "dirty" table for + */ + @Override + public void createDirtyRowTable(TableInfo ti, String tableName) { + if (!dirty_table_created) { + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (tablename TEXT, replica TEXT, keyset TEXT, PRIMARY KEY(tablename, replica, keyset));", music_ns, DIRTY_TABLE); + executeMusicWriteQuery(cql); + dirty_table_created = true; + } + } + /** + * Drop the dirty row table for tableName from MUSIC. + * @param tableName the table being dropped + */ + @Override + public void dropDirtyRowTable(String tableName) { + // no-op + } + + private String buildJSON(TableInfo ti, String tableName, Object[] keys) { + // Build JSON string representing this keyset + JSONObject jo = new JSONObject(); + int j = 0; + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + jo.put(ti.columns.get(i), keys[j++]); + } + } + return jo.toString(); + } + /** + * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys + * @param tableName the table we are removing dirty entries from + * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row. + */ + @Override + public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) { + String cql = String.format("DELETE FROM %s.%s WHERE tablename = ? AND replica = ? AND keyset = ?;", music_ns, DIRTY_TABLE); + //Session sess = getMusicSession(); + //PreparedStatement ps = getPreparedStatementFromCache(cql); + Object[] values = new Object[] { tableName, myId, keys }; + logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql + " with values " + values[0] + " " + values[1] + " " + values[2]); + + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(tableName); + pQueryObject.addValue(myId); + pQueryObject.addValue(keys); + ReturnType rt = MusicCore.eventualPut(pQueryObject); + if(rt.getResult().getResult().toLowerCase().equals("failure")) { + logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage()); + } + /*BoundStatement bound = ps.bind(values); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + sess.execute(bound); + }*/ + } + /** + * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica, + * and consist of a Map of primary key column names and values. + * @param tableName the table we are querying for + * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row". + */ + @SuppressWarnings("deprecation") + @Override + public List> getDirtyRows(TableInfo ti, String tableName) { + String cql = String.format("SELECT keyset FROM %s.%s WHERE tablename = ? AND replica = ?;", music_ns, DIRTY_TABLE); + logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql + " with values " + tableName + " " + myId); + + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(tableName); + pQueryObject.addValue(myId); + ResultSet results = null; + try { + results = MusicCore.get(pQueryObject); + } catch (MusicServiceException e) { + e.printStackTrace(); + } + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(new Object[] { tableName, myId }); + bound.setReadTimeoutMillis(60000); + ResultSet results = null; + synchronized (sess) { + results = sess.execute(bound); + }*/ + List> list = new ArrayList>(); + for (Row row : results) { + String json = row.getString("keyset"); + JSONObject jo = new JSONObject(new JSONTokener(json)); + Map objs = new HashMap(); + for (String colname : jo.keySet()) { + int coltype = ti.getColType(colname); + switch (coltype) { + case Types.BIGINT: + objs.put(colname, jo.getLong(colname)); + break; + case Types.BOOLEAN: + objs.put(colname, jo.getBoolean(colname)); + break; + case Types.BLOB: + logger.error(EELFLoggerDelegate.errorLogger,"WE DO NOT SUPPORT BLOBS AS PRIMARY KEYS!! COLUMN NAME="+colname); + // throw an exception here??? + break; + case Types.DOUBLE: + objs.put(colname, jo.getDouble(colname)); + break; + case Types.INTEGER: + objs.put(colname, jo.getInt(colname)); + break; + case Types.TIMESTAMP: + objs.put(colname, new Date(jo.getString(colname))); + break; + case Types.VARCHAR: + default: + objs.put(colname, jo.getString(colname)); + break; + } + } + list.add(objs); + } + return list; + } + + /** + * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first. + * @param tableName This is the table that has been dropped + */ + @Override + public void clearMusicForTable(String tableName) { + super.clearMusicForTable(tableName); + } + /** + * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the + * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates + * it to the other replicas. + * + * @param tableName This is the table that has changed. + * @param oldRow This is a copy of the old row being deleted + */ + public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) { + super.deleteFromEntityTableInMusic(ti, tableName, oldRow); + } + /** + * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local + * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL + * @param tableName This is the table on which the select is being performed + */ + @Override + public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) { + super.readDirtyRowsAndUpdateDb(dbi, tableName); + } + + /** + * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the + * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates + * it to the other replicas. + * + * @param tableName This is the table that has changed. + * @param changedRow This is information about the row that has changed + */ + @Override + public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) { + super.updateDirtyRowAndEntityTableInMusic(ti, tableName, changedRow); + } + + /** + * Mark rows as "dirty" in the dirty rows table for tableName. Rows are marked for all replicas but + * this one (this replica already has the up to date data). + * @param tableName the table we are marking dirty + * @param keys an ordered list of the values being put into the table. The values that correspond to the tables' + * primary key are copied into the dirty row table. + */ + @Deprecated + public void markDirtyRow(TableInfo ti, String tableName, Object[] keys) { + String cql = String.format("INSERT INTO %s.%s (tablename, replica, keyset) VALUES (?, ?, ?);", music_ns, DIRTY_TABLE); + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql);*/ + @SuppressWarnings("unused") + Object[] values = new Object[] { tableName, "", buildJSON(ti, tableName, keys) }; + PreparedQueryObject pQueryObject = null; + for (String repl : allReplicaIds) { + /*if (!repl.equals(myId)) { + values[1] = repl; + logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql + " with values " + values[0] + " " + values[1] + " " + values[2]); + + BoundStatement bound = ps.bind(values); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + sess.execute(bound); + } + }*/ + pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(tableName); + pQueryObject.addValue(repl); + pQueryObject.addValue(buildJSON(ti, tableName, keys)); + ReturnType rt = MusicCore.eventualPut(pQueryObject); + if(rt.getResult().getResult().toLowerCase().equals("failure")) { + System.out.println("Failure while critical put..."+rt.getMessage()); + } + } + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 52b3036..5b10a73 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -162,14 +162,13 @@ public interface MusicInterface { /** * Commits the corresponding REDO-log into MUSIC * - * @param dbi, the database interface use in the local SQL cache, where the music interface is being used * @param partition * @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to be a HashMap, because it is required to be serializable * @param txId id associated with the log being send * @param progressKeeper data structure that is used to handle to detect failures, and know what to do * @throws MDBCServiceException */ - void commitLog(DBInterface dbi, DatabasePartition partition, HashMap transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; + void commitLog(DatabasePartition partition, HashMap transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index a7ea680..258ea4f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -20,211 +20,1488 @@ package org.onap.music.mdbc.mixins; import java.io.IOException; -import java.io.InputStream; -import java.util.*; +import java.io.Reader; +import java.nio.ByteBuffer; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; -import org.onap.music.mdbc.LockId; -import org.json.JSONObject; -import org.onap.music.exceptions.MusicLockingException; - -import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.mdbc.DatabasePartition; -import org.onap.music.mdbc.Range; -import org.onap.music.mdbc.TableInfo; +import org.onap.music.mdbc.*; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigest; import org.onap.music.mdbc.tables.TxCommitProgress; +import org.json.JSONObject; +import org.onap.music.datastore.CassaLockStore; +import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.exceptions.MusicLockingException; +import org.onap.music.exceptions.MusicQueryException; +import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.MusicCore; +import org.onap.music.main.MusicCore.Condition; +import org.onap.music.main.ResultType; +import org.onap.music.main.ReturnType; -/** +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.logging.EELFLoggerDelegate; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TupleValue; +/** + * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence + * to calls to the user's DB. It does not do any table or row locking. + * + *

This code only supports the following limited list of H2 and Cassandra data types:

+ * + * + * + * + * + * + * + * + * + * + *
H2 Data TypeMapped to Cassandra Data Type
BIGINTBIGINT
BOOLEANBOOLEAN
CLOBBLOB
DOUBLEDOUBLE
INTEGERINT
TIMESTAMPTIMESTAMP
VARBINARYBLOB
VARCHARVARCHAR
* + * @author Robert P. Eby */ public class MusicMixin implements MusicInterface { + /** The property name to use to identify this replica to MusicSqlManager */ + public static final String KEY_MY_ID = "myid"; + /** The property name to use for the comma-separated list of replica IDs. */ + public static final String KEY_REPLICAS = "replica_ids"; + /** The property name to use to identify the IP address for Cassandra. */ + public static final String KEY_MUSIC_ADDRESS = "cassandra.host"; + /** The property name to use to provide the replication factor for Cassandra. */ + public static final String KEY_MUSIC_RFACTOR = "music_rfactor"; + /** The property name to use to provide the replication factor for Cassandra. */ + public static final String KEY_MUSIC_NAMESPACE = "music_namespace"; + /** Namespace for the tables in MUSIC (Cassandra) */ + public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; + /** The default property value to use for the Cassandra IP address. */ + public static final String DEFAULT_MUSIC_ADDRESS = "localhost"; + /** The default property value to use for the Cassandra replication factor. */ + public static final int DEFAULT_MUSIC_RFACTOR = 1; + /** The default primary string column, if none is provided. */ + public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid"; + /** Type of the primary key, if none is defined by the user */ + public static final String MDBC_PRIMARYKEY_TYPE = "uuid"; + + + //\TODO Add logic to change the names when required and create the tables when necessary + private String musicTxDigestTableName = "musictxdigest"; + private String musicRangeInformationTableName = "musicrangeinformation"; - public static Map> currentLockMap = new HashMap<>(); - public static List criticalTables = new ArrayList<>(); - - @Override - public String getMixinName() { - // - return null; + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class); + + private static final Map typemap = new HashMap<>(); + static { + // We only support the following type mappings currently (from DB -> Cassandra). + // Anything else will likely cause a NullPointerException + typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY + typemap.put(Types.BLOB, "VARCHAR"); + typemap.put(Types.BOOLEAN, "BOOLEAN"); + typemap.put(Types.CLOB, "BLOB"); + typemap.put(Types.DATE, "VARCHAR"); + typemap.put(Types.DOUBLE, "DOUBLE"); + typemap.put(Types.DECIMAL, "DECIMAL"); + typemap.put(Types.INTEGER, "INT"); + //typemap.put(Types.TIMESTAMP, "TIMESTAMP"); + typemap.put(Types.SMALLINT, "SMALLINT"); + typemap.put(Types.TIMESTAMP, "VARCHAR"); + typemap.put(Types.VARBINARY, "BLOB"); + typemap.put(Types.VARCHAR, "VARCHAR"); + typemap.put(Types.CHAR, "VARCHAR"); + //The "Hacks", these don't have a direct mapping + //typemap.put(Types.DATE, "VARCHAR"); + //typemap.put(Types.DATE, "TIMESTAMP"); } - @Override - public String getMusicDefaultPrimaryKeyName() { - // - return null; - } + protected final String music_ns; + protected final String myId; + protected final String[] allReplicaIds; + private final String musicAddress; + private final int music_rfactor; + private MusicConnector mCon = null; + private Session musicSession = null; + private boolean keyspace_created = false; + private Map ps_cache = new HashMap<>(); + private Set in_progress = Collections.synchronizedSet(new HashSet()); + public MusicMixin() { + //this.logger = null; + this.musicAddress = null; + this.music_ns = null; + this.music_rfactor = 0; + this.myId = null; + this.allReplicaIds = null; + } + + public MusicMixin(String url, Properties info) throws MDBCServiceException { + // Default values -- should be overridden in the Properties + // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred) + this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS); + logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress); + + String s = info.getProperty(KEY_MUSIC_RFACTOR); + this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s); + + this.myId = info.getProperty(KEY_MY_ID, getMyHostId()); + logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId); + + + this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(","); + logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId)); + + this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE); + logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns); + createKeyspace(); + } + + /** + * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. + * The keyspace name comes from the initialization properties passed to the JDBC driver. + */ @Override - public String generateUniqueKey() { - // - return null; + public void createKeyspace() throws MDBCServiceException { + + Map replicationInfo = new HashMap<>(); + replicationInfo.put("'class'", "'SimpleStrategy'"); + replicationInfo.put("'replication_factor'", music_rfactor); + + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString( + "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns + + " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":")); + + try { + MusicCore.nonKeyRelatedPut(queryObject, "eventual"); + } catch (MusicServiceException e) { + if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) { + throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage()); + } + } + } + + private String getMyHostId() { + ResultSet rs = executeMusicRead("SELECT HOST_ID FROM SYSTEM.LOCAL"); + Row row = rs.one(); + return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString(); + } + private String getAllHostIds() { + ResultSet results = executeMusicRead("SELECT HOST_ID FROM SYSTEM.PEERS"); + StringBuilder sb = new StringBuilder(myId); + for (Row row : results) { + sb.append(","); + sb.append(row.getUUID("HOST_ID").toString()); + } + return sb.toString(); } + /** + * Get the name of this MusicInterface mixin object. + * @return the name + */ @Override - public String getMusicKeyFromRow(TableInfo ti, String table, JSONObject dbRow) { - // - return null; + public String getMixinName() { + return "cassandra"; } - + /** + * Do what is needed to close down the MUSIC connection. + */ @Override public void close() { - // - + if (musicSession != null) { + musicSession.close(); + musicSession = null; + } } - @Override - public void createKeyspace() { - // - + public void initializeMetricDataStructures() throws MDBCServiceException { + try { + createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number + createMusicRangeInformationTable(); + } + catch(MDBCServiceException e){ + logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC"); + } } + /** + * This method performs all necessary initialization in Music/Cassandra to store the table tableName. + * @param tableName the table to initialize MUSIC for + */ @Override public void initializeMusicForTable(TableInfo ti, String tableName) { - // - + /** + * This code creates two tables for every table in SQL: + * (i) a table with the exact same name as the SQL table storing the SQL data. + * (ii) a "dirty bits" table that stores the keys in the Cassandra table that are yet to be + * updated in the SQL table (they were written by some other node). + */ + StringBuilder fields = new StringBuilder(); + StringBuilder prikey = new StringBuilder(); + String pfx = "", pfx2 = ""; + for (int i = 0; i < ti.columns.size(); i++) { + fields.append(pfx) + .append(ti.columns.get(i)) + .append(" ") + .append(typemap.get(ti.coltype.get(i))); + if (ti.iskey.get(i)) { + // Primary key column + prikey.append(pfx2).append(ti.columns.get(i)); + pfx2 = ", "; + } + pfx = ", "; + } + if (prikey.length()==0) { + fields.append(pfx).append(MDBC_PRIMARYKEY_NAME) + .append(" ") + .append(MDBC_PRIMARYKEY_TYPE); + prikey.append(MDBC_PRIMARYKEY_NAME); + } + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString()); + executeMusicWriteQuery(cql); } + // ************************************************** + // Dirty Tables (in MUSIC) methods + // ************************************************** + + /** + * Create a dirty row table for the real table tableName. The primary keys columns from the real table are recreated in + * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC. + * @param tableName the table to create a "dirty" table for + */ @Override public void createDirtyRowTable(TableInfo ti, String tableName) { - // - + // create dirtybitsTable at all replicas +// for (String repl : allReplicaIds) { +//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i]; +//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);"; +// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl); +// executeMusicWriteQuery(cql); +// } + StringBuilder ddl = new StringBuilder("REPLICA__ TEXT"); + StringBuilder cols = new StringBuilder("REPLICA__"); + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + // Only use the primary keys columns in the "Dirty" table + ddl.append(", ") + .append(ti.columns.get(i)) + .append(" ") + .append(typemap.get(ti.coltype.get(i))); + cols.append(", ").append(ti.columns.get(i)); + } + } + if(cols.length()==0) { + //fixme + System.err.println("Create dirty row table found no primary key"); + } + ddl.append(", PRIMARY KEY(").append(cols).append(")"); + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s (%s);", music_ns, tableName, ddl.toString()); + executeMusicWriteQuery(cql); } - + /** + * Drop the dirty row table for tableName from MUSIC. + * @param tableName the table being dropped + */ @Override public void dropDirtyRowTable(String tableName) { - // - + String cql = String.format("DROP TABLE %s.DIRTY_%s;", music_ns, tableName); + executeMusicWriteQuery(cql); } - - @Override - public void clearMusicForTable(String tableName) { - // - - } - + /** + * Mark rows as "dirty" in the dirty rows table for tableName. Rows are marked for all replicas but + * this one (this replica already has the up to date data). + * @param tableName the table we are marking dirty + * @param keys an ordered list of the values being put into the table. The values that correspond to the tables' + * primary key are copied into the dirty row table. + */ @Override public void markDirtyRow(TableInfo ti, String tableName, JSONObject keys) { - // - + Object[] keyObj = getObjects(ti,tableName, keys); + StringBuilder cols = new StringBuilder("REPLICA__"); + PreparedQueryObject pQueryObject = null; + StringBuilder vals = new StringBuilder("?"); + List vallist = new ArrayList(); + vallist.add(""); // placeholder for replica + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + cols.append(", ").append(ti.columns.get(i)); + vals.append(", ").append("?"); + vallist.add(keyObj[i]); + } + } + if(cols.length()==0) { + //FIXME + System.err.println("markDIrtyRow need to fix primary key"); + } + String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString()); + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql);*/ + String primaryKey; + if(ti.hasKey()) { + primaryKey = getMusicKeyFromRow(ti,tableName, keys); + } + else { + primaryKey = getMusicKeyFromRowWithoutPrimaryIndexes(ti,tableName, keys); + } + System.out.println("markDirtyRow: PK value: "+primaryKey); + + Object pkObj = null; + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + pkObj = keyObj[i]; + } + } + for (String repl : allReplicaIds) { + pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(tableName); + pQueryObject.addValue(repl); + pQueryObject.addValue(pkObj); + updateMusicDB(tableName, primaryKey, pQueryObject); + //if (!repl.equals(myId)) { + /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); + vallist.set(0, repl); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + sess.execute(bound); + }*/ + //} + + } } - + /** + * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys + * @param tableName the table we are removing dirty entries from + * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row. + */ @Override public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) { - // - + Object[] keysObjects = getObjects(ti,tableName,keys); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + StringBuilder cols = new StringBuilder("REPLICA__=?"); + List vallist = new ArrayList(); + vallist.add(myId); + int n = 0; + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + cols.append(" AND ").append(ti.columns.get(i)).append("=?"); + vallist.add(keysObjects[n++]); + pQueryObject.addValue(keysObjects[n++]); + } + } + String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString()); + logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); + pQueryObject.appendQueryString(cql); + ReturnType rt = MusicCore.eventualPut(pQueryObject); + if(rt.getResult().getResult().toLowerCase().equals("failure")) { + System.out.println("Failure while cleanDirtyRow..."+rt.getMessage()); + } + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + sess.execute(bound); + }*/ } - + /** + * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica, + * and consist of a Map of primary key column names and values. + * @param tableName the table we are querying for + * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row". + */ @Override - public List> getDirtyRows(TableInfo ti, String tableName) { - // - return null; + public List> getDirtyRows(TableInfo ti, String tableName) { + String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName); + ResultSet results = null; + logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); + + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(new Object[] { myId }); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + results = sess.execute(bound); + }*/ + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + try { + results = MusicCore.get(pQueryObject); + } catch (MusicServiceException e) { + + e.printStackTrace(); + } + + ColumnDefinitions cdef = results.getColumnDefinitions(); + List> list = new ArrayList>(); + for (Row row : results) { + Map objs = new HashMap(); + for (int i = 0; i < cdef.size(); i++) { + String colname = cdef.getName(i).toUpperCase(); + String coltype = cdef.getType(i).getName().toString().toUpperCase(); + if (!colname.equals("REPLICA__")) { + switch (coltype) { + case "BIGINT": + objs.put(colname, row.getLong(colname)); + break; + case "BOOLEAN": + objs.put(colname, row.getBool(colname)); + break; + case "BLOB": + objs.put(colname, row.getString(colname)); + break; + case "DATE": + objs.put(colname, row.getString(colname)); + break; + case "DOUBLE": + objs.put(colname, row.getDouble(colname)); + break; + case "DECIMAL": + objs.put(colname, row.getDecimal(colname)); + break; + case "INT": + objs.put(colname, row.getInt(colname)); + break; + case "TIMESTAMP": + objs.put(colname, row.getTimestamp(colname)); + break; + case "VARCHAR": + default: + objs.put(colname, row.getString(colname)); + break; + } + } + } + list.add(objs); + } + return list; } + /** + * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first. + * @param tableName This is the table that has been dropped + */ + @Override + public void clearMusicForTable(String tableName) { + dropDirtyRowTable(tableName); + String cql = String.format("DROP TABLE %s.%s;", music_ns, tableName); + executeMusicWriteQuery(cql); + } + /** + * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the + * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates + * it to the other replicas. + * + * @param tableName This is the table that has changed. + * @param oldRow This is a copy of the old row being deleted + */ @Override public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) { - // + Object[] objects = getObjects(ti,tableName,oldRow); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + if (ti.hasKey()) { + assert(ti.columns.size() == objects.length); + } else { + assert(ti.columns.size()+1 == objects.length); + } + + StringBuilder where = new StringBuilder(); + List vallist = new ArrayList(); + String pfx = ""; + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + where.append(pfx) + .append(ti.columns.get(i)) + .append("=?"); + vallist.add(objects[i]); + pQueryObject.addValue(objects[i]); + pfx = " AND "; + } + } + if (!ti.hasKey()) { + where.append(MDBC_PRIMARYKEY_NAME + "=?"); + //\FIXME this is wrong, old row is not going to contain the UUID, this needs to be fixed + vallist.add(UUID.fromString((String) objects[0])); + pQueryObject.addValue(UUID.fromString((String) objects[0])); + } + + String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString()); + logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql); + pQueryObject.appendQueryString(cql); + + /*PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + Session sess = getMusicSession(); + synchronized (sess) { + sess.execute(bound); + }*/ + String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow); + updateMusicDB(tableName, primaryKey, pQueryObject); + + // Mark the dirty rows in music for all the replicas but us + markDirtyRow(ti,tableName, oldRow); } + public Set getMusicTableSet(String ns) { + Set set = new TreeSet(); + String cql = String.format("SELECT TABLE_NAME FROM SYSTEM_SCHEMA.TABLES WHERE KEYSPACE_NAME = '%s'", ns); + ResultSet rs = executeMusicRead(cql); + for (Row row : rs) { + set.add(row.getString("TABLE_NAME").toUpperCase()); + } + return set; + } + /** + * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local + * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL + * @param tableName This is the table on which the select is being performed + */ @Override public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) { - // + // Read dirty rows of this table from Music + TableInfo ti = dbi.getTableInfo(tableName); + List> objlist = getDirtyRows(ti,tableName); + PreparedQueryObject pQueryObject = null; + String pre_cql = String.format("SELECT * FROM %s.%s WHERE ", music_ns, tableName); + List vallist = new ArrayList(); + StringBuilder sb = new StringBuilder(); + //\TODO Perform a batch operation instead of each row at a time + for (Map map : objlist) { + pQueryObject = new PreparedQueryObject(); + sb.setLength(0); + vallist.clear(); + String pfx = ""; + for (String key : map.keySet()) { + sb.append(pfx).append(key).append("=?"); + vallist.add(map.get(key)); + pQueryObject.addValue(map.get(key)); + pfx = " AND "; + } + + String cql = pre_cql + sb.toString(); + System.out.println("readDirtyRowsAndUpdateDb: cql: "+cql); + pQueryObject.appendQueryString(cql); + ResultSet dirtyRows = null; + try { + //\TODO Why is this an eventual put?, this should be an atomic + dirtyRows = MusicCore.get(pQueryObject); + } catch (MusicServiceException e) { + + e.printStackTrace(); + } + /* + Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + ResultSet dirtyRows = null; + synchronized (sess) { + dirtyRows = sess.execute(bound); + }*/ + List rows = dirtyRows.all(); + if (rows.isEmpty()) { + // No rows, the row must have been deleted + deleteRowFromSqlDb(dbi,tableName, map); + } else { + for (Row row : rows) { + writeMusicRowToSQLDb(dbi,tableName, row); + } + } + } + } + + private void deleteRowFromSqlDb(DBInterface dbi, String tableName, Map map) { + dbi.deleteRowFromSqlDb(tableName, map); + TableInfo ti = dbi.getTableInfo(tableName); + List vallist = new ArrayList(); + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + String col = ti.columns.get(i); + Object val = map.get(col); + vallist.add(val); + } + } + cleanDirtyRow(ti, tableName, new JSONObject(vallist)); + } + /** + * This functions copies the contents of a row in Music into the corresponding row in the SQL table + * @param tableName This is the name of the table in both Music and swl + * @param musicRow This is the row in Music that is being copied into SQL + */ + private void writeMusicRowToSQLDb(DBInterface dbi, String tableName, Row musicRow) { + // First construct the map of columns and their values + TableInfo ti = dbi.getTableInfo(tableName); + Map map = new HashMap(); + List vallist = new ArrayList(); + String rowid = tableName; + for (String col : ti.columns) { + Object val = getValue(musicRow, col); + map.put(col, val); + if (ti.iskey(col)) { + vallist.add(val); + rowid += "_" + val.toString(); + } + } + + logger.debug("Blocking rowid: "+rowid); + in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE + + dbi.insertRowIntoSqlDb(tableName, map); + + logger.debug("Unblocking rowid: "+rowid); + in_progress.remove(rowid); // Unblock propagation + +// try { +// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); +// executeSQLWrite(sql); +// } catch (SQLException e) { +// logger.debug("Insert failed because row exists, do an update"); +// // TODO - rewrite this UPDATE command should not update key fields +// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString()); +// try { +// executeSQLWrite(sql); +// } catch (SQLException e1) { +// e1.printStackTrace(); +// } +// } + + ti = dbi.getTableInfo(tableName); + cleanDirtyRow(ti, tableName, new JSONObject(vallist)); + +// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";"; +// java.sql.ResultSet rs = executeSQLRead(selectQuery); +// String dbWriteQuery=null; +// try { +// if(rs.next()){//this entry is there, do an update +// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";"; +// }else +// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";"; +// executeSQLWrite(dbWriteQuery); +// } catch (SQLException e) { +// // ZZTODO Auto-generated catch block +// e.printStackTrace(); +// } + //clean the music dirty bits table +// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId; +// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;"; +// executeMusicWriteQuery(deleteQuery); + } + private Object getValue(Row musicRow, String colname) { + ColumnDefinitions cdef = musicRow.getColumnDefinitions(); + DataType colType; + try { + colType= cdef.getType(colname); + } + catch(IllegalArgumentException e) { + logger.warn("Colname is not part of table metadata: "+e); + throw e; + } + String typeStr = colType.getName().toString().toUpperCase(); + switch (typeStr) { + case "BIGINT": + return musicRow.getLong(colname); + case "BOOLEAN": + return musicRow.getBool(colname); + case "BLOB": + return musicRow.getString(colname); + case "DATE": + return musicRow.getString(colname); + case "DECIMAL": + return musicRow.getDecimal(colname); + case "DOUBLE": + return musicRow.getDouble(colname); + case "SMALLINT": + case "INT": + return musicRow.getInt(colname); + case "TIMESTAMP": + return musicRow.getTimestamp(colname); + case "UUID": + return musicRow.getUUID(colname); + default: + logger.error(EELFLoggerDelegate.errorLogger, "UNEXPECTED COLUMN TYPE: columname="+colname+", columntype="+typeStr); + // fall thru + case "VARCHAR": + return musicRow.getString(colname); + } } + /** + * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the + * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates + * it to the other replicas. + * + * @param tableName This is the table that has changed. + * @param changedRow This is information about the row that has changed + */ @Override public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) { - updateDirtyRowAndEntityTableInMusic(tableName, changedRow, false); + // Build the CQL command + Object[] objects = getObjects(ti,tableName,changedRow); + StringBuilder fields = new StringBuilder(); + StringBuilder values = new StringBuilder(); + String rowid = tableName; + Object[] newrow = new Object[objects.length]; + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + String pfx = ""; + int keyoffset=0; + for (int i = 0; i < objects.length; i++) { + if (!ti.hasKey() && i==0) { + //We need to tack on cassandra's uid in place of a primary key + fields.append(MDBC_PRIMARYKEY_NAME); + values.append("?"); + newrow[i] = UUID.fromString((String) objects[i]); + pQueryObject.addValue(newrow[i]); + keyoffset=-1; + pfx = ", "; + continue; + } + fields.append(pfx).append(ti.columns.get(i+keyoffset)); + values.append(pfx).append("?"); + pfx = ", "; + if (objects[i] instanceof byte[]) { + // Cassandra doesn't seem to have a Codec to translate a byte[] to a ByteBuffer + newrow[i] = ByteBuffer.wrap((byte[]) objects[i]); + pQueryObject.addValue(newrow[i]); + } else if (objects[i] instanceof Reader) { + // Cassandra doesn't seem to have a Codec to translate a Reader to a ByteBuffer either... + newrow[i] = ByteBuffer.wrap(readBytesFromReader((Reader) objects[i])); + pQueryObject.addValue(newrow[i]); + } else { + newrow[i] = objects[i]; + pQueryObject.addValue(newrow[i]); + } + if (i+keyoffset>=0 && ti.iskey.get(i+keyoffset)) { + rowid += "_" + newrow[i].toString(); + } + } + if (in_progress.contains(rowid)) { + // This call to updateDirtyRowAndEntityTableInMusic() was called as a result of a Cassandra -> H2 update; ignore + logger.debug(EELFLoggerDelegate.applicationLogger, "updateDirtyRowAndEntityTableInMusic: bypassing MUSIC update on "+rowid); + + } else { + // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update + String cql = String.format("INSERT INTO %s.%s (%s) VALUES (%s);", music_ns, tableName, fields.toString(), values.toString()); + + pQueryObject.appendQueryString(cql); + String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow); + updateMusicDB(tableName, primaryKey, pQueryObject); + + /*PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(newrow); + bound.setReadTimeoutMillis(60000); + Session sess = getMusicSession(); + synchronized (sess) { + sess.execute(bound); + }*/ + // Mark the dirty rows in music for all the replicas but us + markDirtyRow(ti,tableName, changedRow); + } } + - public void updateDirtyRowAndEntityTableInMusic(String tableName, JSONObject changedRow, boolean isCritical) { - } - public static void releaseZKLocks(Set lockIds) { - for (LockId lockId : lockIds) { - System.out.println("Releasing lock: " + lockId); - try { - MusicCore.voluntaryReleaseLock(lockId.getFullyQualifiedLockKey(), lockId.getLockReference()); - MusicCore.destroyLockRef(lockId.getFullyQualifiedLockKey(), lockId.getLockReference()); - } catch (MusicLockingException e) { - e.printStackTrace(); + private byte[] readBytesFromReader(Reader rdr) { + StringBuilder sb = new StringBuilder(); + try { + int ch; + while ((ch = rdr.read()) >= 0) { + sb.append((char)ch); } + } catch (IOException e) { + logger.warn("readBytesFromReader: "+e); } + return sb.toString().getBytes(); } - @Override - public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String tableName, JSONObject changedRow) { - // - return null; + protected PreparedStatement getPreparedStatementFromCache(String cql) { + // Note: have to hope that the Session never changes! + if (!ps_cache.containsKey(cql)) { + Session sess = getMusicSession(); + PreparedStatement ps = sess.prepare(cql); + ps_cache.put(cql, ps); + } + return ps_cache.get(cql); } - @Override - public void initializeMetricDataStructures() { - // + /** + * This method gets a connection to Music + * @return the Cassandra Session to use + */ + protected Session getMusicSession() { + // create cassandra session + if (musicSession == null) { + logger.info(EELFLoggerDelegate.applicationLogger, "Creating New Music Session"); + mCon = new MusicConnector(musicAddress); + musicSession = mCon.getSession(); + } + return musicSession; + } + + /** + * This method executes a write query in Music + * @param cql the CQL to be sent to Cassandra + */ + protected void executeMusicWriteQuery(String cql) { + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + ReturnType rt = MusicCore.eventualPut(pQueryObject); + if(rt.getResult().getResult().toLowerCase().equals("failure")) { + logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage()); + } + /*Session sess = getMusicSession(); + SimpleStatement s = new SimpleStatement(cql); + s.setReadTimeoutMillis(60000); + synchronized (sess) { + sess.execute(s); + }*/ + } + + /** + * This method executes a read query in Music + * @param cql the CQL to be sent to Cassandra + * @return a ResultSet containing the rows returned from the query + */ + protected ResultSet executeMusicRead(String cql) { + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + ResultSet results = null; + try { + results = MusicCore.get(pQueryObject); + } catch (MusicServiceException e) { + + e.printStackTrace(); + } + return results; + /*Session sess = getMusicSession(); + synchronized (sess) { + return sess.execute(cql); + }*/ + } + /** + * Returns the default primary key name that this mixin uses + */ + public String getMusicDefaultPrimaryKeyName() { + return MDBC_PRIMARYKEY_NAME; } + /** + * Return the function for cassandra's primary key generation + */ + public String generateUniqueKey() { + return MDBCUtils.generateUniqueKey().toString(); + } + @Override - public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) { + public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) { + //\TODO this operation is super expensive to perform, both latency and BW + // it is better to add additional where clauses, and have the primary key + // to be composed of known columns of the table + // Adding this primary indexes would be an additional burden to the developers, which spanner + // also does, but otherwise performance is really bad + // At least it should have a set of columns that are guaranteed to be unique + StringBuilder cqlOperation = new StringBuilder(); + cqlOperation.append("SELECT * FROM ") + .append(music_ns) + .append(".") + .append(table); + ResultSet musicResults = executeMusicRead(cqlOperation.toString()); + Object[] dbRowObjects = getObjects(ti,table,dbRow); + while (!musicResults.isExhausted()) { + Row musicRow = musicResults.one(); + if (rowIs(ti, musicRow, dbRowObjects)) { + return ((UUID)getValue(musicRow, MDBC_PRIMARYKEY_NAME)).toString(); + } + } + //should never reach here return null; } + + /** + * Checks to see if this row is in list of database entries + * @param ti + * @param musicRow + * @param dbRow + * @return + */ + private boolean rowIs(TableInfo ti, Row musicRow, Object[] dbRow) { + //System.out.println("Comparing " + musicRow.toString()); + boolean sameRow=true; + for (int i=0; i transactionDigest, String txId, TxCommitProgress progressKeeper) - throws MDBCServiceException { - // TODO Auto-generated method stub + public String getMusicKeyFromRow(TableInfo ti, String tableName, JSONObject row) { + List keyCols = ti.getKeyColumns(); + if(keyCols.isEmpty()){ + throw new IllegalArgumentException("Table doesn't have defined primary indexes "); + } + StringBuilder key = new StringBuilder(); + String pfx = ""; + for(String keyCol: keyCols) { + key.append(pfx); + key.append(row.get(keyCol)); + pfx = ","; + } + String keyStr = key.toString(); + return keyStr; } - @Override - public HashMap getTxDigest(MusicTxDigestId id) { - return null; + public void updateMusicDB(String tableName, String primaryKey, PreparedQueryObject pQObject) { + ReturnType rt = MusicCore.eventualPut(pQObject); + if(rt.getResult().getResult().toLowerCase().equals("failure")) { + System.out.println("Failure while critical put..."+rt.getMessage()); + } } + /** + * Build a preparedQueryObject that appends a transaction to the mriTable + * @param mriTable + * @param uuid + * @param table + * @param redoUuid + * @return + */ + private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){ + PreparedQueryObject query = new PreparedQueryObject(); + StringBuilder appendBuilder = new StringBuilder(); + appendBuilder.append("UPDATE ") + .append(music_ns) + .append(".") + .append(mriTable) + .append(" SET txredolog = txredolog +[('") + .append(table) + .append("',") + .append(redoUuid) + .append(")] WHERE rangeid = ") + .append(uuid) + .append(";"); + query.appendQueryString(appendBuilder.toString()); + return query; + } + + protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { + UUID mriIndex = partition.getMusicRangeInformationIndex(); + String lockId; + lockId = MusicCore.createLockReference(fullyQualifiedKey); + //\TODO Handle better failures to acquire locks + ReturnType lockReturn; + try { + lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId); + } catch (MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Lock was not acquire correctly for key "+fullyQualifiedKey); + throw new MDBCServiceException("Lock was not acquire correctly for key "+fullyQualifiedKey); + } catch (MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey); + throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey); + } catch (MusicQueryException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey); + throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey); + } + //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock + if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { + try { + MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId); + CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle(); + CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns, + this.musicRangeInformationTableName, mriIndex.toString()); + while(lockOwner.lockRef != lockId) { + MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef); + try { + lockOwner = lockingServiceHandle.peekLockQueue(music_ns, + this.musicRangeInformationTableName, mriIndex.toString()); + } catch(NullPointerException e){ + //Ignore null pointer exception + lockId = MusicCore.createLockReference(fullyQualifiedKey); + break; + } + } + lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId); + + } catch (MusicLockingException e) { + throw new MDBCServiceException("Could not lock the corresponding lock"); + } catch (MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey); + throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey); + } catch (MusicQueryException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey); + throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey); + } + } + if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { + throw new MDBCServiceException("Could not lock the corresponding lock"); + } + //TODO: Java newbie here, verify that this lockId is actually assigned to the global DatabasePartition in the StateManager instance + partition.setLockId(lockId); + return lockId; + } + + + protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{ + PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId); + ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, MriIndex.toString(), appendQuery, lockId, null); + if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ + logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); + throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); + } + } + + /** + * Writes the transaction information to metric's txDigest and musicRangeInformation table + * This officially commits the transaction globally + */ @Override - public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) { + public void commitLog(DatabasePartition partition, + HashMap transactionDigest, String txId , + TxCommitProgress progressKeeper) throws MDBCServiceException { + if (transactionDigest==null || transactionDigest.size()==0) { + return; + } + logger.info("Commiting lock for " + partition.getMusicRangeInformationIndex() + ", txID=" + txId); + UUID mriIndex = partition.getMusicRangeInformationIndex(); + if(mriIndex==null) { + //\TODO Fetch MriIndex from the Range Information Table + throw new MDBCServiceException("TIT Index retrieval not yet implemented"); + } + String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex.toString(); + //0. See if reference to lock was already created + String lockId = partition.getLockId(); + if(lockId == null || lockId.isEmpty()) { + lockId = createAndAssignLock(fullyQualifiedMriKey,partition); + } + + UUID commitId; + //Generate a local commit id + if(progressKeeper.containsTx(txId)) { + commitId = progressKeeper.getCommitId(txId); + } + else{ + logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress "); + throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress "); + } + //Add creation type of transaction digest + + //1. Push new row to RRT and obtain its index + String serializedTransactionDigest; + try { + serializedTransactionDigest = MDBCUtils.toString(transactionDigest); + } catch (IOException e) { + throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString()); + } + MusicTxDigestId digestId = new MusicTxDigestId(commitId); + addTxDigest(digestId, serializedTransactionDigest); + //2. Save RRT index to RQ + if(progressKeeper!= null) { + progressKeeper.setRecordId(txId,digestId); + } + //3. Append RRT index into the corresponding TIT row array + appendToRedoLog(mriIndex,partition,digestId); + } + + /** + * @param tableName + * @param string + * @param rowValues + * @return + */ + @SuppressWarnings("unused") + private String getUid(String tableName, String string, Object[] rowValues) { + // + // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update + String cql = String.format("SELECT * FROM %s.%s;", music_ns, tableName); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(); + bound.setReadTimeoutMillis(60000); + Session sess = getMusicSession(); + ResultSet rs; + synchronized (sess) { + rs = sess.execute(bound); + } + + //should never reach here + logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key"); return null; } @Override - public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) { - } + public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) { + // \FIXME: we may need to add the primary key of the row if it was autogenerated by MUSIC + List cols = ti.columns; + int size = cols.size(); + boolean hasDefault = false; + if(row.has(getMusicDefaultPrimaryKeyName())) { + size++; + hasDefault = true; + } - @Override - public void addTxDigest(MusicTxDigestId newId, String transactionDigest) { + Object[] objects = new Object[size]; + int idx = 0; + if(hasDefault) { + objects[idx++] = row.getString(getMusicDefaultPrimaryKeyName()); + } + for(String col : ti.columns) { + objects[idx]=row.get(col); + } + return objects; } - + @Override - public void own(List ranges) { - throw new java.lang.UnsupportedOperationException("function not implemented yet"); + public List getPartitionIndexes() { + ArrayList partitions = new ArrayList(); + String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName); + ResultSet rs = executeMusicRead(cql); + for (Row r: rs) { + partitions.add(r.getUUID("rangeid")); + } + return partitions; + } + + @Override + public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException { + //TODO: verify that lock id is valid before calling the database operations function + //UUID id = partition.getMusicRangeInformationIndex(); + + String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(partitionIndex); + Row newRow; + try { + newRow = executeMusicUnlockedQuorumGet(pQueryObject); + } catch (MDBCServiceException e) { + logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); + } + + List log = newRow.getList("txredolog",TupleValue.class); + List digestIds = new ArrayList<>(); + for(TupleValue t: log){ + //final String tableName = t.getString(0); + final UUID index = t.getUUID(1); + digestIds.add(new MusicTxDigestId(index)); + } + List partitions = new ArrayList<>(); + Set tables = newRow.getSet("keys",String.class); + for (String table:tables){ + partitions.add(new Range(table)); + } + return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""), + digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid")); + } + + + /** + * This function creates the TransactionInformation table. It contain information related + * to the transactions happening in a given partition. + * * The schema of the table is + * * Id, uiid. + * * Partition, uuid id of the partition + * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables + * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables + * * Redo: list of uiids associated to the Redo Records Table + * + */ + private void createMusicRangeInformationTable() throws MDBCServiceException { + String tableName = this.musicRangeInformationTableName; + String priKey = "rangeid"; + StringBuilder fields = new StringBuilder(); + fields.append("rangeid uuid, "); + fields.append("keys set, "); + fields.append("ownerid text, "); + fields.append("metricprocessid text, "); + //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly + fields.append("txredolog list>> "); + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", + this.music_ns, tableName, fields, priKey); + try { + executeMusicWriteQuery(this.music_ns,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create transaction information table"); + throw(e); + } + } + + + @Override + public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException { + DatabasePartition newPartition = info.getDBPartition(); + String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMusicRangeInformationIndex().toString(); + String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition); + createEmptyMriRow(info.getMetricProcessId(),lockId,new ArrayList()); + throw new UnsupportedOperationException(); + } + + /** + * Creates a new empty MRI row + * @param processId id of the process that is going to own initially this. + * @return uuid associated to the new row + */ + private UUID createEmptyMriRow(String processId, String lockId, List ranges) + throws MDBCServiceException { + UUID id = MDBCUtils.generateUniqueKey(); + return createEmptyMriRow(id,processId,lockId,ranges); + } + + /** + * Creates a new empty MRI row + * @param processId id of the process that is going to own initially this. + * @return uuid associated to the new row + */ + private UUID createEmptyMriRow(UUID id, String processId, String lockId, List ranges) + throws MDBCServiceException{ + StringBuilder insert = new StringBuilder("INSERT INTO ") + .append(this.music_ns) + .append('.') + .append(this.musicRangeInformationTableName) + .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") + .append("(") + .append(id) + .append(",{"); + boolean first=true; + for(Range r: ranges){ + if(first){ first=false; } + else { + insert.append(','); + } + insert.append("'").append(r.toString()).append("'"); + } + insert.append("},'") + .append((lockId==null)?"":lockId) + .append("','") + .append(processId) + .append("',[]);"); + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString(insert.toString()); + try { + executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to add new row to transaction information"); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); + } + return id; + } + + @Override + public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException { + logger.info("Appending to redo log for partition " + partition.getMusicRangeInformationIndex() + " txId=" + newRecord.txId); + PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId); + ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null); + if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ + logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); + throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); + } + } + + public void createMusicTxDigest() throws MDBCServiceException { + createMusicTxDigest(-1); + } + + + /** + * This function creates the MusicTxDigest table. It contain information related to each transaction committed + * * LeaseId: id associated with the lease, text + * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later + * * TransactionDigest: text that contains all the changes in the transaction + */ + private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { + String tableName = this.musicTxDigestTableName; + if(musicTxDigestTableNumber >= 0) { + tableName = tableName + + "-" + + Integer.toString(musicTxDigestTableNumber); + } + String priKey = "txid"; + StringBuilder fields = new StringBuilder(); + fields.append("txid uuid, "); + fields.append("transactiondigest text ");//notice lack of ',' + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey); + try { + executeMusicWriteQuery(this.music_ns,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create redo records table"); + throw(e); + } } + + + /** + * Writes the transaction history to the txDigest + */ + @Override + public void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException { + //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); + PreparedQueryObject query = new PreparedQueryObject(); + String cqlQuery = "INSERT INTO " + + this.music_ns + + '.' + + this.musicTxDigestTableName + + " (txid,transactiondigest) " + + "VALUES (" + + newId.txId + ",'" + + transactionDigest + + "');"; + query.appendQueryString(cqlQuery); + //\TODO check if I am not shooting on my own foot + try { + MusicCore.nonKeyRelatedPut(query,"critical"); + } catch (MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage()); + throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString()); + } + } + - @Override - public void appendRange(String rangeId, List ranges) { - throw new java.lang.UnsupportedOperationException("function not implemented yet"); + + @Override + public HashMap getTxDigest(MusicTxDigestId id) throws MDBCServiceException { + String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(id.txId); + Row newRow; + try { + newRow = executeMusicUnlockedQuorumGet(pQueryObject); + } catch (MDBCServiceException e) { + logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); + } + String digest = newRow.getString("transactiondigest"); + HashMap changes; + try { + changes = (HashMap) MDBCUtils.fromString(digest); + } catch (IOException e) { + logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId); + throw new MDBCServiceException("Deserializng digest failed with ioexception"); + } catch (ClassNotFoundException e) { + logger.error("Deserializng digest failed with an invalid class for id:"+id.txId); + throw new MDBCServiceException("Deserializng digest failed with an invalid class"); + } + return changes; + } + + @Override + public void own(List ranges){ + throw new UnsupportedOperationException(); + } + + @Override + public void appendRange(String rangeId, List ranges){ + throw new UnsupportedOperationException(); + } + + @Override + public void relinquish(String ownerId, String rangeId){ + throw new UnsupportedOperationException(); + } + + /** + * This method executes a write query in Music + * @param cql the CQL to be sent to Cassandra + */ + private static void executeMusicWriteQuery(String keyspace, String table, String cql) + throws MDBCServiceException { + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + ResultType rt = null; + try { + rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical"); + } catch (MusicServiceException e) { + //\TODO: handle better, at least transform into an MDBCServiceException + e.printStackTrace(); + } + String result = rt.getResult(); + if (result==null || result.toLowerCase().equals("failure")) { + throw new MDBCServiceException("Music eventual put failed"); + } } - @Override - public void relinquish(String ownerId, String rangeId) { - throw new java.lang.UnsupportedOperationException("function not implemented yet"); + private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey, + String lock) + throws MDBCServiceException{ + ResultSet result; + try { + result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock); + } catch(MusicServiceException e){ + //\TODO: handle better, at least transform into an MDBCServiceException + e.printStackTrace(); + throw new MDBCServiceException("Error executing critical get"); + } + if(result.isExhausted()){ + throw new MDBCServiceException("There is not a row that matches the id "+primaryKey); + } + return result.one(); } - @Override - public List getPartitionIndexes() { - // TODO Auto-generated method stub - return null; + private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject) + throws MDBCServiceException{ + ResultSet result = MusicCore.quorumGet(cqlObject); + //\TODO: handle better, at least transform into an MDBCServiceException + if(result.isExhausted()){ + throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]"); + } + return result.one(); } - @Override - public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException { - // TODO Auto-generated method stub - return null; + private void executeMusicLockedPut(String namespace, String tableName, + String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId, + MusicCore.Condition conditionInfo) throws MDBCServiceException { + ReturnType rt ; + if(lockId==null) { + try { + rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo); + } catch (MusicLockingException e) { + logger.error("Music locked put failed"); + throw new MDBCServiceException("Music locked put failed"); + } catch (MusicServiceException e) { + logger.error("Music service fail: Music locked put failed"); + throw new MDBCServiceException("Music service fail: Music locked put failed"); + } catch (MusicQueryException e) { + logger.error("Music query fail: locked put failed"); + throw new MDBCServiceException("Music query fail: Music locked put failed"); + } + } + else { + rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo); + } + if (rt.getResult().getResult().toLowerCase().equals("failure")) { + throw new MDBCServiceException("Music locked put failed"); + } } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 5943b34..383d522 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -37,7 +37,6 @@ import org.json.JSONObject; import org.json.JSONTokener; import org.onap.music.logging.EELFLoggerDelegate; -import org.onap.music.mdbc.MusicSqlManager; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.tables.OperationType; @@ -71,7 +70,7 @@ public class MySQLMixin implements DBInterface { "CREATE TABLE IF NOT EXISTS "+TRANS_TBL+ " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255), NEWROWDATA VARCHAR(1024), KEYDATA VARCHAR(1024), CONNECTION_ID INT,PRIMARY KEY (IX))"; - private final MusicSqlManager msm; + private final MusicInterface mi; private final int connId; private final String dbName; private final Connection dbConnection; @@ -79,14 +78,14 @@ public class MySQLMixin implements DBInterface { private boolean server_tbl_created = false; public MySQLMixin() { - this.msm = null; + this.mi = null; this.connId = 0; this.dbName = null; this.dbConnection = null; this.tables = null; } - public MySQLMixin(MusicSqlManager msm, String url, Connection conn, Properties info) { - this.msm = msm; + public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) { + this.mi = mi; this.connId = generateConnID(conn); this.dbName = getDBName(conn); this.dbConnection = conn; @@ -585,13 +584,13 @@ NEW.field refers to the new value // the actual columns, otherwise performance when doing range queries are going // to be even worse (see the else bracket down) // - String musicKey = msm.generateUniqueKey(); + String musicKey = mi.generateUniqueKey(); /*} else { //get key from data musicKey = msm.getMusicKeyFromRowWithoutPrimaryIndexes(tbl,newRow); }*/ - newRow.put(msm.getMusicDefaultPrimaryKeyName(), musicKey); - keydataStr.put(msm.getMusicDefaultPrimaryKeyName(), musicKey); + newRow.put(mi.getMusicDefaultPrimaryKeyName(), musicKey); + keydataStr.put(mi.getMusicDefaultPrimaryKeyName(), musicKey); } /*else { //Use the keys @@ -646,8 +645,8 @@ NEW.field refers to the new value JSONObject jo = new JSONObject(); if (!getTableInfo(tableName).hasKey()) { - String musicKey = msm.generateUniqueKey(); - jo.put(msm.getMusicDefaultPrimaryKeyName(), musicKey); + String musicKey = mi.generateUniqueKey(); + jo.put(mi.getMusicDefaultPrimaryKeyName(), musicKey); } for (String col : ti.columns) { @@ -655,7 +654,7 @@ NEW.field refers to the new value } @SuppressWarnings("unused") - Object[] row = Utils.jsonToRow(ti,tableName, jo,msm.getMusicDefaultPrimaryKeyName()); + Object[] row = Utils.jsonToRow(ti,tableName, jo, mi.getMusicDefaultPrimaryKeyName()); //\FIXME this is wrong now, update of the dirty row and entity is now handled by the archival process //msm.updateDirtyRowAndEntityTableInMusic(ti,tableName, jo); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java index 0a5bd60..cb5d28e 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -40,7 +40,7 @@ import org.onap.music.mdbc.MdbcServerLogic; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.StateManager; import org.onap.music.mdbc.configurations.NodeConfiguration; -import org.onap.music.mdbc.mixins.CassandraMixin; +import org.onap.music.mdbc.mixins.MusicMixin; import org.onap.music.mdbc.mixins.MusicInterface; import com.datastax.driver.core.Row; @@ -88,7 +88,7 @@ public class MusicTxDigest { while(colIterator.hasNext()) { String col = colIterator.next(); //FIXME: should not explicitly refer to cassandramixin - if (col.equals(CassandraMixin.MDBC_PRIMARYKEY_NAME)) { + if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) { //reserved name continue; } @@ -140,6 +140,8 @@ public class MusicTxDigest { case SELECT: //no update happened, do nothing break; + default: + logger.error(op.getOperationType() + "not implemented for replay"); } } diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties index d3feee2..73e8f77 100755 --- a/mdbc-server/src/main/resources/mdbc.properties +++ b/mdbc-server/src/main/resources/mdbc.properties @@ -2,11 +2,9 @@ # A list of all Mixins that should be checked by MDBC # MIXINS= \ - org.onap.music.mdbc.mixins.H2Mixin \ - org.onap.music.mdbc.mixins.H2ServerMixin \ org.onap.music.mdbc.mixins.MySQLMixin \ - org.onap.music.mdbc.mixins.CassandraMixin \ - org.onap.music.mdbc.mixins.Cassandra2Mixin + org.onap.music.mdbc.mixins.MusicMixin \ + org.onap.music.mdbc.mixins.Music2Mixin DEFAULT_DRIVERS=\ org.h2.Driver \ diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java index eab38d3..388f8b8 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java @@ -38,5 +38,13 @@ public class MusicTxDigestTest { HashMap digest = (HashMap) MDBCUtils.fromString(t1); txDigest.replayTxDigest(digest); } + + @Test + public void testEmptyDigest() throws Exception { + String t1 = "rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAB3CAAAABAAAAAAeA=="; + MusicTxDigest txDigest = new MusicTxDigest(null); + HashMap digest = (HashMap) MDBCUtils.fromString(t1); + txDigest.replayTxDigest(digest); + } } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java b/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java index c618f3b..7e7dc88 100755 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/test/TestCommon.java @@ -24,7 +24,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.Properties; -import org.onap.music.mdbc.mixins.CassandraMixin; +import org.onap.music.mdbc.mixins.MusicMixin; public class TestCommon { public static final String DB_DRIVER = "avatica.Driver"; @@ -34,9 +34,9 @@ public class TestCommon { public Connection getDBConnection(String url, String keyspace, String id) throws SQLException, ClassNotFoundException { Class.forName(DB_DRIVER); Properties driver_info = new Properties(); - driver_info.put(CassandraMixin.KEY_MY_ID, id); - driver_info.put(CassandraMixin.KEY_REPLICAS, "0,1,2"); - driver_info.put(CassandraMixin.KEY_MUSIC_ADDRESS, "localhost"); + driver_info.put(MusicMixin.KEY_MY_ID, id); + driver_info.put(MusicMixin.KEY_REPLICAS, "0,1,2"); + driver_info.put(MusicMixin.KEY_MUSIC_ADDRESS, "localhost"); driver_info.put("user", DB_USER); driver_info.put("password", DB_PASSWORD); return DriverManager.getConnection(url, driver_info); -- cgit 1.2.3-korg