From 505d5c6cd7ee3c6f31c66dc6c2de99f6a97a334c Mon Sep 17 00:00:00 2001 From: "Tschaen, Brendan" Date: Wed, 17 Oct 2018 17:26:32 -0400 Subject: Finish adding METRIC code Change-Id: Ifd0307ac21f85e504d690c79080174a50af87f9e Issue-ID: MUSIC-149 Signed-off-by: Tschaen, Brendan --- .../java/com/att/research/mdbc/ArchiveProcess.java | 11 +- .../com/att/research/mdbc/DatabaseOperations.java | 120 +++--- .../com/att/research/mdbc/DatabasePartition.java | 51 ++- src/main/java/com/att/research/mdbc/MDBCUtils.java | 32 +- .../java/com/att/research/mdbc/MdbcConnection.java | 2 +- .../com/att/research/mdbc/MusicSqlManager.java | 430 +++++++++++---------- .../java/com/att/research/mdbc/StateManager.java | 8 +- .../mdbc/configurations/NodeConfiguration.java | 4 +- .../mdbc/configurations/TablesConfiguration.java | 34 +- .../att/research/mdbc/configurations/config-0.json | 6 +- .../att/research/mdbc/configurations/ranges.json | 4 +- .../mdbc/configurations/tableConfiguration.json | 4 +- .../att/research/mdbc/mixins/CassandraMixin.java | 43 ++- .../com/att/research/mdbc/mixins/DBInterface.java | 1 + .../att/research/mdbc/mixins/MusicInterface.java | 11 +- .../com/att/research/mdbc/mixins/MusicMixin.java | 11 +- .../com/att/research/mdbc/mixins/MySQLMixin.java | 2 + .../com/att/research/mdbc/tables/Operation.java | 35 ++ .../att/research/mdbc/tables/OperationType.java | 5 + .../research/mdbc/tables/PartitionInformation.java | 19 + .../research/mdbc/tables/RedoHistoryElement.java | 15 + .../com/att/research/mdbc/tables/RedoRecordId.java | 15 + .../com/att/research/mdbc/tables/StagingTable.java | 51 +++ .../mdbc/tables/TablePartitionInformation.java | 15 + .../com/att/research/mdbc/tables/TitReference.java | 12 + .../mdbc/tables/TransactionInformationElement.java | 19 + .../att/research/mdbc/tables/TxCommitProgress.java | 206 ++++++++++ .../att/research/mdbc/tools/CreatePartition.java | 15 +- .../java/com/att/research/mdbc/MDBCUtilsTest.java | 5 +- 29 files changed, 817 insertions(+), 369 deletions(-) create mode 100644 src/main/java/com/att/research/mdbc/tables/Operation.java create mode 100644 src/main/java/com/att/research/mdbc/tables/OperationType.java create mode 100644 src/main/java/com/att/research/mdbc/tables/PartitionInformation.java create mode 100644 src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java create mode 100644 src/main/java/com/att/research/mdbc/tables/RedoRecordId.java create mode 100644 src/main/java/com/att/research/mdbc/tables/StagingTable.java create mode 100644 src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java create mode 100644 src/main/java/com/att/research/mdbc/tables/TitReference.java create mode 100644 src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java create mode 100644 src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java (limited to 'src') diff --git a/src/main/java/com/att/research/mdbc/ArchiveProcess.java b/src/main/java/com/att/research/mdbc/ArchiveProcess.java index f192430..8290d66 100644 --- a/src/main/java/com/att/research/mdbc/ArchiveProcess.java +++ b/src/main/java/com/att/research/mdbc/ArchiveProcess.java @@ -12,10 +12,9 @@ public class ArchiveProcess { //TODO: This is a place holder for taking snapshots and moving data from redo record into actual tables /** - * This method is called whenever there is a DELETE on a local SQL table, and should be called by the underlying databases - * triggering mechanism. It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL DELETE. - * Music propagates it to the other replicas. If the local database is in the middle of a transaction, the DELETEs to MUSIC are - * delayed until the transaction is either committed or rolled back. + * This method is called whenever there is a DELETE on the transaction digest and should be called when ownership changes, if required + * It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL DELETE. + * Music propagates it to the other replicas. * @param tableName This is the table on which the select is being performed * @param oldRow This is information about the row that is being deleted */ @@ -26,8 +25,8 @@ public class ArchiveProcess { } /** - * This method is called whenever there is an INSERT or UPDATE to a local SQL table, and should be called by the underlying databases - * triggering mechanism. It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. + * This method is called whenever there is an INSERT or UPDATE to a the transaction digest, and should be called by an + * ownership chance. It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. * Music propagates it to the other replicas. If the local database is in the middle of a transaction, the updates to MUSIC are * delayed until the transaction is either committed or rolled back. * diff --git a/src/main/java/com/att/research/mdbc/DatabaseOperations.java b/src/main/java/com/att/research/mdbc/DatabaseOperations.java index 406152e..bd10928 100644 --- a/src/main/java/com/att/research/mdbc/DatabaseOperations.java +++ b/src/main/java/com/att/research/mdbc/DatabaseOperations.java @@ -263,64 +263,9 @@ public class DatabaseOperations { return id; } - /** - * This function creates the TransactionInformation table. It contain information related - * to the transactions happening in a given partition. - * * The schema of the table is - * * Id, uiid. - * * Partition, uuid id of the partition - * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables - * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables - * * Redo: list of uiids associated to the Redo Records Table - * - */ - public static void CreateTransactionInformationTable( String musicNamespace, String transactionInformationTableName) throws MDBCServiceException { - String tableName = transactionInformationTableName; - String priKey = "id"; - StringBuilder fields = new StringBuilder(); - fields.append("id uuid, "); - fields.append("partition uuid, "); - fields.append("latestapplied int, "); - fields.append("applied boolean, "); - //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly - fields.append("redo list>>> "); - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); - try { - executeMusicWriteQuery(musicNamespace,tableName,cql); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create transaction information table"); - throw(e); - } - } - /** - * This function creates the RedoRecords table. It contain information related to each transaction committed - * * LeaseId: id associated with the lease, text - * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later - * * TransactionDigest: text that contains all the changes in the transaction - */ - public static void CreateRedoRecordsTable(int redoTableNumber, String musicNamespace, String redoRecordTableName) throws MDBCServiceException { - String tableName = redoRecordTableName; - if(redoTableNumber >= 0) { - StringBuilder table = new StringBuilder(); - table.append(tableName); - table.append("-"); - table.append(Integer.toString(redoTableNumber)); - tableName=table.toString(); - } - String priKey = "leaseid,leasecounter"; - StringBuilder fields = new StringBuilder(); - fields.append("leaseid text, "); - fields.append("leasecounter varint, "); - fields.append("transactiondigest text ");//notice lack of ',' - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); - try { - executeMusicWriteQuery(musicNamespace,tableName,cql); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create redo records table"); - throw(e); - } - } + + /** * This function creates the Table To Partition table. It contain information related to @@ -440,4 +385,65 @@ public class DatabaseOperations { } } } + + /** + * This function creates the MusicTxDigest table. It contain information related to each transaction committed + * * LeaseId: id associated with the lease, text + * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later + * * TransactionDigest: text that contains all the changes in the transaction + */ + public static void CreateMusicTxDigest(int musicTxDigestTableNumber, String musicNamespace, String musicTxDigestTableName) throws MDBCServiceException { + String tableName = musicTxDigestTableName; + if(musicTxDigestTableNumber >= 0) { + StringBuilder table = new StringBuilder(); + table.append(tableName); + table.append("-"); + table.append(Integer.toString(musicTxDigestTableNumber)); + tableName=table.toString(); + } + String priKey = "leaseid,leasecounter"; + StringBuilder fields = new StringBuilder(); + fields.append("leaseid text, "); + fields.append("leasecounter varint, "); + fields.append("transactiondigest text ");//notice lack of ',' + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); + try { + executeMusicWriteQuery(musicNamespace,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create redo records table"); + throw(e); + } + } + + /** + * This function creates the TransactionInformation table. It contain information related + * to the transactions happening in a given partition. + * * The schema of the table is + * * Id, uiid. + * * Partition, uuid id of the partition + * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables + * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables + * * Redo: list of uiids associated to the Redo Records Table + * + */ + public static void CreateMusicRangeInformationTable(String musicNamespace, String musicRangeInformationTableName) throws MDBCServiceException { + String tableName = musicRangeInformationTableName; + String priKey = "id"; + StringBuilder fields = new StringBuilder(); + fields.append("id uuid, "); + fields.append("partition uuid, "); + fields.append("latestapplied int, "); + fields.append("applied boolean, "); + //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly + fields.append("redo list>>> "); + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); + try { + executeMusicWriteQuery(musicNamespace,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create transaction information table"); + throw(e); + } + } + + } diff --git a/src/main/java/com/att/research/mdbc/DatabasePartition.java b/src/main/java/com/att/research/mdbc/DatabasePartition.java index 6046801..a9b4f3e 100644 --- a/src/main/java/com/att/research/mdbc/DatabasePartition.java +++ b/src/main/java/com/att/research/mdbc/DatabasePartition.java @@ -7,7 +7,6 @@ import java.util.HashSet; import java.util.Set; import com.att.research.logging.EELFLoggerDelegate; -import com.att.research.mdbc.mixins.CassandraMixin; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -19,9 +18,9 @@ import com.google.gson.GsonBuilder; public class DatabasePartition { private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class); - private String transactionInformationTable;//Table that currently contains the REDO log for this partition - private String transactionInformationIndex;//Index that can be obtained either from - private String redoRecordsTable; + private String musicRangeInformationTable;//Table that currently contains the REDO log for this partition + private String musicRangeInformationIndex;//Index that can be obtained either from + private String musicTxDigestTable; private String partitionId; private String lockId; protected Set ranges; @@ -35,7 +34,7 @@ public class DatabasePartition { ranges = new HashSet<>(); } - public DatabasePartition(Set knownRanges, String titIndex, String titTable, String partitionId, String lockId, String redoRecordsTable) { + public DatabasePartition(Set knownRanges, String mriIndex, String mriTable, String partitionId, String lockId, String musicTxDigestTable) { if(knownRanges != null) { ranges = knownRanges; } @@ -43,25 +42,25 @@ public class DatabasePartition { ranges = new HashSet<>(); } - if(redoRecordsTable != null) { - this.setRedoRecordsTable(redoRecordsTable); + if(musicTxDigestTable != null) { + this.setMusicTxDigestTable(musicTxDigestTable); } else{ - this.setRedoRecordsTable(""); + this.setMusicTxDigestTable(""); } - if(titIndex != null) { - this.setTransactionInformationIndex(titIndex); + if(mriIndex != null) { + this.setMusicRangeInformationIndex(mriIndex); } else { - this.setTransactionInformationIndex(""); + this.setMusicRangeInformationIndex(""); } - if(titTable != null) { - this.setTransactionInformationTable(titTable); + if(mriTable != null) { + this.setMusicRangeInformationTable(mriTable); } else { - this.setTransactionInformationTable(""); + this.setMusicRangeInformationTable(""); } if(partitionId != null) { @@ -79,20 +78,20 @@ public class DatabasePartition { } } - public String getTransactionInformationTable() { - return transactionInformationTable; + public String getMusicRangeInformationTable() { + return musicRangeInformationTable; } - public void setTransactionInformationTable(String transactionInformationTable) { - this.transactionInformationTable = transactionInformationTable; + public void setMusicRangeInformationTable(String musicRangeInformationTable) { + this.musicRangeInformationTable = musicRangeInformationTable; } - public String getTransactionInformationIndex() { - return transactionInformationIndex; + public String getMusicRangeInformationIndex() { + return musicRangeInformationIndex; } - public void setTransactionInformationIndex(String transactionInformationIndex) { - this.transactionInformationIndex = transactionInformationIndex; + public void setMusicRangeInformationIndex(String musicRangeInformationIndex) { + this.musicRangeInformationIndex = musicRangeInformationIndex; } /** @@ -180,11 +179,11 @@ public class DatabasePartition { this.lockId = lockId; } - public String getRedoRecordsTable() { - return redoRecordsTable; + public String getMusicTxDigestTable() { + return musicTxDigestTable; } - public void setRedoRecordsTable(String redoRecordsTable) { - this.redoRecordsTable = redoRecordsTable; + public void setMusicTxDigestTable(String musicTxDigestTable) { + this.musicTxDigestTable = musicTxDigestTable; } } diff --git a/src/main/java/com/att/research/mdbc/MDBCUtils.java b/src/main/java/com/att/research/mdbc/MDBCUtils.java index 411be8d..34f4b10 100644 --- a/src/main/java/com/att/research/mdbc/MDBCUtils.java +++ b/src/main/java/com/att/research/mdbc/MDBCUtils.java @@ -2,26 +2,23 @@ package com.att.research.mdbc; import java.io.*; import java.util.Base64; +import java.util.Deque; +import java.util.HashMap; import com.att.research.logging.EELFLoggerDelegate; import com.att.research.logging.format.AppMessages; import com.att.research.logging.format.ErrorSeverity; import com.att.research.logging.format.ErrorTypes; +import com.att.research.mdbc.tables.Operation; +import com.att.research.mdbc.tables.StagingTable; + +import javassist.bytecode.Descriptor.Iterator; + +import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; public class MDBCUtils { - /** Read the object from Base64 string. */ - public static Object fromString( String s ) throws IOException , - ClassNotFoundException { - byte [] data = Base64.getDecoder().decode( s ); - ObjectInputStream ois = new ObjectInputStream( - new ByteArrayInputStream( data ) ); - Object o = ois.readObject(); - ois.close(); - return o; - } - - /** Write the object to a Base64 string. */ + /** Write the object to a Base64 string. */ public static String toString( Serializable o ) throws IOException { //TODO We may want to also compress beside serialize ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -44,6 +41,17 @@ public class MDBCUtils { oos.close(); return Base64.getEncoder().encodeToString(baos.toByteArray()); } + + /** Read the object from Base64 string. */ + public static Object fromString( String s ) throws IOException , + ClassNotFoundException { + byte [] data = Base64.getDecoder().decode( s ); + ObjectInputStream ois = new ObjectInputStream( + new ByteArrayInputStream( data ) ); + Object o = ois.readObject(); + ois.close(); + return o; + } public static void saveToFile(String serializedContent, String filename, EELFLoggerDelegate logger) throws IOException { try (PrintWriter fout = new PrintWriter(filename)) { diff --git a/src/main/java/com/att/research/mdbc/MdbcConnection.java b/src/main/java/com/att/research/mdbc/MdbcConnection.java index d471522..1e845fd 100644 --- a/src/main/java/com/att/research/mdbc/MdbcConnection.java +++ b/src/main/java/com/att/research/mdbc/MdbcConnection.java @@ -26,7 +26,7 @@ import com.att.research.logging.format.AppMessages; import com.att.research.logging.format.ErrorSeverity; import com.att.research.logging.format.ErrorTypes; import com.att.research.mdbc.mixins.MusicInterface; -import com.att.research.mdbc.mixins.TxCommitProgress; +import com.att.research.mdbc.tables.TxCommitProgress; /** diff --git a/src/main/java/com/att/research/mdbc/MusicSqlManager.java b/src/main/java/com/att/research/mdbc/MusicSqlManager.java index 4330cfe..e32a969 100755 --- a/src/main/java/com/att/research/mdbc/MusicSqlManager.java +++ b/src/main/java/com/att/research/mdbc/MusicSqlManager.java @@ -1,22 +1,16 @@ package com.att.research.mdbc; import java.sql.Connection; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; +import java.util.*; import org.json.JSONObject; import com.att.research.mdbc.mixins.DBInterface; import com.att.research.mdbc.mixins.MixinFactory; import com.att.research.mdbc.mixins.MusicInterface; -import com.att.research.mdbc.mixins.StagingTable; -import com.att.research.mdbc.mixins.TxCommitProgress; import com.att.research.mdbc.mixins.Utils; - +import com.att.research.mdbc.tables.StagingTable; +import com.att.research.mdbc.tables.TxCommitProgress; import com.att.research.exceptions.MDBCServiceException; import com.att.research.exceptions.QueryException; import com.att.research.logging.*; @@ -41,117 +35,121 @@ import com.att.research.logging.format.ErrorTypes; */ public class MusicSqlManager { - private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicSqlManager.class); - - private final DBInterface dbi; - private final MusicInterface mi; - private final Set table_set; - private final HashMap transactionDigest; - private boolean autocommit; // a copy of the autocommit flag from the JDBC Connection + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicSqlManager.class); - /** - * Build a MusicSqlManager for a DB connection. This construct may only be called by getMusicSqlManager(), - * which will ensure that only one MusicSqlManager is created per URL. - * This is the location where the appropriate mixins to use for the MusicSqlManager should be determined. - * They should be picked based upon the URL and the properties passed to this constructor. - *

- * At the present time, we only support the use of the H2Mixin (for access to a local H2 database), - * with the CassandraMixin (for direct access to a Cassandra noSQL DB as the persistence layer). - *

- * - * @param url the JDBC URL which was used to connection to the database - * @param conn the actual connection to the database - * @param info properties passed from the initial JDBC connect() call - * @throws MDBCServiceException - */ - public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException { - try { - info.putAll(Utils.getMdbcProperties()); - String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT); - this.dbi = MixinFactory.createDBInterface(mixinDb, this, url, conn, info); - this.mi = mi; - this.table_set = Collections.synchronizedSet(new HashSet()); - this.autocommit = true; - this.transactionDigest = new HashMap(); + private final DBInterface dbi; + private final MusicInterface mi; + private final Set table_set; + private final HashMap transactionDigest; + private boolean autocommit; // a copy of the autocommit flag from the JDBC Connection - }catch(Exception e) { - throw new MDBCServiceException(e.getMessage()); - } - } + /** + * Build a MusicSqlManager for a DB connection. This construct may only be called by getMusicSqlManager(), + * which will ensure that only one MusicSqlManager is created per URL. + * This is the location where the appropriate mixins to use for the MusicSqlManager should be determined. + * They should be picked based upon the URL and the properties passed to this constructor. + *

+ * At the present time, we only support the use of the H2Mixin (for access to a local H2 database), + * with the CassandraMixin (for direct access to a Cassandra noSQL DB as the persistence layer). + *

+ * + * @param url the JDBC URL which was used to connection to the database + * @param conn the actual connection to the database + * @param info properties passed from the initial JDBC connect() call + * @throws MDBCServiceException + */ + public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException { + try { + info.putAll(Utils.getMdbcProperties()); + String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT); + this.dbi = MixinFactory.createDBInterface(mixinDb, this, url, conn, info); + this.mi = mi; + this.table_set = Collections.synchronizedSet(new HashSet()); + this.autocommit = true; + this.transactionDigest = new HashMap(); - public void setAutoCommit(boolean b,String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException { - if (b != autocommit) { - autocommit = b; - logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b); - if (b) { - // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction - if(txId == null || txId.isEmpty()) { - logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - throw new MDBCServiceException("tx id is null"); - } - commit(txId,progressKeeper,partition); - } - } - } + } catch (Exception e) { + throw new MDBCServiceException(e.getMessage()); + } + } - /** - * Close this MusicSqlManager. - */ - public void close() { - if (dbi != null) { - dbi.close(); - } - } + public void setAutoCommit(boolean b, String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException { + if (b != autocommit) { + autocommit = b; + logger.debug(EELFLoggerDelegate.applicationLogger, "autocommit changed to " + b); + if (b) { + // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction + if (txId == null || txId.isEmpty()) { + logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null", AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + throw new MDBCServiceException("tx id is null"); + } + commit(txId, progressKeeper, partition); + } + } + } - /** - * Code to be run within the DB driver before a SQL statement is executed. This is where tables - * can be synchronized before a SELECT, for those databases that do not support SELECT triggers. - * @param sql the SQL statement that is about to be executed - */ - public void preStatementHook(final String sql) { - dbi.preStatementHook(sql); - } - /** - * Code to be run within the DB driver after a SQL statement has been executed. This is where remote - * statement actions can be copied back to Cassandra/MUSIC. - * @param sql the SQL statement that was executed - */ - public void postStatementHook(final String sql) { - dbi.postStatementHook(sql,transactionDigest); - } - /** - * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the - * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized - * in order to prevent multiple threads from running this code in parallel. - */ - public synchronized void synchronizeTables() throws QueryException { - Set set1 = dbi.getSQLTableSet(); // set of tables in the database - logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); - for (String tableName : set1) { - // This map will be filled in if this table was previously discovered - if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) { - logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName); - try { - TableInfo ti = dbi.getTableInfo(tableName); - mi.initializeMusicForTable(ti,tableName); - //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted - ti = dbi.getTableInfo(tableName); - mi.createDirtyRowTable(ti,tableName); - dbi.createSQLTriggers(tableName); - table_set.add(tableName); - synchronizeTableData(tableName); - logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" + - table_set.size() + "/" + set1.size() + "tables uploaded"); - } catch (Exception e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e); - throw new QueryException(); - } - } - } + /** + * Close this MusicSqlManager. + */ + public void close() { + if (dbi != null) { + dbi.close(); + } + } + + /** + * Code to be run within the DB driver before a SQL statement is executed. This is where tables + * can be synchronized before a SELECT, for those databases that do not support SELECT triggers. + * + * @param sql the SQL statement that is about to be executed + */ + public void preStatementHook(final String sql) { + dbi.preStatementHook(sql); + } + + /** + * Code to be run within the DB driver after a SQL statement has been executed. This is where remote + * statement actions can be copied back to Cassandra/MUSIC. + * + * @param sql the SQL statement that was executed + */ + public void postStatementHook(final String sql) { + dbi.postStatementHook(sql, transactionDigest); + } + + /** + * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the + * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized + * in order to prevent multiple threads from running this code in parallel. + */ + public synchronized void synchronizeTables() throws QueryException { + Set set1 = dbi.getSQLTableSet(); // set of tables in the database + logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); + for (String tableName : set1) { + // This map will be filled in if this table was previously discovered + if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) { + logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: " + tableName); + try { + TableInfo ti = dbi.getTableInfo(tableName); + mi.initializeMusicForTable(ti, tableName); + //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted + ti = dbi.getTableInfo(tableName); + mi.createDirtyRowTable(ti, tableName); + dbi.createSQLTriggers(tableName); + table_set.add(tableName); + synchronizeTableData(tableName); + logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" + + table_set.size() + "/" + set1.size() + "tables uploaded"); + } catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e); + throw new QueryException(); + } + } + } // Set set2 = getMusicTableSet(music_ns); - // not working - fix later + // not working - fix later // for (String tbl : set2) { // if (!set1.contains(tbl)) { // logger.debug("Old table dropped: "+tbl); @@ -159,101 +157,107 @@ public class MusicSqlManager { // // ZZTODO drop camunda table ? // } // } - } + } + + /** + * On startup, copy dirty data from Cassandra to H2. May not be needed. + * + * @param tableName + */ + public void synchronizeTableData(String tableName) { + // TODO - copy MUSIC -> H2 + dbi.synchronizeData(tableName); + } + + /** + * This method is called whenever there is a SELECT on a local SQL table, and should be called by the underlying databases + * triggering mechanism. It first checks the local dirty bits table to see if there are any keys in Cassandra whose value + * has not yet been sent to SQL. If there are, the appropriate values are copied from Cassandra to the local database. + * Under normal execution, this function behaves as a NOP operation. + * + * @param tableName This is the table on which the SELECT is being performed + */ + public void readDirtyRowsAndUpdateDb(String tableName) { + mi.readDirtyRowsAndUpdateDb(dbi, tableName); + } + + + /** + * This method gets the primary key that the music interfaces uses by default. + * If the front end uses a primary key, this will not match what is used in the MUSIC interface + * + * @return + */ + public String getMusicDefaultPrimaryKeyName() { + return mi.getMusicDefaultPrimaryKeyName(); + } + + /** + * Asks music interface to provide the function to create a primary key + * e.g. uuid(), 1, "unique_aksd419fjc" + * + * @return + */ + public String generateUniqueKey() { + // + return mi.generateUniqueKey(); + } - /** - * On startup, copy dirty data from Cassandra to H2. May not be needed. - * @param tableName - */ - public void synchronizeTableData(String tableName) { - // TODO - copy MUSIC -> H2 - dbi.synchronizeData(tableName); - } - /** - * This method is called whenever there is a SELECT on a local SQL table, and should be called by the underlying databases - * triggering mechanism. It first checks the local dirty bits table to see if there are any keys in Cassandra whose value - * has not yet been sent to SQL. If there are, the appropriate values are copied from Cassandra to the local database. - * Under normal execution, this function behaves as a NOP operation. - * @param tableName This is the table on which the SELECT is being performed - */ - public void readDirtyRowsAndUpdateDb(String tableName) { - mi.readDirtyRowsAndUpdateDb(dbi,tableName); - } - - - - - /** - * This method gets the primary key that the music interfaces uses by default. - * If the front end uses a primary key, this will not match what is used in the MUSIC interface - * @return - */ - public String getMusicDefaultPrimaryKeyName() { - return mi.getMusicDefaultPrimaryKeyName(); - } - - /** - * Asks music interface to provide the function to create a primary key - * e.g. uuid(), 1, "unique_aksd419fjc" - * @return - */ - public String generateUniqueKey() { - // - return mi.generateUniqueKey(); - } - - - /** - * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, - * they are performed now and copied into MUSIC. - * @throws MDBCServiceException - */ - public synchronized void commit(String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException { - logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); - // transaction was committed -- add all the updates into the REDO-Log in MUSIC - try { - mi.commitLog(dbi, partition, transactionDigest, txId, progressKeeper); - }catch(MDBCServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); - throw e; - } - } - /** - * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed, - * they are discarded. - */ - public synchronized void rollback() { - // transaction was rolled back - discard the updates - logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");; - transactionDigest.clear(); - } + /** + * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, + * they are performed now and copied into MUSIC. + * + * @throws MDBCServiceException + */ + public synchronized void commit(String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException { + logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); + // transaction was committed -- add all the updates into the REDO-Log in MUSIC + try { + mi.commitLog(dbi, partition, transactionDigest, txId, progressKeeper); + } catch (MDBCServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); + throw e; + } + } - /** - * Get all - * @param table - * @param dbRow - * @return - */ - public String getMusicKeyFromRowWithoutPrimaryIndexes(String table, JSONObject dbRow) { - TableInfo ti = dbi.getTableInfo(table); - return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti,table, dbRow); - } - - public String getMusicKeyFromRow(String table, JSONObject dbRow) { - TableInfo ti = dbi.getTableInfo(table); - return mi.getMusicKeyFromRow(ti,table, dbRow); - } - - /** - * Returns all keys that matches the current sql statement, and not in already updated keys. - * - * @param sql the query that we are getting keys for - * @deprecated - */ - public ArrayList getMusicKeys(String sql) { - ArrayList musicKeys = new ArrayList(); - //\TODO See if this is required + /** + * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed, + * they are discarded. + */ + public synchronized void rollback() { + // transaction was rolled back - discard the updates + logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback"); + ; + transactionDigest.clear(); + } + + /** + * Get all + * + * @param table + * @param dbRow + * @return + */ + public String getMusicKeyFromRowWithoutPrimaryIndexes(String table, JSONObject dbRow) { + TableInfo ti = dbi.getTableInfo(table); + return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti, table, dbRow); + } + + public String getMusicKeyFromRow(String table, JSONObject dbRow) { + TableInfo ti = dbi.getTableInfo(table); + return mi.getMusicKeyFromRow(ti, table, dbRow); + } + + /** + * Returns all keys that matches the current sql statement, and not in already updated keys. + * + * @param sql the query that we are getting keys for + * @deprecated + */ + public ArrayList getMusicKeys(String sql) { + ArrayList musicKeys = new ArrayList(); + //\TODO See if this is required /* try { net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); @@ -295,6 +299,18 @@ public class MusicSqlManager { System.out.print(musicKey + ","); } */ - return musicKeys; - } + return musicKeys; + } + + public void own(List ranges) { + throw new java.lang.UnsupportedOperationException("function not implemented yet"); + } + + public void appendRange(String rangeId, List ranges) { + throw new java.lang.UnsupportedOperationException("function not implemented yet"); + } + + public void relinquish(String ownerId, String rangeId) { + throw new java.lang.UnsupportedOperationException("function not implemented yet"); + } } diff --git a/src/main/java/com/att/research/mdbc/StateManager.java b/src/main/java/com/att/research/mdbc/StateManager.java index accd13a..dc243fb 100644 --- a/src/main/java/com/att/research/mdbc/StateManager.java +++ b/src/main/java/com/att/research/mdbc/StateManager.java @@ -8,7 +8,7 @@ import com.att.research.logging.format.ErrorTypes; import com.att.research.mdbc.mixins.MixinFactory; import com.att.research.mdbc.mixins.MusicInterface; import com.att.research.mdbc.mixins.MusicMixin; -import com.att.research.mdbc.mixins.TxCommitProgress; +import com.att.research.mdbc.tables.TxCommitProgress; import java.sql.Connection; import java.sql.DriverManager; @@ -61,10 +61,14 @@ public class StateManager { //\fixme this is not really used, delete! String cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT); String mixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT); + init(mixin,cassandraUrl); + } + + protected void init(String mixin, String cassandraUrl) throws MDBCServiceException { this.musicManager = MixinFactory.createMusicInterface(mixin, cassandraUrl, info,ranges); this.musicManager.createKeyspace(); try { - this.musicManager.initializeMdbcDataStructures(); + this.musicManager.initializeMetricDataStructures(); } catch (MDBCServiceException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR); throw(e); diff --git a/src/main/java/com/att/research/mdbc/configurations/NodeConfiguration.java b/src/main/java/com/att/research/mdbc/configurations/NodeConfiguration.java index 78850e3..d74dafb 100644 --- a/src/main/java/com/att/research/mdbc/configurations/NodeConfiguration.java +++ b/src/main/java/com/att/research/mdbc/configurations/NodeConfiguration.java @@ -22,8 +22,8 @@ public class NodeConfiguration { public DatabasePartition partition; public String nodeName; - public NodeConfiguration(String tables, String titIndex, String titTableName, String partitionId, String sqlDatabaseName, String node, String redoRecordsTable){ - partition = new DatabasePartition(toRanges(tables), titIndex, titTableName, partitionId, null, redoRecordsTable) ; + public NodeConfiguration(String tables, String mriIndex, String mriTableName, String partitionId, String sqlDatabaseName, String node, String redoRecordsTable){ + partition = new DatabasePartition(toRanges(tables), mriIndex, mriTableName, partitionId, null, redoRecordsTable) ; this.sqlDatabaseName = sqlDatabaseName; this.nodeName = node; } diff --git a/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java b/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java index 0d28b51..eeb15a5 100644 --- a/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java +++ b/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java @@ -19,7 +19,7 @@ import java.util.List; public class TablesConfiguration { private final String TIT_TABLE_NAME = "transactioninformation"; - private final String REDO_RECORDS_NAME = "redorecords"; + private final String MUSIC_TX_DIGEST_TABLE_NAME = "musictxdigest"; private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class); private List partitions; @@ -54,13 +54,13 @@ public class TablesConfiguration { throw new MDBCServiceException("Partition was not correctly initialized"); } for(PartitionInformation partitionInfo : partitions){ - String titTableName = partitionInfo.titTableName; + String titTableName = partitionInfo.mriTableName; titTableName = (titTableName==null || titTableName.isEmpty())?TIT_TABLE_NAME:titTableName; //0) Create the corresponding TIT table - DatabaseOperations.CreateTransactionInformationTable(musicNamespace,titTableName); - String redoRecordsName = partitionInfo.rrtTableName; - redoRecordsName = (redoRecordsName==null || redoRecordsName.isEmpty())?REDO_RECORDS_NAME:redoRecordsName; - DatabaseOperations.CreateRedoRecordsTable(-1,musicNamespace,redoRecordsName); + DatabaseOperations.CreateMusicRangeInformationTable(musicNamespace,titTableName); + String musicTxDigestTableName = partitionInfo.mtxdTableName; + musicTxDigestTableName = (musicTxDigestTableName==null || musicTxDigestTableName.isEmpty())? MUSIC_TX_DIGEST_TABLE_NAME :musicTxDigestTableName; + DatabaseOperations.CreateMusicTxDigest(-1,musicNamespace,musicTxDigestTableName); //0) Create the corresponding TIT table String partitionId; if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){ @@ -87,7 +87,7 @@ public class TablesConfiguration { //5) Add it to the redo history table DatabaseOperations.createRedoHistoryBeginRow(musicNamespace,rhName,newRedoRow,partitionId,null); //6) Create config for this node - nodeConfigs.add(new NodeConfiguration(String.join(",",partitionInfo.tables),titIndex,titTableName,partitionId,sqlDatabaseName,partitionInfo.owner,redoRecordsName)); + nodeConfigs.add(new NodeConfiguration(String.join(",",partitionInfo.tables),titIndex,titTableName,partitionId,sqlDatabaseName,partitionInfo.owner,musicTxDigestTableName)); } return nodeConfigs; } @@ -124,8 +124,8 @@ public class TablesConfiguration { public class PartitionInformation{ private List tables; private String owner; - private String titTableName; - private String rrtTableName; + private String mriTableName; + private String mtxdTableName; private String partitionId; private int replicationFactor; @@ -145,12 +145,12 @@ public class TablesConfiguration { this.owner = owner; } - public String getTitTableName() { - return titTableName; + public String getMriTableName() { + return mriTableName; } - public void setTitTableName(String titTableName) { - this.titTableName = titTableName; + public void setMriTableName(String mriTableName) { + this.mriTableName = mriTableName; } public String getPartitionId() { @@ -169,12 +169,12 @@ public class TablesConfiguration { this.replicationFactor = replicationFactor; } - public String getRrtTableName(){ - return rrtTableName; + public String getMtxdTableName(){ + return mtxdTableName; } - public void setRrtTableName(String rrtTableName) { - this.rrtTableName = rrtTableName; + public void setMtxdTableName(String mtxdTableName) { + this.mtxdTableName = mtxdTableName; } } } diff --git a/src/main/java/com/att/research/mdbc/configurations/config-0.json b/src/main/java/com/att/research/mdbc/configurations/config-0.json index 96d947c..2207a52 100644 --- a/src/main/java/com/att/research/mdbc/configurations/config-0.json +++ b/src/main/java/com/att/research/mdbc/configurations/config-0.json @@ -1,9 +1,9 @@ { "sqlDatabaseName": "test", "partition": { - "transactionInformationTable": "transactioninformation", - "transactionInformationIndex": "259a7a7c-f741-44ae-8d6e-227a02ddc96e", - "redoRecordsTable": "redorecords", + "musicRangeInformationTable": "transactioninformation", + "musicRangeInformationIndex": "259a7a7c-f741-44ae-8d6e-227a02ddc96e", + "musicTxDigestTable": "musictxdigest", "partitionId": "ad766447-1adf-4800-aade-9f31a356ab4b", "lockId": "", "ranges": [ diff --git a/src/main/java/com/att/research/mdbc/configurations/ranges.json b/src/main/java/com/att/research/mdbc/configurations/ranges.json index afa343b..2a792e8 100644 --- a/src/main/java/com/att/research/mdbc/configurations/ranges.json +++ b/src/main/java/com/att/research/mdbc/configurations/ranges.json @@ -1,6 +1,6 @@ { - "transactionInformationTable": "transactioninformation", - "transactionInformationIndex": "d0e8ef2e-aeca-4261-8d9d-1679f560b85b", + "musicRangeInformationTable": "transactioninformation", + "musicRangeInformationIndex": "d0e8ef2e-aeca-4261-8d9d-1679f560b85b", "partitionId": "798110cf-9c61-4db2-9446-cb2dbab5a143", "lockId": "", "ranges": [ diff --git a/src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json b/src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json index b3c6224..e67dd0b 100644 --- a/src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json +++ b/src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json @@ -3,8 +3,8 @@ { "tables":["table11"], "owner":"", - "titTableName":"transactioninformation", - "rrtTableName":"redorecords", + "mriTableName":"musicrangeinformation", + "mtxdTableName":"musictxdigest", "partitionId":"", "replicationFactor":1 } diff --git a/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java b/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java index 6684fe6..28090b7 100755 --- a/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java +++ b/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java @@ -16,6 +16,15 @@ import java.util.TreeSet; import java.util.UUID; import com.att.research.mdbc.*; +import com.att.research.mdbc.tables.PartitionInformation; +import com.att.research.mdbc.tables.RedoHistoryElement; +import com.att.research.mdbc.tables.RedoRecordId; +import com.att.research.mdbc.tables.StagingTable; +import com.att.research.mdbc.tables.TablePartitionInformation; +import com.att.research.mdbc.tables.TitReference; +import com.att.research.mdbc.tables.TransactionInformationElement; +import com.att.research.mdbc.tables.TxCommitProgress; + import org.json.JSONObject; import org.onap.music.datastore.CassaLockStore; import org.onap.music.datastore.PreparedQueryObject; @@ -84,8 +93,8 @@ public class CassandraMixin implements MusicInterface { public static final String PARTITION_INFORMATION_TABLE_NAME = "partitioninfo"; public static final String REDO_HISTORY_TABLE_NAME= "redohistory"; //\TODO Add logic to change the names when required and create the tables when necessary - private String redoRecordTableName = "redorecords"; - private String transactionInformationTableName = "transactioninformation"; + private String musicTxDigestTableName = "musictxdigest"; + private String musicRangeInformationTableName = "musicrangeinformation"; private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassandraMixin.class); @@ -152,7 +161,7 @@ public class CassandraMixin implements MusicInterface { this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE); logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns); - transactionInformationTableName = "transactioninformation"; + musicRangeInformationTableName = "transactioninformation"; createMusicKeyspace(); } @@ -211,10 +220,10 @@ public class CassandraMixin implements MusicInterface { } } @Override - public void initializeMdbcDataStructures() throws MDBCServiceException { + public void initializeMetricDataStructures() throws MDBCServiceException { try { - DatabaseOperations.CreateRedoRecordsTable(-1, music_ns, redoRecordTableName);//\TODO If we start partitioning the data base, we would need to use the redotable number - DatabaseOperations.CreateTransactionInformationTable(music_ns, transactionInformationTableName); + DatabaseOperations.CreateMusicTxDigest(-1, music_ns, musicTxDigestTableName);//\TODO If we start partitioning the data base, we would need to use the redotable number + DatabaseOperations.CreateMusicRangeInformationTable(music_ns, musicRangeInformationTableName); DatabaseOperations.CreateTableToPartitionTable(music_ns, TABLE_TO_PARTITION_TABLE_NAME); DatabaseOperations.CreatePartitionInfoTable(music_ns, PARTITION_INFORMATION_TABLE_NAME); DatabaseOperations.CreateRedoHistoryTable(music_ns, REDO_HISTORY_TABLE_NAME); @@ -1016,7 +1025,7 @@ public class CassandraMixin implements MusicInterface { } - private PreparedQueryObject createAppendRRTIndexToTitQuery(String titTable, String uuid, String table, String redoUuid){ + private PreparedQueryObject createAppendMtxdIndexToMriQuery(String titTable, String uuid, String table, String redoUuid){ PreparedQueryObject query = new PreparedQueryObject(); StringBuilder appendBuilder = new StringBuilder(); appendBuilder.append("UPDATE ") @@ -1088,12 +1097,12 @@ public class CassandraMixin implements MusicInterface { return lockId; } - protected void pushRowToRRT(String lockId, String commitId, HashMap transactionDigest) throws MDBCServiceException{ + protected void pushRowToMtxd(String lockId, String commitId, HashMap transactionDigest) throws MDBCServiceException{ PreparedQueryObject query = new PreparedQueryObject(); StringBuilder cqlQuery = new StringBuilder("INSERT INTO ") .append(music_ns) .append('.') - .append(redoRecordTableName) + .append(musicTxDigestTableName) .append(" (leaseid,leasecounter,transactiondigest) ") .append("VALUES ('") .append( lockId ).append("',") @@ -1115,15 +1124,15 @@ public class CassandraMixin implements MusicInterface { } } - protected void appendIndexToTit(String lockId, String commitId, String TITIndex) throws MDBCServiceException{ + protected void appendIndexToMri(String lockId, String commitId, String TITIndex) throws MDBCServiceException{ StringBuilder redoUuidBuilder = new StringBuilder(); redoUuidBuilder.append("('") .append(lockId) .append("',") .append(commitId) .append(")"); - PreparedQueryObject appendQuery = createAppendRRTIndexToTitQuery(transactionInformationTableName, TITIndex, redoRecordTableName, redoUuidBuilder.toString()); - ReturnType returnType = MusicPureCassaCore.criticalPut(music_ns, transactionInformationTableName, TITIndex, appendQuery, lockId, null); + PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, TITIndex, musicTxDigestTableName, redoUuidBuilder.toString()); + ReturnType returnType = MusicPureCassaCore.criticalPut(music_ns, musicRangeInformationTableName, TITIndex, appendQuery, lockId, null); if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); @@ -1132,16 +1141,16 @@ public class CassandraMixin implements MusicInterface { @Override public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ - String TITIndex = partition.getTransactionInformationIndex(); + String TITIndex = partition.getMusicRangeInformationIndex(); if(TITIndex.isEmpty()) { //\TODO Fetch TITIndex from the Range Information Table throw new MDBCServiceException("TIT Index retrieval not yet implemented"); } - String fullyQualifiedTitKey = music_ns+"."+ transactionInformationTableName +"."+TITIndex; + String fullyQualifiedTitKey = music_ns+"."+ musicRangeInformationTableName +"."+TITIndex; //0. See if reference to lock was already created String lockId = partition.getLockId(); if(lockId == null || lockId.isEmpty()) { - lockId = createAndAssignLock(fullyQualifiedTitKey,partition,music_ns,transactionInformationTableName,TITIndex); + lockId = createAndAssignLock(fullyQualifiedTitKey,partition,music_ns, musicRangeInformationTableName,TITIndex); } String commitId; @@ -1156,14 +1165,14 @@ public class CassandraMixin implements MusicInterface { //Add creation type of transaction digest //1. Push new row to RRT and obtain its index - pushRowToRRT(lockId, commitId, transactionDigest); + pushRowToMtxd(lockId, commitId, transactionDigest); //2. Save RRT index to RQ if(progressKeeper!= null) { progressKeeper.setRecordId(txId,new RedoRecordId(lockId, commitId)); } //3. Append RRT index into the corresponding TIT row array - appendIndexToTit(lockId,commitId,TITIndex); + appendIndexToMri(lockId,commitId,TITIndex); } /** diff --git a/src/main/java/com/att/research/mdbc/mixins/DBInterface.java b/src/main/java/com/att/research/mdbc/mixins/DBInterface.java index 9aa94f9..e2b2ad7 100755 --- a/src/main/java/com/att/research/mdbc/mixins/DBInterface.java +++ b/src/main/java/com/att/research/mdbc/mixins/DBInterface.java @@ -7,6 +7,7 @@ import java.util.Set; import com.att.research.mdbc.Range; import com.att.research.mdbc.TableInfo; +import com.att.research.mdbc.tables.StagingTable; /** * This Interface defines the methods that MDBC needs in order to mirror data to/from a Database instance. diff --git a/src/main/java/com/att/research/mdbc/mixins/MusicInterface.java b/src/main/java/com/att/research/mdbc/mixins/MusicInterface.java index 94b3ac6..6e2e0ca 100755 --- a/src/main/java/com/att/research/mdbc/mixins/MusicInterface.java +++ b/src/main/java/com/att/research/mdbc/mixins/MusicInterface.java @@ -10,6 +10,15 @@ import com.att.research.exceptions.MDBCServiceException; import com.att.research.mdbc.DatabasePartition; import com.att.research.mdbc.Range; import com.att.research.mdbc.TableInfo; +import com.att.research.mdbc.tables.PartitionInformation; +import com.att.research.mdbc.tables.RedoHistoryElement; +import com.att.research.mdbc.tables.RedoRecordId; +import com.att.research.mdbc.tables.StagingTable; +import com.att.research.mdbc.tables.TablePartitionInformation; +import com.att.research.mdbc.tables.TitReference; +import com.att.research.mdbc.tables.TransactionInformationElement; +import com.att.research.mdbc.tables.TxCommitProgress; + import org.onap.music.exceptions.MusicLockingException; /** @@ -22,7 +31,7 @@ public interface MusicInterface { * This function is used to created all the required data structures, both local * \TODO Check if this function is required in the MUSIC interface or could be just created on the constructor */ - void initializeMdbcDataStructures() throws MDBCServiceException; + void initializeMetricDataStructures() throws MDBCServiceException; /** * Get the name of this MusicInterface mixin object. * @return the name diff --git a/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java b/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java index 1fee59c..60adaf1 100644 --- a/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java +++ b/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java @@ -18,6 +18,15 @@ import com.att.research.exceptions.MDBCServiceException; import com.att.research.mdbc.DatabasePartition; import com.att.research.mdbc.Range; import com.att.research.mdbc.TableInfo; +import com.att.research.mdbc.tables.PartitionInformation; +import com.att.research.mdbc.tables.RedoHistoryElement; +import com.att.research.mdbc.tables.RedoRecordId; +import com.att.research.mdbc.tables.StagingTable; +import com.att.research.mdbc.tables.TablePartitionInformation; +import com.att.research.mdbc.tables.TitReference; +import com.att.research.mdbc.tables.TransactionInformationElement; +import com.att.research.mdbc.tables.TxCommitProgress; + import org.onap.music.main.MusicPureCassaCore; /** @@ -172,7 +181,7 @@ public class MusicMixin implements MusicInterface { } @Override - public void initializeMdbcDataStructures() { + public void initializeMetricDataStructures() { // } diff --git a/src/main/java/com/att/research/mdbc/mixins/MySQLMixin.java b/src/main/java/com/att/research/mdbc/mixins/MySQLMixin.java index a836a39..4f70147 100755 --- a/src/main/java/com/att/research/mdbc/mixins/MySQLMixin.java +++ b/src/main/java/com/att/research/mdbc/mixins/MySQLMixin.java @@ -21,6 +21,8 @@ import com.att.research.logging.EELFLoggerDelegate; import com.att.research.mdbc.MusicSqlManager; import com.att.research.mdbc.Range; import com.att.research.mdbc.TableInfo; +import com.att.research.mdbc.tables.OperationType; +import com.att.research.mdbc.tables.StagingTable; import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.parser.CCJSqlParserUtil; diff --git a/src/main/java/com/att/research/mdbc/tables/Operation.java b/src/main/java/com/att/research/mdbc/tables/Operation.java new file mode 100644 index 0000000..026fa40 --- /dev/null +++ b/src/main/java/com/att/research/mdbc/tables/Operation.java @@ -0,0 +1,35 @@ +package com.att.research.mdbc.tables; + +import java.io.Serializable; + +import org.json.JSONObject; +import org.json.JSONTokener; + +public final class Operation implements Serializable{ + + private static final long serialVersionUID = -1215301985078183104L; + + final OperationType TYPE; + final String OLD_VAL; + final String NEW_VAL; + + public Operation(OperationType type, String newVal, String oldVal) { + TYPE = type; + NEW_VAL = newVal; + OLD_VAL = oldVal; + } + + public JSONObject getNewVal(){ + JSONObject newRow = new JSONObject(new JSONTokener(NEW_VAL)); + return newRow; + } + + public JSONObject getOldVal(){ + JSONObject keydata = new JSONObject(new JSONTokener(OLD_VAL)); + return keydata; + } + + public OperationType getOperationType() { + return this.TYPE; + } +} diff --git a/src/main/java/com/att/research/mdbc/tables/OperationType.java b/src/main/java/com/att/research/mdbc/tables/OperationType.java new file mode 100644 index 0000000..ae83485 --- /dev/null +++ b/src/main/java/com/att/research/mdbc/tables/OperationType.java @@ -0,0 +1,5 @@ +package com.att.research.mdbc.tables; + +public enum OperationType{ + DELETE, UPDATE, INSERT, SELECT +} diff --git a/src/main/java/com/att/research/mdbc/tables/PartitionInformation.java b/src/main/java/com/att/research/mdbc/tables/PartitionInformation.java new file mode 100644 index 0000000..9249844 --- /dev/null +++ b/src/main/java/com/att/research/mdbc/tables/PartitionInformation.java @@ -0,0 +1,19 @@ +package com.att.research.mdbc.tables; + +import java.util.List; + +public class PartitionInformation { + public final String partition; + public final TitReference tit; + public final List tables; + public final int replicationFactor; + public final String currentOwner; + + public PartitionInformation(String partition, TitReference tit, List tables, int replicationFactor, String currentOwner) { + this.partition=partition; + this.tit=tit; + this.tables=tables; + this.replicationFactor=replicationFactor; + this.currentOwner=currentOwner; + } +} diff --git a/src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java b/src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java new file mode 100644 index 0000000..8d92216 --- /dev/null +++ b/src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java @@ -0,0 +1,15 @@ +package com.att.research.mdbc.tables; + +import java.util.List; + +public final class RedoHistoryElement { + public final String partition; + public final TitReference current; + public final List previous; + + public RedoHistoryElement(String partition, TitReference current, List previous) { + this.partition = partition; + this.current = current; + this.previous = previous; + } +} diff --git a/src/main/java/com/att/research/mdbc/tables/RedoRecordId.java b/src/main/java/com/att/research/mdbc/tables/RedoRecordId.java new file mode 100644 index 0000000..225c89e --- /dev/null +++ b/src/main/java/com/att/research/mdbc/tables/RedoRecordId.java @@ -0,0 +1,15 @@ +package com.att.research.mdbc.tables; + +public final class RedoRecordId { + public final String leaseId; + public final String commitId; + + public RedoRecordId(String leaseId, String commitId) { + this.leaseId = leaseId; + this.commitId = commitId; + } + + public boolean isEmpty() { + return (this.leaseId==null || this.leaseId.isEmpty())&&(this.commitId==null||this.commitId.isEmpty()); + } +} diff --git a/src/main/java/com/att/research/mdbc/tables/StagingTable.java b/src/main/java/com/att/research/mdbc/tables/StagingTable.java new file mode 100644 index 0000000..c16f11c --- /dev/null +++ b/src/main/java/com/att/research/mdbc/tables/StagingTable.java @@ -0,0 +1,51 @@ +package com.att.research.mdbc.tables; + +import java.io.Serializable; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Set; +import org.apache.commons.lang3.tuple.Pair; +import org.json.JSONObject; + +import com.att.research.logging.EELFLoggerDelegate; + +public class StagingTable implements Serializable{ + /** + * + */ + private static final long serialVersionUID = 7583182634761771943L; + private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StagingTable.class); + //primary key -> Operation + private HashMap> operations; + + public StagingTable() { + operations = new HashMap<>(); + } + + synchronized public void addOperation(String key, OperationType type, String oldVal, String newVal) { + if(!operations.containsKey(key)) { + operations.put(key, new LinkedList<>()); + } + operations.get(key).add(new Operation(type,newVal,oldVal)); + } + + synchronized public Deque> getIterableSnapshot() throws NoSuchFieldException{ + Deque> response=new LinkedList>(); + //\TODO: check if we can just return the last change to a given key + Set keys = operations.keySet(); + for(String key : keys) { + Deque ops = operations.get(key); + if(ops.isEmpty()) { + logger.error(EELFLoggerDelegate.errorLogger, "Invalid state of the Operation data structure when creating snapshot"); + throw new NoSuchFieldException("Invalid state of the operation data structure"); + } + response.add(Pair.of(key,ops.getLast())); + } + return response; + } + + synchronized public void clean() { + operations.clear(); + } +} diff --git a/src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java b/src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java new file mode 100644 index 0000000..9201de5 --- /dev/null +++ b/src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java @@ -0,0 +1,15 @@ +package com.att.research.mdbc.tables; + +import java.util.List; + +public final class TablePartitionInformation { + public final String table; + public final String partition; + public final List oldPartitions; + + public TablePartitionInformation(String table, String partition, List oldPartitions) { + this.table = table; + this.partition = partition; + this.oldPartitions = oldPartitions; + } +} diff --git a/src/main/java/com/att/research/mdbc/tables/TitReference.java b/src/main/java/com/att/research/mdbc/tables/TitReference.java new file mode 100644 index 0000000..2abb989 --- /dev/null +++ b/src/main/java/com/att/research/mdbc/tables/TitReference.java @@ -0,0 +1,12 @@ +package com.att.research.mdbc.tables; + +public final class TitReference { + public final String table; + public final String index; + + public TitReference(String table, String index) { + this.table = table; + this.index= index; + } + +} diff --git a/src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java b/src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java new file mode 100644 index 0000000..a80cab1 --- /dev/null +++ b/src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java @@ -0,0 +1,19 @@ +package com.att.research.mdbc.tables; + +import java.util.List; + +public final class TransactionInformationElement { + public final String index; + public final List redoLog; + public final String partition; + public final int latestApplied; + public final boolean applied; + + public TransactionInformationElement(String index, List redoLog, String partition, int latestApplied, boolean applied) { + this.index = index; + this.redoLog = redoLog; + this.partition = partition; + this.latestApplied = latestApplied; + this.applied = applied; + } +} diff --git a/src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java b/src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java new file mode 100644 index 0000000..cecdb08 --- /dev/null +++ b/src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java @@ -0,0 +1,206 @@ +package com.att.research.mdbc.tables; + +import java.math.BigInteger; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import com.att.research.logging.EELFLoggerDelegate; + +import java.sql.Connection; +import java.util.concurrent.atomic.AtomicReference; + + +public class TxCommitProgress{ + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TxCommitProgress.class); + + private AtomicReference nextCommitId; + private Map transactionInfo; + + public TxCommitProgress(){ + nextCommitId=new AtomicReference<>(BigInteger.ZERO); + transactionInfo = new ConcurrentHashMap<>(); + } + + public boolean containsTx(String txId) { + return transactionInfo.containsKey(txId); + } + + public BigInteger getCommitId(String txId) { + CommitProgress prog = transactionInfo.get(txId); + if(prog.isCommitIdAssigned()) { + return prog.getCommitId(); + } + BigInteger commitId = nextCommitId.getAndUpdate((a)-> a.add(BigInteger.ONE)); + prog.setCommitId(commitId); + return commitId; + } + + public void createNewTransactionTracker(String id, Connection conn) { + transactionInfo.put(id, new CommitProgress(id,conn)); + } + + public void commitRequested(String txId) { + CommitProgress prog = transactionInfo.get(txId); + if(prog == null){ + logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when storing commit request",txId); + } + prog.setCommitRequested(); + } + + public void setSQLDone(String txId) { + CommitProgress prog = transactionInfo.get(txId); + if(prog == null){ + logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when storing saving completion of SQL",txId); + } + prog.setSQLCompleted(); + } + + public void setMusicDone(String txId) { + CommitProgress prog = transactionInfo.get(txId); + if(prog == null){ + logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when storing saving completion of Music",txId); + } + prog.setMusicCompleted(); + } + + public Connection getConnection(String txId){ + CommitProgress prog = transactionInfo.get(txId); + if(prog == null){ + logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when retrieving statement",txId); + } + return prog.getConnection(); + } + + public void setRecordId(String txId, RedoRecordId recordId){ + CommitProgress prog = transactionInfo.get(txId); + if(prog == null){ + logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when setting record Id",txId); + } + prog.setRecordId(recordId); + } + + public RedoRecordId getRecordId(String txId) { + CommitProgress prog = transactionInfo.get(txId); + if(prog == null){ + logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when getting record Id",txId); + } + return prog.getRecordId(); + } + + public boolean isRecordIdAssigned(String txId) { + CommitProgress prog = transactionInfo.get(txId); + if(prog == null){ + logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when checking record",txId); + } + return prog.isRedoRecordAssigned(); + } + + public boolean isComplete(String txId) { + CommitProgress prog = transactionInfo.get(txId); + if(prog == null){ + logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when checking completion",txId); + } + return prog.isComplete(); + } + + public void reinitializeTxProgress(String txId) { + CommitProgress prog = transactionInfo.get(txId); + if(prog == null){ + logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when reinitializing tx progress",txId); + } + prog.reinitialize(); + } + + public void deleteTxProgress(String txId){ + transactionInfo.remove(txId); + } +} + +final class CommitProgress{ + private String lTxId; // local transaction id + private BigInteger commitId; // commit id + private boolean commitRequested; //indicates if the user tried to commit the request already. + private boolean SQLDone; // indicates if SQL was already committed + private boolean MusicDone; // indicates if music commit was already performed, atomic bool + private Connection connection;// reference to a connection object. This is used to complete a commit if it failed in the original thread. + private Long timestamp; // last time this data structure was updated + private RedoRecordId redoRecordId;// record id for each partition + + public CommitProgress(String id,Connection conn){ + redoRecordId=null; + lTxId = id; + commitRequested = false; + SQLDone = false; + MusicDone = false; + connection = conn; + commitId = null; + timestamp = System.currentTimeMillis(); + } + + public synchronized boolean isComplete() { + return commitRequested && SQLDone && MusicDone; + } + + public synchronized void setCommitId(BigInteger commitId) { + this.commitId = commitId; + timestamp = System.currentTimeMillis(); + } + + public synchronized void reinitialize() { + commitId = null; + redoRecordId=null; + commitRequested = false; + SQLDone = false; + MusicDone = false; + timestamp = System.currentTimeMillis(); + } + + public synchronized void setCommitRequested() { + commitRequested = true; + timestamp = System.currentTimeMillis(); + } + + public synchronized void setSQLCompleted() { + SQLDone = true; + timestamp = System.currentTimeMillis(); + } + + public synchronized void setMusicCompleted() { + MusicDone = true; + timestamp = System.currentTimeMillis(); + } + + public Connection getConnection() { + timestamp = System.currentTimeMillis(); + return connection; + } + + public long getTimestamInMillis() { + return timestamp; + } + + public synchronized void setRecordId(RedoRecordId id) { + redoRecordId = id; + timestamp = System.currentTimeMillis(); + } + + public synchronized boolean isRedoRecordAssigned() { + return this.redoRecordId!=null; + } + + public synchronized RedoRecordId getRecordId() { + return redoRecordId; + } + + public synchronized BigInteger getCommitId() { + return commitId; + } + + public synchronized String getId() { + return this.lTxId; + } + + public synchronized boolean isCommitIdAssigned() { + return this.commitId!= null; + } +} \ No newline at end of file diff --git a/src/main/java/com/att/research/mdbc/tools/CreatePartition.java b/src/main/java/com/att/research/mdbc/tools/CreatePartition.java index 09524cb..83f210d 100644 --- a/src/main/java/com/att/research/mdbc/tools/CreatePartition.java +++ b/src/main/java/com/att/research/mdbc/tools/CreatePartition.java @@ -1,17 +1,10 @@ package com.att.research.mdbc.tools; import com.att.research.logging.EELFLoggerDelegate; -import com.att.research.mdbc.DatabasePartition; -import com.att.research.mdbc.MDBCUtils; -import com.att.research.mdbc.Range; import com.att.research.mdbc.configurations.NodeConfiguration; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; - public class CreatePartition { public static final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(CreatePartition.class); @@ -27,9 +20,9 @@ public class CreatePartition { @Parameter(names = { "-n", "--tit-table-name" }, required = true, description = "Tit Table name") private String titTable; - @Parameter(names = { "-r", "--redorecords-table-name" }, required = true, - description = "Redo Records Table name") - private String rrTable; + @Parameter(names = { "-r", "--music-tx-digest-table-name" }, required = true, + description = "Music Transaction Digest Table name") + private String mTxDTable; @Parameter(names = { "-p", "--partition-id" }, required = true, description = "Partition Id") private String partitionId; @@ -43,7 +36,7 @@ public class CreatePartition { } public void convert(){ - config = new NodeConfiguration(tables,titIndex,titTable,partitionId,"test","",rrTable); + config = new NodeConfiguration(tables,titIndex,titTable,partitionId,"test","", mTxDTable); } public void saveToFile(){ diff --git a/src/test/java/com/att/research/mdbc/MDBCUtilsTest.java b/src/test/java/com/att/research/mdbc/MDBCUtilsTest.java index cdee078..28af754 100644 --- a/src/test/java/com/att/research/mdbc/MDBCUtilsTest.java +++ b/src/test/java/com/att/research/mdbc/MDBCUtilsTest.java @@ -1,7 +1,8 @@ package com.att.research.mdbc; -import com.att.research.mdbc.mixins.OperationType; -import com.att.research.mdbc.mixins.StagingTable; +import com.att.research.mdbc.tables.OperationType; +import com.att.research.mdbc.tables.StagingTable; + import org.json.JSONArray; import org.json.JSONObject; import org.junit.Test; -- cgit 1.2.3-korg