aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTschaen, Brendan <ctschaen@att.com>2018-10-25 13:50:35 -0400
committerTschaen, Brendan <ctschaen@att.com>2018-10-25 13:57:46 -0400
commita6768820181ce55a76f5e007c376489a0057445f (patch)
treefe61025754ea8158e5aeec26cd5309080d779cef
parent8165fa0afdcb16e01495421b3d500389438bf8a8 (diff)
Update Music Interface and readme instructions
Change-Id: I998845eba64d712dd489a5af8b63c4c6b5a066d8 Issue-ID: MUSIC-149 Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
-rwxr-xr-xREADME.md64
-rwxr-xr-xpom.xml2
-rw-r--r--src/main/java/com/att/research/mdbc/DatabaseOperations.java74
-rw-r--r--src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java4
-rwxr-xr-xsrc/main/java/com/att/research/mdbc/mixins/Cassandra2Mixin.java8
-rwxr-xr-xsrc/main/java/com/att/research/mdbc/mixins/CassandraMixin.java53
-rwxr-xr-xsrc/main/java/com/att/research/mdbc/mixins/MusicConnector.java4
-rw-r--r--src/main/java/com/att/research/mdbc/mixins/MusicMixin.java6
-rw-r--r--src/main/java/com/att/research/mdbc/tools/CreateNodeConfigurations.java1
9 files changed, 150 insertions, 66 deletions
diff --git a/README.md b/README.md
index 49f750d..7e4580a 100755
--- a/README.md
+++ b/README.md
@@ -3,6 +3,70 @@ ETDB
To enable edge computing in its full capacity, a crucial requirement is to manage the state of edge applications, preferably in database that provides the full features of SQL including joins and transactions. The key challenge here is to provide a replicated database for the edge that can scale to thousands of geo-distributed nodes. Existing solutions either provide semantics that are too weak (PostgreSQL replicates asynchronously) or too strong and hence expensive to realize in a geo-distributed network with its WAN latencies and complex failure modes (MariaDb, Spanner, provide full transactionality). Inspired by entry consistency in shared memory systems, wherein only the lock holder for an object obtains sequential consistency for the object, we define the notion of an entry transactional database, which is a novel partitioned database in which only the “owner” of a partition obtains full ACID transactionality. In this work, we define the semantics of an entry transactional database, describe the hard challenges faced in building it and present a novel middleware called mdbc that combines existing SQL databases with an underlying geo-distributed entry consistent store to provide entry transactional semantics. Further, we present crucial use cases such as a federated regional controller for the network control plane and a state management service for edge mobility enabled by entry transactionality.
+## Running METRIC
+
+run cassandra (tested with v3.11)
+run mysql/mariadb (5.7+, 10.2.3+)
+
+Download and install MUSIC (branch dev-cassandra-only). Install the jar into your local maven repository (from MUSIC home run)
+mvn install:install-file -Dfile=target/MUSIC.jar -DpomFile=./pom.xml
+
+
+1) Create a configuration file using as a template:
+src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json
+
+The meaning of the fields is as follows:
+
+• partitions: is an array of each partition in the system. There is a partition for each ETDB (EDM). Each partition is composed of the following fields:
+o tables: all the tables that are going to be within this table. Should at least have one element
+o owner: is the url of the ETDB (EDM) node. It can be an empty string
+o titTableName: it is the name of the transaction information table that the owner of this partition is going to be using
+o rrtTableName: it is the name of the redo records table that the owner of this partition is going to be using
+o partitionId: if this partition was previously createad, this is the uuid associated, if new just leave it empty
+o replicationFactor: indicates the needs of replication for this partition (the max of all the tables involved). Note: this is not used yet in the code.
+• musicNamespace: is the music (cassandra) namespace that is going to be used by all the tables
+• tableToPartitionName: it is the name of the table to partition table that the all nodes in the system are going to use
+• partitionInformationTableName: it is the name of the partition information table that the all nodes in the system are going to use
+• redoHistoryTableName: it is the name of the redo history able that the all nodes in the system are going to use
+• sqlDatabaseName: is the name of the local SQL database that is going to be used on each ETDB node.
+
+2) Create the configuration for each node using the command line program in the following location:
+
+src/main/java/com/att/research/mdbc/tools/CreateNodeConfiguration.java
+
+To run it, use the following parameters:
+
+-t ../ETDB/src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json -b base -o /Users/quique/Desktop/
+
+This program is going to generate all the required configuration json for each ETDB node in the system and additionally initialize all the corresponding rows and tables for the system to correctly work. The meaning of the parameters is:
+• -t: the tableConfiguration.json explained in the step 1
+• -b: is the basename that is going to prepend to all the output files
+• -d: output directory where all the configuration files are going to be saved (It has to exist already)
+
+Some notes about the limitations of this command line program:
+• It cannot handle multiple nodes handling the same lock. For example when creating a new row (or modifying one) in the table to partition table, the program is just going to crash
+• The creation of tables doesn’t include replication yet.
+• It doesn’t create the directory for the output configurations.
+
+3) Run each of the server in its corresponding node: The ETDB server can be found in the file:
+
+src/main/java/com/att/research/mdbc/MdbcServer.java
+
+It requires three parameters:
+
+ -c ../ETDB/src/main/java/com/att/research/mdbc/configurations/config-0.json -u jdbc:mysql://localhost -p 30000
+
+ -c is a json with the configuration created in step 2.
+• -u is where the local mysql database is located (without the database name, just the url, see example)
+• -p is the port that server is going to be used
+
+4) Run the clients. A client example can be found in this folder:
+
+src/main/java/com/att/research/mdbc/examples
+
+
+
+
## Building ETDB
ETDB is built with Maven. This directory contains two pom.xml files.
diff --git a/pom.xml b/pom.xml
index 8ef98e3..aa693ba 100755
--- a/pom.xml
+++ b/pom.xml
@@ -96,7 +96,7 @@
<dependency>
<groupId>org.onap.music</groupId>
<artifactId>MUSIC</artifactId>
- <version>3.0.2</version>
+ <version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.calcite.avatica/avatica-server -->
<dependency>
diff --git a/src/main/java/com/att/research/mdbc/DatabaseOperations.java b/src/main/java/com/att/research/mdbc/DatabaseOperations.java
index 406152e..750e918 100644
--- a/src/main/java/com/att/research/mdbc/DatabaseOperations.java
+++ b/src/main/java/com/att/research/mdbc/DatabaseOperations.java
@@ -6,7 +6,7 @@ import org.onap.music.datastore.PreparedQueryObject;
import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.main.MusicPureCassaCore;
+import org.onap.music.main.MusicCore;
import org.onap.music.main.ResultType;
import org.onap.music.main.ReturnType;
@@ -38,9 +38,11 @@ public class DatabaseOperations {
* @param namespace namespace where the TableToPartition resides
* @param tableToPartitionTableName name of the tableToPartition table
* @param tableName name of the application table that is being added to the system
- * @param lockId if the lock for this key is already hold, this is the id of that lock. May be <code>null</code> if lock is not hold for the corresponding key
+ * @param lockId if the lock for this key is already hold, this is the id of that lock.
+ * May be <code>null</code> if lock is not hold for the corresponding key
*/
- public static void createNewTableToPartitionRow(String namespace, String tableToPartitionTableName, String tableName,String lockId) throws MDBCServiceException {
+ public static void createNewTableToPartitionRow(String namespace, String tableToPartitionTableName,
+ String tableName,String lockId) throws MDBCServiceException {
final String primaryKey = getTableToPartitionPrimaryKey(namespace,tableToPartitionTableName,tableName);
StringBuilder insert = new StringBuilder("INSERT INTO ")
.append(namespace)
@@ -66,9 +68,11 @@ public class DatabaseOperations {
* @param tableToPartitionTableName name of the tableToPartition table
* @param table name of the application table that is being added to the system
* @param newPartition partition to which the application table is assigned
- * @param lockId if the lock for this key is already hold, this is the id of that lock. May be <code>null</code> if lock is not hold for the corresponding key
+ * @param lockId if the lock for this key is already hold, this is the id of that lock.
+ * May be <code>null</code> if lock is not hold for the corresponding key
*/
- public static void updateTableToPartition(String namespace, String tableToPartitionTableName, String table, String newPartition, String lockId) throws MDBCServiceException {
+ public static void updateTableToPartition(String namespace, String tableToPartitionTableName,
+ String table, String newPartition, String lockId) throws MDBCServiceException {
final String primaryKey = getTableToPartitionPrimaryKey(namespace,tableToPartitionTableName,table);
PreparedQueryObject query = new PreparedQueryObject();
StringBuilder update = new StringBuilder("UPDATE ")
@@ -105,7 +109,8 @@ public class DatabaseOperations {
* @param lockId if the lock for this key is already hold, this is the id of that lock. May be <code>null</code> if lock is not hold for the corresponding key
* @return the partition uuid associated to the new row
*/
- public static String createPartitionInfoRow(String namespace, String partitionInfoTableName, int replicationFactor, List<String> tables, String lockId) throws MDBCServiceException {
+ public static String createPartitionInfoRow(String namespace, String partitionInfoTableName,
+ int replicationFactor, List<String> tables, String lockId) throws MDBCServiceException {
String id = generateUniqueKey();
final String primaryKey = getPartitionInformationPrimaryKey(namespace,partitionInfoTableName,id);
StringBuilder insert = new StringBuilder("INSERT INTO ")
@@ -149,7 +154,8 @@ public class DatabaseOperations {
* @param owner owner that is handling the new tit row (url to the corresponding etdb nodej
* @param lockId if the lock for this key is already hold, this is the id of that lock. May be <code>null</code> if lock is not hold for the corresponding key
*/
- public static void updateRedoRow(String namespace, String partitionInfoTableName, String partitionId, RedoRow newTitRow, String owner, String lockId) throws MDBCServiceException {
+ public static void updateRedoRow(String namespace, String partitionInfoTableName, String partitionId,
+ RedoRow newTitRow, String owner, String lockId) throws MDBCServiceException {
final String primaryKey = getTableToPartitionPrimaryKey(namespace,partitionInfoTableName,partitionId);
PreparedQueryObject query = new PreparedQueryObject();
String newOwner = (owner==null)?"":owner;
@@ -182,7 +188,8 @@ public class DatabaseOperations {
* @param firstTitRow first tit associated to the partition
* @param partitionId partition for which a history is created
*/
- public static void createRedoHistoryBeginRow(String namespace, String redoHistoryTableName, RedoRow firstTitRow, String partitionId, String lockId) throws MDBCServiceException {
+ public static void createRedoHistoryBeginRow(String namespace, String redoHistoryTableName,
+ RedoRow firstTitRow, String partitionId, String lockId) throws MDBCServiceException {
createRedoHistoryRow(namespace,redoHistoryTableName,firstTitRow,partitionId, new ArrayList<>(),lockId);
}
@@ -194,7 +201,8 @@ public class DatabaseOperations {
* @param partitionId partition for which a history is created
* @param parentsRows parent tit rows associated to this partition
*/
- public static void createRedoHistoryRow(String namespace, String redoHistoryTableName, RedoRow currentRow, String partitionId, List<RedoRow> parentsRows, String lockId) throws MDBCServiceException {
+ public static void createRedoHistoryRow(String namespace, String redoHistoryTableName,
+ RedoRow currentRow, String partitionId, List<RedoRow> parentsRows, String lockId) throws MDBCServiceException {
final String primaryKey = partitionId+"-"+currentRow.getRedoTableName()+"-"+currentRow.getRedoRowIndex();
StringBuilder insert = new StringBuilder("INSERT INTO ")
.append(namespace)
@@ -240,7 +248,8 @@ public class DatabaseOperations {
* @param partitionId partition to which the redo log is hold
* @return uuid associated to the new row
*/
- public static String CreateEmptyTitRow(String namespace, String titTableName, String partitionId, String lockId) throws MDBCServiceException {
+ public static String CreateEmptyTitRow(String namespace, String titTableName,
+ String partitionId, String lockId) throws MDBCServiceException {
String id = generateUniqueKey();
StringBuilder insert = new StringBuilder("INSERT INTO ")
.append(namespace)
@@ -274,7 +283,8 @@ public class DatabaseOperations {
* * Redo: list of uiids associated to the Redo Records Table
*
*/
- public static void CreateTransactionInformationTable( String musicNamespace, String transactionInformationTableName) throws MDBCServiceException {
+ public static void CreateTransactionInformationTable( String musicNamespace,
+ String transactionInformationTableName) throws MDBCServiceException {
String tableName = transactionInformationTableName;
String priKey = "id";
StringBuilder fields = new StringBuilder();
@@ -284,7 +294,8 @@ public class DatabaseOperations {
fields.append("applied boolean, ");
//TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
fields.append("redo list<frozen<tuple<text,tuple<text,varint>>>> ");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
+ musicNamespace, tableName, fields, priKey);
try {
executeMusicWriteQuery(musicNamespace,tableName,cql);
} catch (MDBCServiceException e) {
@@ -299,7 +310,8 @@ public class DatabaseOperations {
* * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
* * TransactionDigest: text that contains all the changes in the transaction
*/
- public static void CreateRedoRecordsTable(int redoTableNumber, String musicNamespace, String redoRecordTableName) throws MDBCServiceException {
+ public static void CreateRedoRecordsTable(int redoTableNumber, String musicNamespace,
+ String redoRecordTableName) throws MDBCServiceException {
String tableName = redoRecordTableName;
if(redoTableNumber >= 0) {
StringBuilder table = new StringBuilder();
@@ -313,7 +325,8 @@ public class DatabaseOperations {
fields.append("leaseid text, ");
fields.append("leasecounter varint, ");
fields.append("transactiondigest text ");//notice lack of ','
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
+ musicNamespace, tableName, fields, priKey);
try {
executeMusicWriteQuery(musicNamespace,tableName,cql);
} catch (MDBCServiceException e) {
@@ -325,14 +338,16 @@ public class DatabaseOperations {
/**
* This function creates the Table To Partition table. It contain information related to
*/
- public static void CreateTableToPartitionTable(String musicNamespace, String tableToPartitionTableName) throws MDBCServiceException {
+ public static void CreateTableToPartitionTable(String musicNamespace, String tableToPartitionTableName)
+ throws MDBCServiceException {
String tableName = tableToPartitionTableName;
String priKey = "tablename";
StringBuilder fields = new StringBuilder();
fields.append("tablename text, ");
fields.append("partition uuid, ");
fields.append("previouspartitions set<uuid> ");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
+ musicNamespace, tableName, fields, priKey);
try {
executeMusicWriteQuery(musicNamespace,tableName,cql);
} catch (MDBCServiceException e) {
@@ -341,7 +356,8 @@ public class DatabaseOperations {
}
}
- public static void CreatePartitionInfoTable(String musicNamespace, String partitionInformationTableName) throws MDBCServiceException {
+ public static void CreatePartitionInfoTable(String musicNamespace, String partitionInformationTableName)
+ throws MDBCServiceException {
String tableName = partitionInformationTableName;
String priKey = "partition";
StringBuilder fields = new StringBuilder();
@@ -351,7 +367,8 @@ public class DatabaseOperations {
fields.append("tables set<text>, ");
fields.append("replicationfactor int, ");
fields.append("currentowner text");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
+ musicNamespace, tableName, fields, priKey);
try {
executeMusicWriteQuery(musicNamespace,tableName,cql);
} catch (MDBCServiceException e) {
@@ -360,7 +377,8 @@ public class DatabaseOperations {
}
}
- public static void CreateRedoHistoryTable(String musicNamespace, String redoHistoryTableName) throws MDBCServiceException {
+ public static void CreateRedoHistoryTable(String musicNamespace, String redoHistoryTableName)
+ throws MDBCServiceException {
String tableName = redoHistoryTableName;
String priKey = "partition,redotable,redoindex";
StringBuilder fields = new StringBuilder();
@@ -369,7 +387,8 @@ public class DatabaseOperations {
fields.append("redoindex uuid, ");
//TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
fields.append("previousredo set<frozen<tuple<text,uuid>>>");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
+ musicNamespace, tableName, fields, priKey);
try {
executeMusicWriteQuery(musicNamespace,tableName,cql);
} catch (MDBCServiceException e) {
@@ -382,12 +401,13 @@ public class DatabaseOperations {
* This method executes a write query in Music
* @param cql the CQL to be sent to Cassandra
*/
- protected static void executeMusicWriteQuery(String keyspace, String table, String cql) throws MDBCServiceException {
+ protected static void executeMusicWriteQuery(String keyspace, String table, String cql)
+ throws MDBCServiceException {
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
ResultType rt = null;
try {
- rt = MusicPureCassaCore.createTable(keyspace,table,pQueryObject,"critical");
+ rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical");
} catch (MusicServiceException e) {
e.printStackTrace();
}
@@ -396,11 +416,13 @@ public class DatabaseOperations {
}
}
- protected static void executedLockedPut(String namespace, String tableName, String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId, MusicPureCassaCore.Condition conditionInfo) throws MDBCServiceException {
+ protected static void executedLockedPut(String namespace, String tableName,
+ String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId,
+ MusicCore.Condition conditionInfo) throws MDBCServiceException {
ReturnType rt ;
if(lockId==null) {
try {
- rt = MusicPureCassaCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo);
+ rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo);
} catch (MusicLockingException e) {
logger.error("Music locked put failed");
throw new MDBCServiceException("Music locked put failed");
@@ -413,7 +435,7 @@ public class DatabaseOperations {
}
}
else {
- rt = MusicPureCassaCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo);
+ rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo);
}
if (rt.getResult().getResult().toLowerCase().equals("failure")) {
throw new MDBCServiceException("Music locked put failed");
@@ -430,7 +452,7 @@ public class DatabaseOperations {
"CREATE KEYSPACE " + namespace + " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
try {
- MusicPureCassaCore.nonKeyRelatedPut(queryObject, "critical");
+ MusicCore.nonKeyRelatedPut(queryObject, "critical");
} catch (MusicServiceException e) {
if (e.getMessage().equals("Keyspace "+namespace+" already exists")) {
// ignore
diff --git a/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java b/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java
index 0d28b51..664520f 100644
--- a/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java
+++ b/src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java
@@ -8,7 +8,7 @@ import com.att.research.mdbc.mixins.CassandraMixin;
import com.google.gson.Gson;
import org.onap.music.datastore.PreparedQueryObject;
import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.main.MusicPureCassaCore;
+import org.onap.music.main.MusicCore;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
@@ -100,7 +100,7 @@ public class TablesConfiguration {
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString(createKeysTableCql.toString());
try {
- MusicPureCassaCore.createTable(internalNamespace,"unsynced_keys", queryObject,"critical");
+ MusicCore.createTable(internalNamespace,"unsynced_keys", queryObject,"critical");
} catch (MusicServiceException e) {
logger.error("Error creating unsynced keys table" );
throw new MDBCServiceException("Error creating unsynced keys table");
diff --git a/src/main/java/com/att/research/mdbc/mixins/Cassandra2Mixin.java b/src/main/java/com/att/research/mdbc/mixins/Cassandra2Mixin.java
index cc67edf..1e57e60 100755
--- a/src/main/java/com/att/research/mdbc/mixins/Cassandra2Mixin.java
+++ b/src/main/java/com/att/research/mdbc/mixins/Cassandra2Mixin.java
@@ -12,7 +12,7 @@ import org.json.JSONObject;
import org.json.JSONTokener;
import org.onap.music.datastore.PreparedQueryObject;
import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.main.MusicPureCassaCore;
+import org.onap.music.main.MusicCore;
import org.onap.music.main.ReturnType;
import com.att.research.logging.EELFLoggerDelegate;
@@ -127,7 +127,7 @@ public class Cassandra2Mixin extends CassandraMixin {
pQueryObject.addValue(tableName);
pQueryObject.addValue(myId);
pQueryObject.addValue(keys);
- ReturnType rt = MusicPureCassaCore.eventualPut(pQueryObject);
+ ReturnType rt = MusicCore.eventualPut(pQueryObject);
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
}
@@ -155,7 +155,7 @@ public class Cassandra2Mixin extends CassandraMixin {
pQueryObject.addValue(myId);
ResultSet results = null;
try {
- results = MusicPureCassaCore.get(pQueryObject);
+ results = MusicCore.get(pQueryObject);
} catch (MusicServiceException e) {
e.printStackTrace();
}
@@ -278,7 +278,7 @@ public class Cassandra2Mixin extends CassandraMixin {
pQueryObject.addValue(tableName);
pQueryObject.addValue(repl);
pQueryObject.addValue(buildJSON(ti, tableName, keys));
- ReturnType rt = MusicPureCassaCore.eventualPut(pQueryObject);
+ ReturnType rt = MusicCore.eventualPut(pQueryObject);
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
System.out.println("Failure while critical put..."+rt.getMessage());
}
diff --git a/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java b/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java
index 82c7169..033179a 100755
--- a/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java
+++ b/src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java
@@ -27,11 +27,12 @@ import com.att.research.mdbc.tables.TxCommitProgress;
import org.json.JSONObject;
import org.onap.music.datastore.CassaLockStore;
+import org.onap.music.datastore.CassaLockStore.LockObject;
import org.onap.music.datastore.PreparedQueryObject;
import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.main.MusicPureCassaCore;
+import org.onap.music.main.MusicCore;
import org.onap.music.main.ResultType;
import org.onap.music.main.ReturnType;
@@ -176,7 +177,7 @@ public class CassandraMixin implements MusicInterface {
"CREATE KEYSPACE " + this.music_ns + " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
try {
- MusicPureCassaCore.nonKeyRelatedPut(queryObject, "eventual");
+ MusicCore.nonKeyRelatedPut(queryObject, "eventual");
} catch (MusicServiceException e) {
if (e.getMessage().equals("Keyspace "+this.music_ns+" already exists")) {
// ignore
@@ -416,7 +417,7 @@ public class CassandraMixin implements MusicInterface {
String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString());
logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
pQueryObject.appendQueryString(cql);
- ReturnType rt = MusicPureCassaCore.eventualPut(pQueryObject);
+ ReturnType rt = MusicCore.eventualPut(pQueryObject);
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
System.out.println("Failure while cleanDirtyRow..."+rt.getMessage());
}
@@ -450,7 +451,7 @@ public class CassandraMixin implements MusicInterface {
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
try {
- results = MusicPureCassaCore.get(pQueryObject);
+ results = MusicCore.get(pQueryObject);
} catch (MusicServiceException e) {
e.printStackTrace();
@@ -564,7 +565,7 @@ public class CassandraMixin implements MusicInterface {
if(MusicMixin.criticalTables.contains(tableName)) {
ReturnType rt = null;
try {
- rt = MusicPureCassaCore.atomicPut(music_ns, tableName, primaryKey, pQueryObject, null);
+ rt = MusicCore.atomicPut(music_ns, tableName, primaryKey, pQueryObject, null);
} catch (MusicLockingException e) {
e.printStackTrace();
} catch (MusicServiceException e) {
@@ -576,7 +577,7 @@ public class CassandraMixin implements MusicInterface {
System.out.println("Failure while critical put..."+rt.getMessage());
}
} else {
- ReturnType rt = MusicPureCassaCore.eventualPut(pQueryObject);
+ ReturnType rt = MusicCore.eventualPut(pQueryObject);
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
System.out.println("Failure while critical put..."+rt.getMessage());
}
@@ -627,7 +628,7 @@ public class CassandraMixin implements MusicInterface {
ResultSet dirtyRows = null;
try {
//\TODO Why is this an eventual put?, this should be an atomic
- dirtyRows = MusicPureCassaCore.get(pQueryObject);
+ dirtyRows = MusicCore.get(pQueryObject);
} catch (MusicServiceException e) {
e.printStackTrace();
@@ -890,7 +891,7 @@ public class CassandraMixin implements MusicInterface {
logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
- ReturnType rt = MusicPureCassaCore.eventualPut(pQueryObject);
+ ReturnType rt = MusicCore.eventualPut(pQueryObject);
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
}
@@ -913,7 +914,7 @@ public class CassandraMixin implements MusicInterface {
pQueryObject.appendQueryString(cql);
ResultSet results = null;
try {
- results = MusicPureCassaCore.get(pQueryObject);
+ results = MusicCore.get(pQueryObject);
} catch (MusicServiceException e) {
e.printStackTrace();
@@ -1005,7 +1006,7 @@ public class CassandraMixin implements MusicInterface {
if(MusicMixin.criticalTables.contains(tableName)) {
ReturnType rt = null;
try {
- rt = MusicPureCassaCore.atomicPut(music_ns, tableName, primaryKey, pQObject, null);
+ rt = MusicCore.atomicPut(music_ns, tableName, primaryKey, pQObject, null);
} catch (MusicLockingException e) {
e.printStackTrace();
} catch (MusicServiceException e) {
@@ -1017,7 +1018,7 @@ public class CassandraMixin implements MusicInterface {
System.out.println("Failure while critical put..."+rt.getMessage());
}
} else {
- ReturnType rt = MusicPureCassaCore.eventualPut(pQObject);
+ ReturnType rt = MusicCore.eventualPut(pQObject);
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
System.out.println("Failure while critical put..."+rt.getMessage());
}
@@ -1045,10 +1046,10 @@ public class CassandraMixin implements MusicInterface {
protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition, String keyspace, String table, String key) throws MDBCServiceException {
String lockId;
- lockId = MusicPureCassaCore.createLockReference(fullyQualifiedKey);
+ lockId = MusicCore.createLockReference(fullyQualifiedKey);
ReturnType lockReturn;
try {
- lockReturn = MusicPureCassaCore.acquireLock(fullyQualifiedKey,lockId);
+ lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
} catch (MusicLockingException e) {
logger.error(EELFLoggerDelegate.errorLogger, "Lock was not acquire correctly for key "+fullyQualifiedKey);
throw new MDBCServiceException("Lock was not acquire correctly for key "+fullyQualifiedKey);
@@ -1062,22 +1063,20 @@ public class CassandraMixin implements MusicInterface {
//\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock
if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
try {
- MusicPureCassaCore.releaseLock(fullyQualifiedKey,lockId,false);
- CassaLockStore lockingServiceHandle = MusicPureCassaCore.getLockingServiceHandle();
- UUID uuid = lockingServiceHandle.peekLockQueue(keyspace, table, key);
- String uuidStr = uuid.toString();
- while(uuidStr != lockId) {
- MusicPureCassaCore.releaseLock(fullyQualifiedKey, uuid.toString(), false);
- try {
- uuid = lockingServiceHandle.peekLockQueue(keyspace, table, key);
- uuidStr = uuid.toString();
+ MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId);
+ CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle();
+ LockObject lockOwner = lockingServiceHandle.peekLockQueue(keyspace, table, key);
+ while(lockOwner.lockRef != lockId) {
+ MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef);
+ try {
+ lockOwner = lockingServiceHandle.peekLockQueue(keyspace, table, key);
} catch(NullPointerException e){
//Ignore null pointer exception
- lockId = MusicPureCassaCore.createLockReference(fullyQualifiedKey);
- uuidStr = lockId;
+ lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ break;
}
}
- lockReturn = MusicPureCassaCore.acquireLock(fullyQualifiedKey,lockId);
+ lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
} catch (MusicLockingException e) {
throw new MDBCServiceException("Could not lock the corresponding lock");
@@ -1117,7 +1116,7 @@ public class CassandraMixin implements MusicInterface {
query.appendQueryString(cqlQuery.toString());
//\TODO check if I am not shooting on my own foot
try {
- MusicPureCassaCore.nonKeyRelatedPut(query,"critical");
+ MusicCore.nonKeyRelatedPut(query,"critical");
} catch (MusicServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+commitId);
throw new MDBCServiceException("Transaction Digest serialization for commit "+commitId);
@@ -1132,7 +1131,7 @@ public class CassandraMixin implements MusicInterface {
.append(commitId)
.append(")");
PreparedQueryObject appendQuery = createAppendRRTIndexToTitQuery(transactionInformationTableName, TITIndex, redoRecordTableName, redoUuidBuilder.toString());
- ReturnType returnType = MusicPureCassaCore.criticalPut(music_ns, transactionInformationTableName, TITIndex, appendQuery, lockId, null);
+ ReturnType returnType = MusicCore.criticalPut(music_ns, transactionInformationTableName, TITIndex, appendQuery, lockId, null);
if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
diff --git a/src/main/java/com/att/research/mdbc/mixins/MusicConnector.java b/src/main/java/com/att/research/mdbc/mixins/MusicConnector.java
index ea32a85..8df6ed5 100755
--- a/src/main/java/com/att/research/mdbc/mixins/MusicConnector.java
+++ b/src/main/java/com/att/research/mdbc/mixins/MusicConnector.java
@@ -15,7 +15,7 @@ import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import org.onap.music.main.MusicPureCassaCore;
+import org.onap.music.main.MusicCore;
/**
* This class allows for management of the Cassandra Cluster and Session objects.
@@ -70,7 +70,7 @@ public class MusicConnector {
}
private void connectToMultipleAddresses(String address) {
- MusicPureCassaCore.getDSHandle(address);
+ MusicCore.getDSHandle(address);
/*
PoolingOptions poolingOptions =
new PoolingOptions()
diff --git a/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java b/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java
index c8e6944..bf865b2 100644
--- a/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java
+++ b/src/main/java/com/att/research/mdbc/mixins/MusicMixin.java
@@ -27,7 +27,7 @@ import com.att.research.mdbc.tables.TitReference;
import com.att.research.mdbc.tables.TransactionInformationElement;
import com.att.research.mdbc.tables.TxCommitProgress;
-import org.onap.music.main.MusicPureCassaCore;
+import org.onap.music.main.MusicCore;
/**
@@ -166,8 +166,8 @@ public class MusicMixin implements MusicInterface {
for(LockId lockId: lockIds) {
System.out.println("Releasing lock: "+lockId);
try {
- MusicPureCassaCore.voluntaryReleaseLock(lockId.getFullyQualifiedLockKey(),lockId.getLockReference());
- MusicPureCassaCore.destroyLockRef(lockId.getFullyQualifiedLockKey(),lockId.getLockReference());
+ MusicCore.voluntaryReleaseLock(lockId.getFullyQualifiedLockKey(),lockId.getLockReference());
+ MusicCore.destroyLockRef(lockId.getFullyQualifiedLockKey(),lockId.getLockReference());
} catch (MusicLockingException e) {
e.printStackTrace();
}
diff --git a/src/main/java/com/att/research/mdbc/tools/CreateNodeConfigurations.java b/src/main/java/com/att/research/mdbc/tools/CreateNodeConfigurations.java
index 555b863..f0eca5b 100644
--- a/src/main/java/com/att/research/mdbc/tools/CreateNodeConfigurations.java
+++ b/src/main/java/com/att/research/mdbc/tools/CreateNodeConfigurations.java
@@ -6,7 +6,6 @@ import com.att.research.mdbc.configurations.NodeConfiguration;
import com.att.research.mdbc.configurations.TablesConfiguration;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
-import org.onap.music.main.MusicPureCassaCore;
import java.io.FileNotFoundException;
import java.util.List;