aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorTschaen, Brendan <ctschaen@att.com>2018-10-17 17:26:32 -0400
committerTschaen, Brendan <ctschaen@att.com>2018-10-17 17:26:52 -0400
commit505d5c6cd7ee3c6f31c66dc6c2de99f6a97a334c (patch)
treed61162c3bcdf5c515184206ad57a306fefb67f8b /src
parent73f8de325b31d350883f6752fb04d63c41112e8f (diff)
Finish adding METRIC code
Change-Id: Ifd0307ac21f85e504d690c79080174a50af87f9e Issue-ID: MUSIC-149 Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/att/research/mdbc/ArchiveProcess.java11
-rw-r--r--src/main/java/com/att/research/mdbc/DatabaseOperations.java120
-rw-r--r--src/main/java/com/att/research/mdbc/DatabasePartition.java51
-rw-r--r--src/main/java/com/att/research/mdbc/MDBCUtils.java32
-rw-r--r--src/main/java/com/att/research/mdbc/MdbcConnection.java2
-rwxr-xr-xsrc/main/java/com/att/research/mdbc/MusicSqlManager.java430
-rw-r--r--src/main/java/com/att/research/mdbc/StateManager.java8
-rw-r--r--src/main/java/com/att/research/mdbc/configurations/NodeConfiguration.java4
-rw-r--r--src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java34
-rw-r--r--src/main/java/com/att/research/mdbc/configurations/config-0.json6
-rw-r--r--src/main/java/com/att/research/mdbc/configurations/ranges.json4
-rw-r--r--src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json4
-rwxr-xr-xsrc/main/java/com/att/research/mdbc/mixins/CassandraMixin.java43
-rwxr-xr-xsrc/main/java/com/att/research/mdbc/mixins/DBInterface.java1
-rwxr-xr-xsrc/main/java/com/att/research/mdbc/mixins/MusicInterface.java11
-rw-r--r--src/main/java/com/att/research/mdbc/mixins/MusicMixin.java11
-rwxr-xr-xsrc/main/java/com/att/research/mdbc/mixins/MySQLMixin.java2
-rw-r--r--src/main/java/com/att/research/mdbc/tables/Operation.java35
-rw-r--r--src/main/java/com/att/research/mdbc/tables/OperationType.java5
-rw-r--r--src/main/java/com/att/research/mdbc/tables/PartitionInformation.java19
-rw-r--r--src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java15
-rw-r--r--src/main/java/com/att/research/mdbc/tables/RedoRecordId.java15
-rw-r--r--src/main/java/com/att/research/mdbc/tables/StagingTable.java51
-rw-r--r--src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java15
-rw-r--r--src/main/java/com/att/research/mdbc/tables/TitReference.java12
-rw-r--r--src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java19
-rw-r--r--src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java206
-rw-r--r--src/main/java/com/att/research/mdbc/tools/CreatePartition.java15
-rw-r--r--src/test/java/com/att/research/mdbc/MDBCUtilsTest.java5
29 files changed, 817 insertions, 369 deletions
diff --git a/src/main/java/com/att/research/mdbc/ArchiveProcess.java b/src/main/java/com/att/research/mdbc/ArchiveProcess.java
index f192430..8290d66 100644
--- a/src/main/java/com/att/research/mdbc/ArchiveProcess.java
+++ b/src/main/java/com/att/research/mdbc/ArchiveProcess.java
@@ -12,10 +12,9 @@ public class ArchiveProcess {
//TODO: This is a place holder for taking snapshots and moving data from redo record into actual tables
/**
- * This method is called whenever there is a DELETE on a local SQL table, and should be called by the underlying databases
- * triggering mechanism. It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL DELETE.
- * Music propagates it to the other replicas. If the local database is in the middle of a transaction, the DELETEs to MUSIC are
- * delayed until the transaction is either committed or rolled back.
+ * This method is called whenever there is a DELETE on the transaction digest and should be called when ownership changes, if required
+ * It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL DELETE.
+ * Music propagates it to the other replicas.
* @param tableName This is the table on which the select is being performed
* @param oldRow This is information about the row that is being deleted
*/
@@ -26,8 +25,8 @@ public class ArchiveProcess {
}
/**
- * This method is called whenever there is an INSERT or UPDATE to a local SQL table, and should be called by the underlying databases
- * triggering mechanism. It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write.
+ * This method is called whenever there is an INSERT or UPDATE to a the transaction digest, and should be called by an
+ * ownership chance. It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write.
* Music propagates it to the other replicas. If the local database is in the middle of a transaction, the updates to MUSIC are
* delayed until the transaction is either committed or rolled back.
*
diff --git a/src/main/java/com/att/research/mdbc/DatabaseOperations.java b/src/main/java/com/att/research/mdbc/DatabaseOperations.java
index 406152e..bd10928 100644
--- a/src/main/java/com/att/research/mdbc/DatabaseOperations.java
+++ b/src/main/java/com/att/research/mdbc/DatabaseOperations.java
@@ -263,64 +263,9 @@ public class DatabaseOperations {
return id;
}
- /**
- * This function creates the TransactionInformation table. It contain information related
- * to the transactions happening in a given partition.
- * * The schema of the table is
- * * Id, uiid.
- * * Partition, uuid id of the partition
- * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
- * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
- * * Redo: list of uiids associated to the Redo Records Table
- *
- */
- public static void CreateTransactionInformationTable( String musicNamespace, String transactionInformationTableName) throws MDBCServiceException {
- String tableName = transactionInformationTableName;
- String priKey = "id";
- StringBuilder fields = new StringBuilder();
- fields.append("id uuid, ");
- fields.append("partition uuid, ");
- fields.append("latestapplied int, ");
- fields.append("applied boolean, ");
- //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
- fields.append("redo list<frozen<tuple<text,tuple<text,varint>>>> ");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
- try {
- executeMusicWriteQuery(musicNamespace,tableName,cql);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to create transaction information table");
- throw(e);
- }
- }
- /**
- * This function creates the RedoRecords table. It contain information related to each transaction committed
- * * LeaseId: id associated with the lease, text
- * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
- * * TransactionDigest: text that contains all the changes in the transaction
- */
- public static void CreateRedoRecordsTable(int redoTableNumber, String musicNamespace, String redoRecordTableName) throws MDBCServiceException {
- String tableName = redoRecordTableName;
- if(redoTableNumber >= 0) {
- StringBuilder table = new StringBuilder();
- table.append(tableName);
- table.append("-");
- table.append(Integer.toString(redoTableNumber));
- tableName=table.toString();
- }
- String priKey = "leaseid,leasecounter";
- StringBuilder fields = new StringBuilder();
- fields.append("leaseid text, ");
- fields.append("leasecounter varint, ");
- fields.append("transactiondigest text ");//notice lack of ','
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
- try {
- executeMusicWriteQuery(musicNamespace,tableName,cql);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to create redo records table");
- throw(e);
- }
- }
+
+
/**
* This function creates the Table To Partition table. It contain information related to
@@ -440,4 +385,65 @@ public class DatabaseOperations {
}
}
}
+
+ /**
+ * This function creates the MusicTxDigest table. It contain information related to each transaction committed
+ * * LeaseId: id associated with the lease, text
+ * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+ * * TransactionDigest: text that contains all the changes in the transaction
+ */
+ public static void CreateMusicTxDigest(int musicTxDigestTableNumber, String musicNamespace, String musicTxDigestTableName) throws MDBCServiceException {
+ String tableName = musicTxDigestTableName;
+ if(musicTxDigestTableNumber >= 0) {
+ StringBuilder table = new StringBuilder();
+ table.append(tableName);
+ table.append("-");
+ table.append(Integer.toString(musicTxDigestTableNumber));
+ tableName=table.toString();
+ }
+ String priKey = "leaseid,leasecounter";
+ StringBuilder fields = new StringBuilder();
+ fields.append("leaseid text, ");
+ fields.append("leasecounter varint, ");
+ fields.append("transactiondigest text ");//notice lack of ','
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
+ try {
+ executeMusicWriteQuery(musicNamespace,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create redo records table");
+ throw(e);
+ }
+ }
+
+ /**
+ * This function creates the TransactionInformation table. It contain information related
+ * to the transactions happening in a given partition.
+ * * The schema of the table is
+ * * Id, uiid.
+ * * Partition, uuid id of the partition
+ * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
+ * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
+ * * Redo: list of uiids associated to the Redo Records Table
+ *
+ */
+ public static void CreateMusicRangeInformationTable(String musicNamespace, String musicRangeInformationTableName) throws MDBCServiceException {
+ String tableName = musicRangeInformationTableName;
+ String priKey = "id";
+ StringBuilder fields = new StringBuilder();
+ fields.append("id uuid, ");
+ fields.append("partition uuid, ");
+ fields.append("latestapplied int, ");
+ fields.append("applied boolean, ");
+ //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
+ fields.append("redo list<frozen<tuple<text,tuple<text,varint>>>> ");
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
+ try {
+ executeMusicWriteQuery(musicNamespace,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create transaction information table");
+ throw(e);
+ }
+ }
+
+
}
diff --git a/src/main/java/com/att/research/mdbc/DatabasePartition.java b/src/main/java/com/att/research/mdbc/DatabasePartition.java
index 6046801..a9b4f3e 100644
--- a/src/main/java/com/att/research/mdbc/DatabasePartition.java
+++ b/src/main/java/com/att/research/mdbc/DatabasePartition.java
@@ -7,7 +7,6 @@ import java.util.HashSet;
import java.util.Set;
import com.att.research.logging.EELFLoggerDelegate;
-import com.att.research.mdbc.mixins.CassandraMixin;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -19,9 +18,9 @@ import com.google.gson.GsonBuilder;
public class DatabasePartition {
private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class);
- private String transactionInformationTable;//Table that currently contains the REDO log for this partition
- private String transactionInformationIndex;//Index that can be obtained either from
- private String redoRecordsTable;
+ private String musicRangeInformationTable;//Table that currently contains the REDO log for this partition
+ private String musicRangeInformationIndex;//Index that can be obtained either from
+ private String musicTxDigestTable;
private String partitionId;
private String lockId;
protected Set<Range> ranges;
@@ -35,7 +34,7 @@ public class DatabasePartition {
ranges = new HashSet<>();
}
- public DatabasePartition(Set<Range> knownRanges, String titIndex, String titTable, String partitionId, String lockId, String redoRecordsTable) {
+ public DatabasePartition(Set<Range> knownRanges, String mriIndex, String mriTable, String partitionId, String lockId, String musicTxDigestTable) {
if(knownRanges != null) {
ranges = knownRanges;
}
@@ -43,25 +42,25 @@ public class DatabasePartition {
ranges = new HashSet<>();
}
- if(redoRecordsTable != null) {
- this.setRedoRecordsTable(redoRecordsTable);
+ if(musicTxDigestTable != null) {
+ this.setMusicTxDigestTable(musicTxDigestTable);
}
else{
- this.setRedoRecordsTable("");
+ this.setMusicTxDigestTable("");
}
- if(titIndex != null) {
- this.setTransactionInformationIndex(titIndex);
+ if(mriIndex != null) {
+ this.setMusicRangeInformationIndex(mriIndex);
}
else {
- this.setTransactionInformationIndex("");
+ this.setMusicRangeInformationIndex("");
}
- if(titTable != null) {
- this.setTransactionInformationTable(titTable);
+ if(mriTable != null) {
+ this.setMusicRangeInformationTable(mriTable);
}
else {
- this.setTransactionInformationTable("");
+ this.setMusicRangeInformationTable("");
}
if(partitionId != null) {
@@ -79,20 +78,20 @@ public class DatabasePartition {
}
}
- public String getTransactionInformationTable() {
- return transactionInformationTable;
+ public String getMusicRangeInformationTable() {
+ return musicRangeInformationTable;
}
- public void setTransactionInformationTable(String transactionInformationTable) {
- this.transactionInformationTable = transactionInformationTable;
+ public void setMusicRangeInformationTable(String musicRangeInformationTable) {
+ this.musicRangeInformationTable = musicRangeInformationTable;
}
- public String getTransactionInformationIndex() {
- return transactionInformationIndex;
+ public String getMusicRangeInformationIndex() {
+ return musicRangeInformationIndex;
}
- public void setTransactionInformationIndex(String transactionInformationIndex) {
- this.transactionInformationIndex = transactionInformationIndex;
+ public void setMusicRangeInformationIndex(String musicRangeInformationIndex) {
+ this.musicRangeInformationIndex = musicRangeInformationIndex;
}
/**
@@ -180,11 +179,11 @@ public class DatabasePartition {
this.lockId = lockId;
}
- public String getRedoRecordsTable() {
- return redoRecordsTable;
+ public String getMusicTxDigestTable() {
+ return musicTxDigestTable;
}
- public void setRedoRecordsTable(String redoRecordsTable) {
- this.redoRecordsTable = redoRecordsTable;
+ public void setMusicTxDigestTable(String musicTxDigestTable) {
+ this.musicTxDigestTable = musicTxDigestTable;
}
}
diff --git a/src/main/java/com/att/research/mdbc/MDBCUtils.java b/src/main/java/com/att/research/mdbc/MDBCUtils.java
index 411be8d..34f4b10 100644
--- a/src/main/java/com/att/research/mdbc/MDBCUtils.java
+++ b/src/main/java/com/att/research/mdbc/MDBCUtils.java
@@ -2,26 +2,23 @@ package com.att.research.mdbc;
import java.io.*;
import java.util.Base64;
+import java.util.Deque;
+import java.util.HashMap;
import com.att.research.logging.EELFLoggerDelegate;
import com.att.research.logging.format.AppMessages;
import com.att.research.logging.format.ErrorSeverity;
import com.att.research.logging.format.ErrorTypes;
+import com.att.research.mdbc.tables.Operation;
+import com.att.research.mdbc.tables.StagingTable;
+
+import javassist.bytecode.Descriptor.Iterator;
+
+import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
public class MDBCUtils {
- /** Read the object from Base64 string. */
- public static Object fromString( String s ) throws IOException ,
- ClassNotFoundException {
- byte [] data = Base64.getDecoder().decode( s );
- ObjectInputStream ois = new ObjectInputStream(
- new ByteArrayInputStream( data ) );
- Object o = ois.readObject();
- ois.close();
- return o;
- }
-
- /** Write the object to a Base64 string. */
+ /** Write the object to a Base64 string. */
public static String toString( Serializable o ) throws IOException {
//TODO We may want to also compress beside serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -44,6 +41,17 @@ public class MDBCUtils {
oos.close();
return Base64.getEncoder().encodeToString(baos.toByteArray());
}
+
+ /** Read the object from Base64 string. */
+ public static Object fromString( String s ) throws IOException ,
+ ClassNotFoundException {
+ byte [] data = Base64.getDecoder().decode( s );
+ ObjectInputStream ois = new ObjectInputStream(
+ new ByteArrayInputStream( data ) );
+ Object o = ois.readObject();
+ ois.close();
+ return o;
+ }
public static void saveToFile(String serializedContent, String filename, EELFLoggerDelegate logger) throws IOException {
try (PrintWriter fout = new PrintWriter(filename)) {
diff --git a/src/main/java/com/att/research/mdbc/MdbcConnection.java b/src/main/java/com/att/research/mdbc/MdbcConnection.java
index d471522..1e845fd 100644
--- a/src/main/java/com/att/research/mdbc/MdbcConnection.java
+++ b/src/main/java/com/att/research/mdbc/MdbcConnection.java
@@ -26,7 +26,7 @@ import com.att.research.logging.format.AppMessages;
import com.att.research.logging.format.ErrorSeverity;
import com.att.research.logging.format.ErrorTypes;
import com.att.research.mdbc.mixins.MusicInterface;
-import com.att.research.mdbc.mixins.TxCommitProgress;
+import com.att.research.mdbc.tables.TxCommitProgress;
/**
diff --git a/src/main/java/com/att/research/mdbc/MusicSqlManager.java b/src/main/java/com/att/research/mdbc/MusicSqlManager.java
index 4330cfe..e32a969 100755
--- a/src/main/java/com/att/research/mdbc/MusicSqlManager.java
+++ b/src/main/java/com/att/research/mdbc/MusicSqlManager.java
@@ -1,22 +1,16 @@
package com.att.research.mdbc;
import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
import org.json.JSONObject;
import com.att.research.mdbc.mixins.DBInterface;
import com.att.research.mdbc.mixins.MixinFactory;
import com.att.research.mdbc.mixins.MusicInterface;
-import com.att.research.mdbc.mixins.StagingTable;
-import com.att.research.mdbc.mixins.TxCommitProgress;
import com.att.research.mdbc.mixins.Utils;
-
+import com.att.research.mdbc.tables.StagingTable;
+import com.att.research.mdbc.tables.TxCommitProgress;
import com.att.research.exceptions.MDBCServiceException;
import com.att.research.exceptions.QueryException;
import com.att.research.logging.*;
@@ -41,117 +35,121 @@ import com.att.research.logging.format.ErrorTypes;
*/
public class MusicSqlManager {
- private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicSqlManager.class);
-
- private final DBInterface dbi;
- private final MusicInterface mi;
- private final Set<String> table_set;
- private final HashMap<Range,StagingTable> transactionDigest;
- private boolean autocommit; // a copy of the autocommit flag from the JDBC Connection
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicSqlManager.class);
- /**
- * Build a MusicSqlManager for a DB connection. This construct may only be called by getMusicSqlManager(),
- * which will ensure that only one MusicSqlManager is created per URL.
- * This is the location where the appropriate mixins to use for the MusicSqlManager should be determined.
- * They should be picked based upon the URL and the properties passed to this constructor.
- * <p>
- * At the present time, we only support the use of the H2Mixin (for access to a local H2 database),
- * with the CassandraMixin (for direct access to a Cassandra noSQL DB as the persistence layer).
- * </p>
- *
- * @param url the JDBC URL which was used to connection to the database
- * @param conn the actual connection to the database
- * @param info properties passed from the initial JDBC connect() call
- * @throws MDBCServiceException
- */
- public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException {
- try {
- info.putAll(Utils.getMdbcProperties());
- String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
- this.dbi = MixinFactory.createDBInterface(mixinDb, this, url, conn, info);
- this.mi = mi;
- this.table_set = Collections.synchronizedSet(new HashSet<String>());
- this.autocommit = true;
- this.transactionDigest = new HashMap<Range,StagingTable>();
+ private final DBInterface dbi;
+ private final MusicInterface mi;
+ private final Set<String> table_set;
+ private final HashMap<Range, StagingTable> transactionDigest;
+ private boolean autocommit; // a copy of the autocommit flag from the JDBC Connection
- }catch(Exception e) {
- throw new MDBCServiceException(e.getMessage());
- }
- }
+ /**
+ * Build a MusicSqlManager for a DB connection. This construct may only be called by getMusicSqlManager(),
+ * which will ensure that only one MusicSqlManager is created per URL.
+ * This is the location where the appropriate mixins to use for the MusicSqlManager should be determined.
+ * They should be picked based upon the URL and the properties passed to this constructor.
+ * <p>
+ * At the present time, we only support the use of the H2Mixin (for access to a local H2 database),
+ * with the CassandraMixin (for direct access to a Cassandra noSQL DB as the persistence layer).
+ * </p>
+ *
+ * @param url the JDBC URL which was used to connection to the database
+ * @param conn the actual connection to the database
+ * @param info properties passed from the initial JDBC connect() call
+ * @throws MDBCServiceException
+ */
+ public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException {
+ try {
+ info.putAll(Utils.getMdbcProperties());
+ String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
+ this.dbi = MixinFactory.createDBInterface(mixinDb, this, url, conn, info);
+ this.mi = mi;
+ this.table_set = Collections.synchronizedSet(new HashSet<String>());
+ this.autocommit = true;
+ this.transactionDigest = new HashMap<Range, StagingTable>();
- public void setAutoCommit(boolean b,String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
- if (b != autocommit) {
- autocommit = b;
- logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
- if (b) {
- // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
- if(txId == null || txId.isEmpty()) {
- logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- throw new MDBCServiceException("tx id is null");
- }
- commit(txId,progressKeeper,partition);
- }
- }
- }
+ } catch (Exception e) {
+ throw new MDBCServiceException(e.getMessage());
+ }
+ }
- /**
- * Close this MusicSqlManager.
- */
- public void close() {
- if (dbi != null) {
- dbi.close();
- }
- }
+ public void setAutoCommit(boolean b, String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
+ if (b != autocommit) {
+ autocommit = b;
+ logger.debug(EELFLoggerDelegate.applicationLogger, "autocommit changed to " + b);
+ if (b) {
+ // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
+ if (txId == null || txId.isEmpty()) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null", AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ throw new MDBCServiceException("tx id is null");
+ }
+ commit(txId, progressKeeper, partition);
+ }
+ }
+ }
- /**
- * Code to be run within the DB driver before a SQL statement is executed. This is where tables
- * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
- * @param sql the SQL statement that is about to be executed
- */
- public void preStatementHook(final String sql) {
- dbi.preStatementHook(sql);
- }
- /**
- * Code to be run within the DB driver after a SQL statement has been executed. This is where remote
- * statement actions can be copied back to Cassandra/MUSIC.
- * @param sql the SQL statement that was executed
- */
- public void postStatementHook(final String sql) {
- dbi.postStatementHook(sql,transactionDigest);
- }
- /**
- * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
- * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized
- * in order to prevent multiple threads from running this code in parallel.
- */
- public synchronized void synchronizeTables() throws QueryException {
- Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database
- logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
- for (String tableName : set1) {
- // This map will be filled in if this table was previously discovered
- if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
- logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
- try {
- TableInfo ti = dbi.getTableInfo(tableName);
- mi.initializeMusicForTable(ti,tableName);
- //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
- ti = dbi.getTableInfo(tableName);
- mi.createDirtyRowTable(ti,tableName);
- dbi.createSQLTriggers(tableName);
- table_set.add(tableName);
- synchronizeTableData(tableName);
- logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
- table_set.size() + "/" + set1.size() + "tables uploaded");
- } catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e);
- throw new QueryException();
- }
- }
- }
+ /**
+ * Close this MusicSqlManager.
+ */
+ public void close() {
+ if (dbi != null) {
+ dbi.close();
+ }
+ }
+
+ /**
+ * Code to be run within the DB driver before a SQL statement is executed. This is where tables
+ * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
+ *
+ * @param sql the SQL statement that is about to be executed
+ */
+ public void preStatementHook(final String sql) {
+ dbi.preStatementHook(sql);
+ }
+
+ /**
+ * Code to be run within the DB driver after a SQL statement has been executed. This is where remote
+ * statement actions can be copied back to Cassandra/MUSIC.
+ *
+ * @param sql the SQL statement that was executed
+ */
+ public void postStatementHook(final String sql) {
+ dbi.postStatementHook(sql, transactionDigest);
+ }
+
+ /**
+ * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
+ * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized
+ * in order to prevent multiple threads from running this code in parallel.
+ */
+ public synchronized void synchronizeTables() throws QueryException {
+ Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database
+ logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
+ for (String tableName : set1) {
+ // This map will be filled in if this table was previously discovered
+ if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: " + tableName);
+ try {
+ TableInfo ti = dbi.getTableInfo(tableName);
+ mi.initializeMusicForTable(ti, tableName);
+ //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
+ ti = dbi.getTableInfo(tableName);
+ mi.createDirtyRowTable(ti, tableName);
+ dbi.createSQLTriggers(tableName);
+ table_set.add(tableName);
+ synchronizeTableData(tableName);
+ logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
+ table_set.size() + "/" + set1.size() + "tables uploaded");
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e);
+ throw new QueryException();
+ }
+ }
+ }
// Set<String> set2 = getMusicTableSet(music_ns);
- // not working - fix later
+ // not working - fix later
// for (String tbl : set2) {
// if (!set1.contains(tbl)) {
// logger.debug("Old table dropped: "+tbl);
@@ -159,101 +157,107 @@ public class MusicSqlManager {
// // ZZTODO drop camunda table ?
// }
// }
- }
+ }
+
+ /**
+ * On startup, copy dirty data from Cassandra to H2. May not be needed.
+ *
+ * @param tableName
+ */
+ public void synchronizeTableData(String tableName) {
+ // TODO - copy MUSIC -> H2
+ dbi.synchronizeData(tableName);
+ }
+
+ /**
+ * This method is called whenever there is a SELECT on a local SQL table, and should be called by the underlying databases
+ * triggering mechanism. It first checks the local dirty bits table to see if there are any keys in Cassandra whose value
+ * has not yet been sent to SQL. If there are, the appropriate values are copied from Cassandra to the local database.
+ * Under normal execution, this function behaves as a NOP operation.
+ *
+ * @param tableName This is the table on which the SELECT is being performed
+ */
+ public void readDirtyRowsAndUpdateDb(String tableName) {
+ mi.readDirtyRowsAndUpdateDb(dbi, tableName);
+ }
+
+
+ /**
+ * This method gets the primary key that the music interfaces uses by default.
+ * If the front end uses a primary key, this will not match what is used in the MUSIC interface
+ *
+ * @return
+ */
+ public String getMusicDefaultPrimaryKeyName() {
+ return mi.getMusicDefaultPrimaryKeyName();
+ }
+
+ /**
+ * Asks music interface to provide the function to create a primary key
+ * e.g. uuid(), 1, "unique_aksd419fjc"
+ *
+ * @return
+ */
+ public String generateUniqueKey() {
+ //
+ return mi.generateUniqueKey();
+ }
- /**
- * On startup, copy dirty data from Cassandra to H2. May not be needed.
- * @param tableName
- */
- public void synchronizeTableData(String tableName) {
- // TODO - copy MUSIC -> H2
- dbi.synchronizeData(tableName);
- }
- /**
- * This method is called whenever there is a SELECT on a local SQL table, and should be called by the underlying databases
- * triggering mechanism. It first checks the local dirty bits table to see if there are any keys in Cassandra whose value
- * has not yet been sent to SQL. If there are, the appropriate values are copied from Cassandra to the local database.
- * Under normal execution, this function behaves as a NOP operation.
- * @param tableName This is the table on which the SELECT is being performed
- */
- public void readDirtyRowsAndUpdateDb(String tableName) {
- mi.readDirtyRowsAndUpdateDb(dbi,tableName);
- }
-
-
-
-
- /**
- * This method gets the primary key that the music interfaces uses by default.
- * If the front end uses a primary key, this will not match what is used in the MUSIC interface
- * @return
- */
- public String getMusicDefaultPrimaryKeyName() {
- return mi.getMusicDefaultPrimaryKeyName();
- }
-
- /**
- * Asks music interface to provide the function to create a primary key
- * e.g. uuid(), 1, "unique_aksd419fjc"
- * @return
- */
- public String generateUniqueKey() {
- //
- return mi.generateUniqueKey();
- }
-
-
- /**
- * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
- * they are performed now and copied into MUSIC.
- * @throws MDBCServiceException
- */
- public synchronized void commit(String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
- logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
- // transaction was committed -- add all the updates into the REDO-Log in MUSIC
- try {
- mi.commitLog(dbi, partition, transactionDigest, txId, progressKeeper);
- }catch(MDBCServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
- throw e;
- }
- }
- /**
- * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed,
- * they are discarded.
- */
- public synchronized void rollback() {
- // transaction was rolled back - discard the updates
- logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");;
- transactionDigest.clear();
- }
+ /**
+ * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are performed now and copied into MUSIC.
+ *
+ * @throws MDBCServiceException
+ */
+ public synchronized void commit(String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
+ logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
+ // transaction was committed -- add all the updates into the REDO-Log in MUSIC
+ try {
+ mi.commitLog(dbi, partition, transactionDigest, txId, progressKeeper);
+ } catch (MDBCServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
+ throw e;
+ }
+ }
- /**
- * Get all
- * @param table
- * @param dbRow
- * @return
- */
- public String getMusicKeyFromRowWithoutPrimaryIndexes(String table, JSONObject dbRow) {
- TableInfo ti = dbi.getTableInfo(table);
- return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti,table, dbRow);
- }
-
- public String getMusicKeyFromRow(String table, JSONObject dbRow) {
- TableInfo ti = dbi.getTableInfo(table);
- return mi.getMusicKeyFromRow(ti,table, dbRow);
- }
-
- /**
- * Returns all keys that matches the current sql statement, and not in already updated keys.
- *
- * @param sql the query that we are getting keys for
- * @deprecated
- */
- public ArrayList<String> getMusicKeys(String sql) {
- ArrayList<String> musicKeys = new ArrayList<String>();
- //\TODO See if this is required
+ /**
+ * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are discarded.
+ */
+ public synchronized void rollback() {
+ // transaction was rolled back - discard the updates
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");
+ ;
+ transactionDigest.clear();
+ }
+
+ /**
+ * Get all
+ *
+ * @param table
+ * @param dbRow
+ * @return
+ */
+ public String getMusicKeyFromRowWithoutPrimaryIndexes(String table, JSONObject dbRow) {
+ TableInfo ti = dbi.getTableInfo(table);
+ return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti, table, dbRow);
+ }
+
+ public String getMusicKeyFromRow(String table, JSONObject dbRow) {
+ TableInfo ti = dbi.getTableInfo(table);
+ return mi.getMusicKeyFromRow(ti, table, dbRow);
+ }
+
+ /**
+ * Returns all keys that matches the current sql statement, and not in already updated keys.
+ *
+ * @param sql the query that we are getting keys for
+ * @deprecated
+ */
+ public ArrayList<String> getMusicKeys(String sql) {
+ ArrayList<String> musicKeys = new ArrayList<String>();
+ //\TODO See if this is required
/*
try {
net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql);
@@ -295,6 +299,18 @@ public class MusicSqlManager {
System.out.print(musicKey + ",");
}
*/
- return musicKeys;
- }
+ return musicKeys;
+ }
+
+ public void own(List<Range> ranges) {
+ throw new java.lang.UnsupportedOperationException("function not implemented yet");
+ }
+
+ public void appendRange(String rangeId, List<Range> ranges) {
+ throw new java.lang.UnsupportedOperationException("function not implemented yet");
+ }
+
+ public void relinquish(String ownerId, String rangeId) {
+ throw new java.lang.UnsupportedOperationException("function not implemented yet");
+ }
}
diff --git a/src/main/java/com/att/research/mdbc/StateManager.java b/src/main/java/com/att/research/mdbc/StateManager.java
index accd13a..dc243fb 100644
--- a/src/main/java/com/att/research/mdbc/StateManager.java
+++ b/src/main/java/com/att/research/mdbc/StateManager.java
@@ -8,7 +8,7 @@ import com.att.research.logging.format.ErrorTypes;
import com.att.research.mdbc.mixins.MixinFactory;
import com.att.research.mdbc.mixins.MusicInterface;
import com.att.research.mdbc.mixins.MusicMixin;
-import com.att.research.mdbc.mixins.TxCommitProgress;
+import com.att.research.mdbc.tables.TxCommitProgress;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -61,10 +61,14 @@ public class StateManager {
//\fixme this is not really used, delete!
String cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
String mixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
+ init(mixin,cassandraUrl);
+ }
+
+ protected void init(String mixin, String cassandraUrl) throws MDBCServiceException {
this.musicManager = MixinFactory.createMusicInterface(mixin, cassandraUrl, info,ranges);
this.musicManager.createKeyspace();
try {
- this.musicManager.initializeMdbcDataStructures();
+ this.musicManager.initializeMetricDataStructures();
} catch (MDBCServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR);
throw(e);
diff --git a/src/main/java/com/att/research/mdbc/configurations/NodeConfiguration.java b/src/main/java/com/att/research/mdbc/configurations/NodeConfiguration.java
index 78850e3..d74dafb 100644
--- a/src/main/java/com/att/research/mdbc/configurations/NodeConfiguration.java
+++ b/src/main/java/com/att/research/mdbc/configurations/NodeConfiguration.java
@@ -22,8 +22,8 @@ public class NodeConfiguration {
public DatabasePartition partition;
public String nodeName;
- public NodeConfiguration(String tables, String titIndex, String titTableName, String partitionId, String sqlDatabaseName, String node, String redoRecordsTable){
- partition = new DatabasePartition(toRanges(tables), titIndex, titTableName, partitionId, null, redoRecordsTable) ;
+ public NodeConfiguration(String tables, String mriIndex, String mriTableName, String partitionId, String sqlDatabaseName, String node, String redoRecordsTable){
+ partition = new DatabasePartition(toRanges(tables), mriIndex, mriTableName, partitionId, null, redoRecordsTable) ;
this.sqlDatabaseName = sqlDatabaseName;
this.nodeName = node;
}
diff --git a/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java b/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java
index 0d28b51..eeb15a5 100644
--- a/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java
+++ b/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java
@@ -19,7 +19,7 @@ import java.util.List;
public class TablesConfiguration {
private final String TIT_TABLE_NAME = "transactioninformation";
- private final String REDO_RECORDS_NAME = "redorecords";
+ private final String MUSIC_TX_DIGEST_TABLE_NAME = "musictxdigest";
private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class);
private List<PartitionInformation> partitions;
@@ -54,13 +54,13 @@ public class TablesConfiguration {
throw new MDBCServiceException("Partition was not correctly initialized");
}
for(PartitionInformation partitionInfo : partitions){
- String titTableName = partitionInfo.titTableName;
+ String titTableName = partitionInfo.mriTableName;
titTableName = (titTableName==null || titTableName.isEmpty())?TIT_TABLE_NAME:titTableName;
//0) Create the corresponding TIT table
- DatabaseOperations.CreateTransactionInformationTable(musicNamespace,titTableName);
- String redoRecordsName = partitionInfo.rrtTableName;
- redoRecordsName = (redoRecordsName==null || redoRecordsName.isEmpty())?REDO_RECORDS_NAME:redoRecordsName;
- DatabaseOperations.CreateRedoRecordsTable(-1,musicNamespace,redoRecordsName);
+ DatabaseOperations.CreateMusicRangeInformationTable(musicNamespace,titTableName);
+ String musicTxDigestTableName = partitionInfo.mtxdTableName;
+ musicTxDigestTableName = (musicTxDigestTableName==null || musicTxDigestTableName.isEmpty())? MUSIC_TX_DIGEST_TABLE_NAME :musicTxDigestTableName;
+ DatabaseOperations.CreateMusicTxDigest(-1,musicNamespace,musicTxDigestTableName);
//0) Create the corresponding TIT table
String partitionId;
if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){
@@ -87,7 +87,7 @@ public class TablesConfiguration {
//5) Add it to the redo history table
DatabaseOperations.createRedoHistoryBeginRow(musicNamespace,rhName,newRedoRow,partitionId,null);
//6) Create config for this node
- nodeConfigs.add(new NodeConfiguration(String.join(",",partitionInfo.tables),titIndex,titTableName,partitionId,sqlDatabaseName,partitionInfo.owner,redoRecordsName));
+ nodeConfigs.add(new NodeConfiguration(String.join(",",partitionInfo.tables),titIndex,titTableName,partitionId,sqlDatabaseName,partitionInfo.owner,musicTxDigestTableName));
}
return nodeConfigs;
}
@@ -124,8 +124,8 @@ public class TablesConfiguration {
public class PartitionInformation{
private List<String> tables;
private String owner;
- private String titTableName;
- private String rrtTableName;
+ private String mriTableName;
+ private String mtxdTableName;
private String partitionId;
private int replicationFactor;
@@ -145,12 +145,12 @@ public class TablesConfiguration {
this.owner = owner;
}
- public String getTitTableName() {
- return titTableName;
+ public String getMriTableName() {
+ return mriTableName;
}
- public void setTitTableName(String titTableName) {
- this.titTableName = titTableName;
+ public void setMriTableName(String mriTableName) {
+ this.mriTableName = mriTableName;
}
public String getPartitionId() {
@@ -169,12 +169,12 @@ public class TablesConfiguration {
this.replicationFactor = replicationFactor;
}
- public String getRrtTableName(){
- return rrtTableName;
+ public String getMtxdTableName(){
+ return mtxdTableName;
}
- public void setRrtTableName(String rrtTableName) {
- this.rrtTableName = rrtTableName;
+ public void setMtxdTableName(String mtxdTableName) {
+ this.mtxdTableName = mtxdTableName;
}
}
}
diff --git a/src/main/java/com/att/research/mdbc/configurations/config-0.json b/src/main/java/com/att/research/mdbc/configurations/config-0.json
index 96d947c..2207a52 100644
--- a/src/main/java/com/att/research/mdbc/configurations/config-0.json
+++ b/src/main/java/com/att/research/mdbc/configurations/config-0.json
@@ -1,9 +1,9 @@
{
"sqlDatabaseName": "test",
"partition": {
- "transactionInformationTable": "transactioninformation",
- "transactionInformationIndex": "259a7a7c-f741-44ae-8d6e-227a02ddc96e",
- "redoRecordsTable": "redorecords",
+ "musicRangeInformationTable": "transactioninformation",
+ "musicRangeInformationIndex": "259a7a7c-f741-44ae-8d6e-227a02ddc96e",
+ "musicTxDigestTable": "musictxdigest",
"partitionId": "ad766447-1adf-4800-aade-9f31a356ab4b",
"lockId": "",
"ranges": [
diff --git a/src/main/java/com/att/research/mdbc/configurations/ranges.json b/src/main/java/com/att/research/mdbc/configurations/ranges.json
index afa343b..2a792e8 100644
--- a/src/main/java/com/att/research/mdbc/configurations/ranges.json
+++ b/src/main/java/com/att/research/mdbc/configurations/ranges.json
@@ -1,6 +1,6 @@
{
- "transactionInformationTable": "transactioninformation",
- "transactionInformationIndex": "d0e8ef2e-aeca-4261-8d9d-1679f560b85b",
+ "musicRangeInformationTable": "transactioninformation",
+ "musicRangeInformationIndex": "d0e8ef2e-aeca-4261-8d9d-1679f560b85b",
"partitionId": "798110cf-9c61-4db2-9446-cb2dbab5a143",
"lockId": "",
"ranges": [
diff --git a/src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json b/src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json
index b3c6224..e67dd0b 100644
--- a/src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json
+++ b/src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json
@@ -3,8 +3,8 @@
{
"tables":["table11"],
"owner":"",
- "titTableName":"transactioninformation",
- "rrtTableName":"redorecords",
+ "mriTableName":"musicrangeinformation",
+ "mtxdTableName":"musictxdigest",
"partitionId":"",
"replicationFactor":1
}
diff --git a/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java b/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java
index 6684fe6..28090b7 100755
--- a/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java
+++ b/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java
@@ -16,6 +16,15 @@ import java.util.TreeSet;
import java.util.UUID;
import com.att.research.mdbc.*;
+import com.att.research.mdbc.tables.PartitionInformation;
+import com.att.research.mdbc.tables.RedoHistoryElement;
+import com.att.research.mdbc.tables.RedoRecordId;
+import com.att.research.mdbc.tables.StagingTable;
+import com.att.research.mdbc.tables.TablePartitionInformation;
+import com.att.research.mdbc.tables.TitReference;
+import com.att.research.mdbc.tables.TransactionInformationElement;
+import com.att.research.mdbc.tables.TxCommitProgress;
+
import org.json.JSONObject;
import org.onap.music.datastore.CassaLockStore;
import org.onap.music.datastore.PreparedQueryObject;
@@ -84,8 +93,8 @@ public class CassandraMixin implements MusicInterface {
public static final String PARTITION_INFORMATION_TABLE_NAME = "partitioninfo";
public static final String REDO_HISTORY_TABLE_NAME= "redohistory";
//\TODO Add logic to change the names when required and create the tables when necessary
- private String redoRecordTableName = "redorecords";
- private String transactionInformationTableName = "transactioninformation";
+ private String musicTxDigestTableName = "musictxdigest";
+ private String musicRangeInformationTableName = "musicrangeinformation";
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassandraMixin.class);
@@ -152,7 +161,7 @@ public class CassandraMixin implements MusicInterface {
this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
- transactionInformationTableName = "transactioninformation";
+ musicRangeInformationTableName = "transactioninformation";
createMusicKeyspace();
}
@@ -211,10 +220,10 @@ public class CassandraMixin implements MusicInterface {
}
}
@Override
- public void initializeMdbcDataStructures() throws MDBCServiceException {
+ public void initializeMetricDataStructures() throws MDBCServiceException {
try {
- DatabaseOperations.CreateRedoRecordsTable(-1, music_ns, redoRecordTableName);//\TODO If we start partitioning the data base, we would need to use the redotable number
- DatabaseOperations.CreateTransactionInformationTable(music_ns, transactionInformationTableName);
+ DatabaseOperations.CreateMusicTxDigest(-1, music_ns, musicTxDigestTableName);//\TODO If we start partitioning the data base, we would need to use the redotable number
+ DatabaseOperations.CreateMusicRangeInformationTable(music_ns, musicRangeInformationTableName);
DatabaseOperations.CreateTableToPartitionTable(music_ns, TABLE_TO_PARTITION_TABLE_NAME);
DatabaseOperations.CreatePartitionInfoTable(music_ns, PARTITION_INFORMATION_TABLE_NAME);
DatabaseOperations.CreateRedoHistoryTable(music_ns, REDO_HISTORY_TABLE_NAME);
@@ -1016,7 +1025,7 @@ public class CassandraMixin implements MusicInterface {
}
- private PreparedQueryObject createAppendRRTIndexToTitQuery(String titTable, String uuid, String table, String redoUuid){
+ private PreparedQueryObject createAppendMtxdIndexToMriQuery(String titTable, String uuid, String table, String redoUuid){
PreparedQueryObject query = new PreparedQueryObject();
StringBuilder appendBuilder = new StringBuilder();
appendBuilder.append("UPDATE ")
@@ -1088,12 +1097,12 @@ public class CassandraMixin implements MusicInterface {
return lockId;
}
- protected void pushRowToRRT(String lockId, String commitId, HashMap<Range,StagingTable> transactionDigest) throws MDBCServiceException{
+ protected void pushRowToMtxd(String lockId, String commitId, HashMap<Range,StagingTable> transactionDigest) throws MDBCServiceException{
PreparedQueryObject query = new PreparedQueryObject();
StringBuilder cqlQuery = new StringBuilder("INSERT INTO ")
.append(music_ns)
.append('.')
- .append(redoRecordTableName)
+ .append(musicTxDigestTableName)
.append(" (leaseid,leasecounter,transactiondigest) ")
.append("VALUES ('")
.append( lockId ).append("',")
@@ -1115,15 +1124,15 @@ public class CassandraMixin implements MusicInterface {
}
}
- protected void appendIndexToTit(String lockId, String commitId, String TITIndex) throws MDBCServiceException{
+ protected void appendIndexToMri(String lockId, String commitId, String TITIndex) throws MDBCServiceException{
StringBuilder redoUuidBuilder = new StringBuilder();
redoUuidBuilder.append("('")
.append(lockId)
.append("',")
.append(commitId)
.append(")");
- PreparedQueryObject appendQuery = createAppendRRTIndexToTitQuery(transactionInformationTableName, TITIndex, redoRecordTableName, redoUuidBuilder.toString());
- ReturnType returnType = MusicPureCassaCore.criticalPut(music_ns, transactionInformationTableName, TITIndex, appendQuery, lockId, null);
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, TITIndex, musicTxDigestTableName, redoUuidBuilder.toString());
+ ReturnType returnType = MusicPureCassaCore.criticalPut(music_ns, musicRangeInformationTableName, TITIndex, appendQuery, lockId, null);
if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
@@ -1132,16 +1141,16 @@ public class CassandraMixin implements MusicInterface {
@Override
public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
- String TITIndex = partition.getTransactionInformationIndex();
+ String TITIndex = partition.getMusicRangeInformationIndex();
if(TITIndex.isEmpty()) {
//\TODO Fetch TITIndex from the Range Information Table
throw new MDBCServiceException("TIT Index retrieval not yet implemented");
}
- String fullyQualifiedTitKey = music_ns+"."+ transactionInformationTableName +"."+TITIndex;
+ String fullyQualifiedTitKey = music_ns+"."+ musicRangeInformationTableName +"."+TITIndex;
//0. See if reference to lock was already created
String lockId = partition.getLockId();
if(lockId == null || lockId.isEmpty()) {
- lockId = createAndAssignLock(fullyQualifiedTitKey,partition,music_ns,transactionInformationTableName,TITIndex);
+ lockId = createAndAssignLock(fullyQualifiedTitKey,partition,music_ns, musicRangeInformationTableName,TITIndex);
}
String commitId;
@@ -1156,14 +1165,14 @@ public class CassandraMixin implements MusicInterface {
//Add creation type of transaction digest
//1. Push new row to RRT and obtain its index
- pushRowToRRT(lockId, commitId, transactionDigest);
+ pushRowToMtxd(lockId, commitId, transactionDigest);
//2. Save RRT index to RQ
if(progressKeeper!= null) {
progressKeeper.setRecordId(txId,new RedoRecordId(lockId, commitId));
}
//3. Append RRT index into the corresponding TIT row array
- appendIndexToTit(lockId,commitId,TITIndex);
+ appendIndexToMri(lockId,commitId,TITIndex);
}
/**
diff --git a/src/main/java/com/att/research/mdbc/mixins/DBInterface.java b/src/main/java/com/att/research/mdbc/mixins/DBInterface.java
index 9aa94f9..e2b2ad7 100755
--- a/src/main/java/com/att/research/mdbc/mixins/DBInterface.java
+++ b/src/main/java/com/att/research/mdbc/mixins/DBInterface.java
@@ -7,6 +7,7 @@ import java.util.Set;
import com.att.research.mdbc.Range;
import com.att.research.mdbc.TableInfo;
+import com.att.research.mdbc.tables.StagingTable;
/**
* This Interface defines the methods that MDBC needs in order to mirror data to/from a Database instance.
diff --git a/src/main/java/com/att/research/mdbc/mixins/MusicInterface.java b/src/main/java/com/att/research/mdbc/mixins/MusicInterface.java
index 94b3ac6..6e2e0ca 100755
--- a/src/main/java/com/att/research/mdbc/mixins/MusicInterface.java
+++ b/src/main/java/com/att/research/mdbc/mixins/MusicInterface.java
@@ -10,6 +10,15 @@ import com.att.research.exceptions.MDBCServiceException;
import com.att.research.mdbc.DatabasePartition;
import com.att.research.mdbc.Range;
import com.att.research.mdbc.TableInfo;
+import com.att.research.mdbc.tables.PartitionInformation;
+import com.att.research.mdbc.tables.RedoHistoryElement;
+import com.att.research.mdbc.tables.RedoRecordId;
+import com.att.research.mdbc.tables.StagingTable;
+import com.att.research.mdbc.tables.TablePartitionInformation;
+import com.att.research.mdbc.tables.TitReference;
+import com.att.research.mdbc.tables.TransactionInformationElement;
+import com.att.research.mdbc.tables.TxCommitProgress;
+
import org.onap.music.exceptions.MusicLockingException;
/**
@@ -22,7 +31,7 @@ public interface MusicInterface {
* This function is used to created all the required data structures, both local
* \TODO Check if this function is required in the MUSIC interface or could be just created on the constructor
*/
- void initializeMdbcDataStructures() throws MDBCServiceException;
+ void initializeMetricDataStructures() throws MDBCServiceException;
/**
* Get the name of this MusicInterface mixin object.
* @return the name
diff --git a/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java b/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java
index 1fee59c..60adaf1 100644
--- a/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java
+++ b/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java
@@ -18,6 +18,15 @@ import com.att.research.exceptions.MDBCServiceException;
import com.att.research.mdbc.DatabasePartition;
import com.att.research.mdbc.Range;
import com.att.research.mdbc.TableInfo;
+import com.att.research.mdbc.tables.PartitionInformation;
+import com.att.research.mdbc.tables.RedoHistoryElement;
+import com.att.research.mdbc.tables.RedoRecordId;
+import com.att.research.mdbc.tables.StagingTable;
+import com.att.research.mdbc.tables.TablePartitionInformation;
+import com.att.research.mdbc.tables.TitReference;
+import com.att.research.mdbc.tables.TransactionInformationElement;
+import com.att.research.mdbc.tables.TxCommitProgress;
+
import org.onap.music.main.MusicPureCassaCore;
/**
@@ -172,7 +181,7 @@ public class MusicMixin implements MusicInterface {
}
@Override
- public void initializeMdbcDataStructures() {
+ public void initializeMetricDataStructures() {
//
}
diff --git a/src/main/java/com/att/research/mdbc/mixins/MySQLMixin.java b/src/main/java/com/att/research/mdbc/mixins/MySQLMixin.java
index a836a39..4f70147 100755
--- a/src/main/java/com/att/research/mdbc/mixins/MySQLMixin.java
+++ b/src/main/java/com/att/research/mdbc/mixins/MySQLMixin.java
@@ -21,6 +21,8 @@ import com.att.research.logging.EELFLoggerDelegate;
import com.att.research.mdbc.MusicSqlManager;
import com.att.research.mdbc.Range;
import com.att.research.mdbc.TableInfo;
+import com.att.research.mdbc.tables.OperationType;
+import com.att.research.mdbc.tables.StagingTable;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
diff --git a/src/main/java/com/att/research/mdbc/tables/Operation.java b/src/main/java/com/att/research/mdbc/tables/Operation.java
new file mode 100644
index 0000000..026fa40
--- /dev/null
+++ b/src/main/java/com/att/research/mdbc/tables/Operation.java
@@ -0,0 +1,35 @@
+package com.att.research.mdbc.tables;
+
+import java.io.Serializable;
+
+import org.json.JSONObject;
+import org.json.JSONTokener;
+
+public final class Operation implements Serializable{
+
+ private static final long serialVersionUID = -1215301985078183104L;
+
+ final OperationType TYPE;
+ final String OLD_VAL;
+ final String NEW_VAL;
+
+ public Operation(OperationType type, String newVal, String oldVal) {
+ TYPE = type;
+ NEW_VAL = newVal;
+ OLD_VAL = oldVal;
+ }
+
+ public JSONObject getNewVal(){
+ JSONObject newRow = new JSONObject(new JSONTokener(NEW_VAL));
+ return newRow;
+ }
+
+ public JSONObject getOldVal(){
+ JSONObject keydata = new JSONObject(new JSONTokener(OLD_VAL));
+ return keydata;
+ }
+
+ public OperationType getOperationType() {
+ return this.TYPE;
+ }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/OperationType.java b/src/main/java/com/att/research/mdbc/tables/OperationType.java
new file mode 100644
index 0000000..ae83485
--- /dev/null
+++ b/src/main/java/com/att/research/mdbc/tables/OperationType.java
@@ -0,0 +1,5 @@
+package com.att.research.mdbc.tables;
+
+public enum OperationType{
+ DELETE, UPDATE, INSERT, SELECT
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/PartitionInformation.java b/src/main/java/com/att/research/mdbc/tables/PartitionInformation.java
new file mode 100644
index 0000000..9249844
--- /dev/null
+++ b/src/main/java/com/att/research/mdbc/tables/PartitionInformation.java
@@ -0,0 +1,19 @@
+package com.att.research.mdbc.tables;
+
+import java.util.List;
+
+public class PartitionInformation {
+ public final String partition;
+ public final TitReference tit;
+ public final List<String> tables;
+ public final int replicationFactor;
+ public final String currentOwner;
+
+ public PartitionInformation(String partition, TitReference tit, List<String> tables, int replicationFactor, String currentOwner) {
+ this.partition=partition;
+ this.tit=tit;
+ this.tables=tables;
+ this.replicationFactor=replicationFactor;
+ this.currentOwner=currentOwner;
+ }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java b/src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java
new file mode 100644
index 0000000..8d92216
--- /dev/null
+++ b/src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java
@@ -0,0 +1,15 @@
+package com.att.research.mdbc.tables;
+
+import java.util.List;
+
+public final class RedoHistoryElement {
+ public final String partition;
+ public final TitReference current;
+ public final List<TitReference> previous;
+
+ public RedoHistoryElement(String partition, TitReference current, List<TitReference> previous) {
+ this.partition = partition;
+ this.current = current;
+ this.previous = previous;
+ }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/RedoRecordId.java b/src/main/java/com/att/research/mdbc/tables/RedoRecordId.java
new file mode 100644
index 0000000..225c89e
--- /dev/null
+++ b/src/main/java/com/att/research/mdbc/tables/RedoRecordId.java
@@ -0,0 +1,15 @@
+package com.att.research.mdbc.tables;
+
+public final class RedoRecordId {
+ public final String leaseId;
+ public final String commitId;
+
+ public RedoRecordId(String leaseId, String commitId) {
+ this.leaseId = leaseId;
+ this.commitId = commitId;
+ }
+
+ public boolean isEmpty() {
+ return (this.leaseId==null || this.leaseId.isEmpty())&&(this.commitId==null||this.commitId.isEmpty());
+ }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/StagingTable.java b/src/main/java/com/att/research/mdbc/tables/StagingTable.java
new file mode 100644
index 0000000..c16f11c
--- /dev/null
+++ b/src/main/java/com/att/research/mdbc/tables/StagingTable.java
@@ -0,0 +1,51 @@
+package com.att.research.mdbc.tables;
+
+import java.io.Serializable;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONObject;
+
+import com.att.research.logging.EELFLoggerDelegate;
+
+public class StagingTable implements Serializable{
+ /**
+ *
+ */
+ private static final long serialVersionUID = 7583182634761771943L;
+ private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StagingTable.class);
+ //primary key -> Operation
+ private HashMap<String,Deque<Operation>> operations;
+
+ public StagingTable() {
+ operations = new HashMap<>();
+ }
+
+ synchronized public void addOperation(String key, OperationType type, String oldVal, String newVal) {
+ if(!operations.containsKey(key)) {
+ operations.put(key, new LinkedList<>());
+ }
+ operations.get(key).add(new Operation(type,newVal,oldVal));
+ }
+
+ synchronized public Deque<Pair<String,Operation>> getIterableSnapshot() throws NoSuchFieldException{
+ Deque<Pair<String,Operation>> response=new LinkedList<Pair<String,Operation>>();
+ //\TODO: check if we can just return the last change to a given key
+ Set<String> keys = operations.keySet();
+ for(String key : keys) {
+ Deque<Operation> ops = operations.get(key);
+ if(ops.isEmpty()) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Invalid state of the Operation data structure when creating snapshot");
+ throw new NoSuchFieldException("Invalid state of the operation data structure");
+ }
+ response.add(Pair.of(key,ops.getLast()));
+ }
+ return response;
+ }
+
+ synchronized public void clean() {
+ operations.clear();
+ }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java b/src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java
new file mode 100644
index 0000000..9201de5
--- /dev/null
+++ b/src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java
@@ -0,0 +1,15 @@
+package com.att.research.mdbc.tables;
+
+import java.util.List;
+
+public final class TablePartitionInformation {
+ public final String table;
+ public final String partition;
+ public final List<String> oldPartitions;
+
+ public TablePartitionInformation(String table, String partition, List<String> oldPartitions) {
+ this.table = table;
+ this.partition = partition;
+ this.oldPartitions = oldPartitions;
+ }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/TitReference.java b/src/main/java/com/att/research/mdbc/tables/TitReference.java
new file mode 100644
index 0000000..2abb989
--- /dev/null
+++ b/src/main/java/com/att/research/mdbc/tables/TitReference.java
@@ -0,0 +1,12 @@
+package com.att.research.mdbc.tables;
+
+public final class TitReference {
+ public final String table;
+ public final String index;
+
+ public TitReference(String table, String index) {
+ this.table = table;
+ this.index= index;
+ }
+
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java b/src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java
new file mode 100644
index 0000000..a80cab1
--- /dev/null
+++ b/src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java
@@ -0,0 +1,19 @@
+package com.att.research.mdbc.tables;
+
+import java.util.List;
+
+public final class TransactionInformationElement {
+ public final String index;
+ public final List<RedoRecordId> redoLog;
+ public final String partition;
+ public final int latestApplied;
+ public final boolean applied;
+
+ public TransactionInformationElement(String index, List<RedoRecordId> redoLog, String partition, int latestApplied, boolean applied) {
+ this.index = index;
+ this.redoLog = redoLog;
+ this.partition = partition;
+ this.latestApplied = latestApplied;
+ this.applied = applied;
+ }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java b/src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java
new file mode 100644
index 0000000..cecdb08
--- /dev/null
+++ b/src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java
@@ -0,0 +1,206 @@
+package com.att.research.mdbc.tables;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import com.att.research.logging.EELFLoggerDelegate;
+
+import java.sql.Connection;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+public class TxCommitProgress{
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TxCommitProgress.class);
+
+ private AtomicReference<BigInteger> nextCommitId;
+ private Map<String, CommitProgress> transactionInfo;
+
+ public TxCommitProgress(){
+ nextCommitId=new AtomicReference<>(BigInteger.ZERO);
+ transactionInfo = new ConcurrentHashMap<>();
+ }
+
+ public boolean containsTx(String txId) {
+ return transactionInfo.containsKey(txId);
+ }
+
+ public BigInteger getCommitId(String txId) {
+ CommitProgress prog = transactionInfo.get(txId);
+ if(prog.isCommitIdAssigned()) {
+ return prog.getCommitId();
+ }
+ BigInteger commitId = nextCommitId.getAndUpdate((a)-> a.add(BigInteger.ONE));
+ prog.setCommitId(commitId);
+ return commitId;
+ }
+
+ public void createNewTransactionTracker(String id, Connection conn) {
+ transactionInfo.put(id, new CommitProgress(id,conn));
+ }
+
+ public void commitRequested(String txId) {
+ CommitProgress prog = transactionInfo.get(txId);
+ if(prog == null){
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when storing commit request",txId);
+ }
+ prog.setCommitRequested();
+ }
+
+ public void setSQLDone(String txId) {
+ CommitProgress prog = transactionInfo.get(txId);
+ if(prog == null){
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when storing saving completion of SQL",txId);
+ }
+ prog.setSQLCompleted();
+ }
+
+ public void setMusicDone(String txId) {
+ CommitProgress prog = transactionInfo.get(txId);
+ if(prog == null){
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when storing saving completion of Music",txId);
+ }
+ prog.setMusicCompleted();
+ }
+
+ public Connection getConnection(String txId){
+ CommitProgress prog = transactionInfo.get(txId);
+ if(prog == null){
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when retrieving statement",txId);
+ }
+ return prog.getConnection();
+ }
+
+ public void setRecordId(String txId, RedoRecordId recordId){
+ CommitProgress prog = transactionInfo.get(txId);
+ if(prog == null){
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when setting record Id",txId);
+ }
+ prog.setRecordId(recordId);
+ }
+
+ public RedoRecordId getRecordId(String txId) {
+ CommitProgress prog = transactionInfo.get(txId);
+ if(prog == null){
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when getting record Id",txId);
+ }
+ return prog.getRecordId();
+ }
+
+ public boolean isRecordIdAssigned(String txId) {
+ CommitProgress prog = transactionInfo.get(txId);
+ if(prog == null){
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when checking record",txId);
+ }
+ return prog.isRedoRecordAssigned();
+ }
+
+ public boolean isComplete(String txId) {
+ CommitProgress prog = transactionInfo.get(txId);
+ if(prog == null){
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when checking completion",txId);
+ }
+ return prog.isComplete();
+ }
+
+ public void reinitializeTxProgress(String txId) {
+ CommitProgress prog = transactionInfo.get(txId);
+ if(prog == null){
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when reinitializing tx progress",txId);
+ }
+ prog.reinitialize();
+ }
+
+ public void deleteTxProgress(String txId){
+ transactionInfo.remove(txId);
+ }
+}
+
+final class CommitProgress{
+ private String lTxId; // local transaction id
+ private BigInteger commitId; // commit id
+ private boolean commitRequested; //indicates if the user tried to commit the request already.
+ private boolean SQLDone; // indicates if SQL was already committed
+ private boolean MusicDone; // indicates if music commit was already performed, atomic bool
+ private Connection connection;// reference to a connection object. This is used to complete a commit if it failed in the original thread.
+ private Long timestamp; // last time this data structure was updated
+ private RedoRecordId redoRecordId;// record id for each partition
+
+ public CommitProgress(String id,Connection conn){
+ redoRecordId=null;
+ lTxId = id;
+ commitRequested = false;
+ SQLDone = false;
+ MusicDone = false;
+ connection = conn;
+ commitId = null;
+ timestamp = System.currentTimeMillis();
+ }
+
+ public synchronized boolean isComplete() {
+ return commitRequested && SQLDone && MusicDone;
+ }
+
+ public synchronized void setCommitId(BigInteger commitId) {
+ this.commitId = commitId;
+ timestamp = System.currentTimeMillis();
+ }
+
+ public synchronized void reinitialize() {
+ commitId = null;
+ redoRecordId=null;
+ commitRequested = false;
+ SQLDone = false;
+ MusicDone = false;
+ timestamp = System.currentTimeMillis();
+ }
+
+ public synchronized void setCommitRequested() {
+ commitRequested = true;
+ timestamp = System.currentTimeMillis();
+ }
+
+ public synchronized void setSQLCompleted() {
+ SQLDone = true;
+ timestamp = System.currentTimeMillis();
+ }
+
+ public synchronized void setMusicCompleted() {
+ MusicDone = true;
+ timestamp = System.currentTimeMillis();
+ }
+
+ public Connection getConnection() {
+ timestamp = System.currentTimeMillis();
+ return connection;
+ }
+
+ public long getTimestamInMillis() {
+ return timestamp;
+ }
+
+ public synchronized void setRecordId(RedoRecordId id) {
+ redoRecordId = id;
+ timestamp = System.currentTimeMillis();
+ }
+
+ public synchronized boolean isRedoRecordAssigned() {
+ return this.redoRecordId!=null;
+ }
+
+ public synchronized RedoRecordId getRecordId() {
+ return redoRecordId;
+ }
+
+ public synchronized BigInteger getCommitId() {
+ return commitId;
+ }
+
+ public synchronized String getId() {
+ return this.lTxId;
+ }
+
+ public synchronized boolean isCommitIdAssigned() {
+ return this.commitId!= null;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/att/research/mdbc/tools/CreatePartition.java b/src/main/java/com/att/research/mdbc/tools/CreatePartition.java
index 09524cb..83f210d 100644
--- a/src/main/java/com/att/research/mdbc/tools/CreatePartition.java
+++ b/src/main/java/com/att/research/mdbc/tools/CreatePartition.java
@@ -1,17 +1,10 @@
package com.att.research.mdbc.tools;
import com.att.research.logging.EELFLoggerDelegate;
-import com.att.research.mdbc.DatabasePartition;
-import com.att.research.mdbc.MDBCUtils;
-import com.att.research.mdbc.Range;
import com.att.research.mdbc.configurations.NodeConfiguration;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
public class CreatePartition {
public static final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(CreatePartition.class);
@@ -27,9 +20,9 @@ public class CreatePartition {
@Parameter(names = { "-n", "--tit-table-name" }, required = true,
description = "Tit Table name")
private String titTable;
- @Parameter(names = { "-r", "--redorecords-table-name" }, required = true,
- description = "Redo Records Table name")
- private String rrTable;
+ @Parameter(names = { "-r", "--music-tx-digest-table-name" }, required = true,
+ description = "Music Transaction Digest Table name")
+ private String mTxDTable;
@Parameter(names = { "-p", "--partition-id" }, required = true,
description = "Partition Id")
private String partitionId;
@@ -43,7 +36,7 @@ public class CreatePartition {
}
public void convert(){
- config = new NodeConfiguration(tables,titIndex,titTable,partitionId,"test","",rrTable);
+ config = new NodeConfiguration(tables,titIndex,titTable,partitionId,"test","", mTxDTable);
}
public void saveToFile(){
diff --git a/src/test/java/com/att/research/mdbc/MDBCUtilsTest.java b/src/test/java/com/att/research/mdbc/MDBCUtilsTest.java
index cdee078..28af754 100644
--- a/src/test/java/com/att/research/mdbc/MDBCUtilsTest.java
+++ b/src/test/java/com/att/research/mdbc/MDBCUtilsTest.java
@@ -1,7 +1,8 @@
package com.att.research.mdbc;
-import com.att.research.mdbc.mixins.OperationType;
-import com.att.research.mdbc.mixins.StagingTable;
+import com.att.research.mdbc.tables.OperationType;
+import com.att.research.mdbc.tables.StagingTable;
+
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Test;