aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xmdbc-server/pom.xml12
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java4
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java18
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java5
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java59
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java1
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java60
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java9
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestMultiClient.java201
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java13
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java50
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java267
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java137
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java19
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java20
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java22
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java189
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java21
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java15
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java2
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java3
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java12
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tools/TxDigestDecompression.java41
-rw-r--r--mdbc-server/src/main/resources/key.properties1
-rwxr-xr-xmdbc-server/src/main/resources/mdbc.properties8
-rwxr-xr-xmdbc-server/src/main/resources/music.properties3
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java26
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java27
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MySQLMixinTest.java95
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java6
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java18
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java13
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java10
-rwxr-xr-xmdbc-server/src/test/java/org/onap/music/mdbc/test/CrossSiteTest.java25
-rwxr-xr-xmdbc-server/src/test/java/org/onap/music/mdbc/test/TransactionTest.java6
-rwxr-xr-xpom.xml11
37 files changed, 964 insertions, 475 deletions
diff --git a/mdbc-server/pom.xml b/mdbc-server/pom.xml
index 7a46120..42fb9f8 100755
--- a/mdbc-server/pom.xml
+++ b/mdbc-server/pom.xml
@@ -180,8 +180,8 @@
</dependency>
<dependency>
<groupId>org.onap.music</groupId>
- <artifactId>dev-MUSIC-cassandra</artifactId>
- <version>3.2.1</version>
+ <artifactId>MUSIC-core</artifactId>
+ <version>3.2.37-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
@@ -211,7 +211,13 @@
<version>0.13.1</version>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>javax.websocket</groupId>
+ <artifactId>javax.websocket-api</artifactId>
+ <version>1.1</version>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
index 91b13f3..efe4c21 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
@@ -48,4 +48,8 @@ public class Configuration {
public static final String KEY_WARMUPRANGES = "warmupranges";
/** Default async staging table update o ption*/
public static final String ASYNC_STAGING_TABLE_UPDATE = "false";
+ /** The property name to determine if only write locks are allowed */
+ public static final String KEY_WRITE_LOCKS_ONLY = "write_locks_only";
+ /** Default if only write locks are allowed */
+ public static final Boolean WRITE_LOCK_ONLY_DEFAULT = false;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
index 314248f..4122623 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
@@ -48,12 +48,18 @@ public class DatabasePartition {
* The only requirement is that the ranges are not overlapping.
*/
- public DatabasePartition() {
- this(new HashSet<Range>(),null,"");
- }
public DatabasePartition(UUID mriIndex) {
- this(new HashSet<Range>(), mriIndex,"");
+ this(new HashSet<Range>(), mriIndex);
+ }
+
+ /**
+ * Create unlocked partition
+ * @param ranges
+ * @param mriIndex
+ */
+ public DatabasePartition(Set<Range> ranges, UUID mriIndex) {
+ this(ranges, mriIndex, null);
}
public DatabasePartition(Set<Range> knownRanges, UUID mriIndex, String lockId) {
@@ -90,7 +96,9 @@ public class DatabasePartition {
}
- public synchronized boolean isLocked(){return lockId != null && !lockId.isEmpty(); }
+ public synchronized boolean isLocked(){
+ return lockId != null && !lockId.isEmpty();
+ }
public synchronized boolean isReady() {
return ready;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
index b60062e..ee742f8 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
@@ -49,6 +49,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
public class MDBCUtils {
+ public static boolean writeLocksOnly = false;
public static void saveToFile(String serializedContent, String filename, EELFLoggerDelegate logger) throws IOException {
try (PrintWriter fout = new PrintWriter(filename)) {
@@ -129,6 +130,10 @@ public class MDBCUtils {
* @return write if any table has a write query. Read otherwise
*/
public static SQLOperationType getOperationType(Map<String, List<SQLOperation>> tableToQueryType) {
+ if (writeLocksOnly) {
+ return SQLOperationType.WRITE;
+ }
+
for (List<org.onap.music.mdbc.query.SQLOperation> tablesOps : tableToQueryType.values()) {
for (org.onap.music.mdbc.query.SQLOperation op : tablesOps) {
if (op.getOperationType() != SQLOperationType.READ) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 2294673..1707c03 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -44,6 +44,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.exceptions.MusicDeadlockException;
import org.onap.music.exceptions.QueryException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
@@ -61,6 +62,7 @@ import org.onap.music.mdbc.query.QueryProcessor;
import org.onap.music.mdbc.query.SQLOperation;
import org.onap.music.mdbc.query.SQLOperationType;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -82,9 +84,14 @@ public class MdbcConnection implements Connection {
private final TxCommitProgress progressKeeper;
private final DBInterface dbi;
private final StagingTable transactionDigest;
+ /** Set of tables in db */
private final Set<String> table_set;
private final StateManager statemanager;
+ /** partition owned for this transaction */
private DatabasePartition partition;
+ /** ranges needed for this transaction */
+ private Set<Range> rangesUsed;
+ private String ownerId = UUID.randomUUID().toString();
public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi,
TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException {
@@ -187,9 +194,18 @@ public class MdbcConnection implements Connection {
dbi.preCommitHook();
try {
+ partition = mi.splitPartitionIfNecessary(partition, rangesUsed, ownerId);
+ } catch (MDBCServiceException e) {
+ logger.warn(EELFLoggerDelegate.errorLogger,
+ "Failure to split partition '" + partition.getMRIIndex() + "' trying to continue",
+ AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
+ }
+
+ try {
logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
// transaction was committed -- add all the updates into the REDO-Log in MUSIC
- mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper);
+ MusicTxDigestId digestCreated = mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper);
+ statemanager.getOwnAndCheck().updateAlreadyApplied(mi, dbi, partition.getSnapshot(), partition.getMRIIndex(), digestCreated);
} catch (MDBCServiceException e) {
//If the commit fail, then a new commitId should be used
logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
@@ -519,19 +535,17 @@ public class MdbcConnection implements Connection {
//Check ownership of keys
String defaultSchema = dbi.getSchema();
Set<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToQueryType);
- if (this.partition!=null) {
- Set<Range> snapshot = this.partition.getSnapshot();
- if(snapshot!=null){
- queryTables.addAll(snapshot);
- }
+ if (this.rangesUsed==null) {
+ rangesUsed = queryTables;
+ } else {
+ rangesUsed.addAll(queryTables);
}
// filter out ranges that fall under Eventually consistent
// category as these tables do not need ownership
- Set<Range> scQueryTables = filterEveTables(queryTables);
- DatabasePartition tempPartition = own(scQueryTables, MDBCUtils.getOperationType(tableToQueryType));
+ Set<Range> scRanges = filterEveTables(rangesUsed);
+ DatabasePartition tempPartition = own(scRanges, MDBCUtils.getOperationType(tableToQueryType));
if(tempPartition!=null && tempPartition != partition) {
this.partition.updateDatabasePartition(tempPartition);
- statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition);
}
dbi.preStatementHook(sql);
}
@@ -599,22 +613,31 @@ public class MdbcConnection implements Connection {
OwnershipAndCheckpoint ownAndCheck = statemanager.getOwnAndCheck();
UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
try {
- final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId, lockType);
+ final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId, lockType, ownerId);
if(ownershipReturn==null){
return null;
}
Dag dag = ownershipReturn.getDag();
if(dag!=null) {
- DagNode node = dag.getNode(ownershipReturn.getRangeId());
- MusicRangeInformationRow row = node.getRow();
- Map<MusicRangeInformationRow, LockResult> lock = new HashMap<>();
- lock.put(row, new LockResult(row.getPartitionIndex(), ownershipReturn.getOwnerId(), true, ranges));
- ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, lock, ownershipReturn.getOwnershipId());
+ ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, ownershipReturn.getOwnershipId());
+ //TODO: need to update pointer in alreadyapplied if a merge happened instead of in prestatement hook
newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(),
- ownershipReturn.getOwnerId());
+ ownershipReturn.getLockId());
}
- }
- finally{
+ } catch (MDBCServiceException e) {
+ MusicDeadlockException de = Utils.getDeadlockException(e);
+ if (de!=null) {
+ //release all partitions
+ mi.releaseAllLocksForOwner(de.getOwner(), de.getKeyspace(), de.getTable());
+ //rollback transaction
+ try {
+ rollback();
+ } catch (SQLException e1) {
+ throw new MDBCServiceException("Failed to rollback transaction after detecting deadlock while taking ownership of table, which, wow", e1);
+ }
+ }
+ throw e;
+ } finally {
ownAndCheck.stopOwnershipTimeoutClock(ownOpId);
}
return newPartition;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
index 500ed81..246044b 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
@@ -101,6 +101,7 @@ public class MdbcServer {
// Then start it
server.start();
+ System.out.println("Started Avatica server on port " + server.getPort());
logger.info("Started Avatica server on port {} with serialization {}", server.getPort(),
serialization);
} catch (Exception e) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 66c8fa9..fb39637 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -36,6 +36,7 @@ import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.TxCommitProgress;
import java.io.IOException;
@@ -92,7 +93,7 @@ public class StateManager {
/** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */
private Set<Range> rangesToWarmup;
/** map of transactions that have already been applied/updated in this sites SQL db */
- private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
+ private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied;
private OwnershipAndCheckpoint ownAndCheck;
private Thread txDaemon ;
@@ -114,6 +115,7 @@ public class StateManager {
//\fixme this might not be used, delete?
try {
info.load(this.getClass().getClassLoader().getResourceAsStream("music.properties"));
+ info.load(this.getClass().getClassLoader().getResourceAsStream("key.properties"));
info.putAll(MDBCUtils.getMdbcProperties());
} catch (IOException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
@@ -122,13 +124,17 @@ public class StateManager {
cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
+ String writeLocksOnly = info.getProperty(Configuration.KEY_WRITE_LOCKS_ONLY);
+ MDBCUtils.writeLocksOnly = (writeLocksOnly==null) ? Configuration.WRITE_LOCK_ONLY_DEFAULT : Boolean.parseBoolean(writeLocksOnly);
+
initMusic();
- initSqlDatabase();
- initTxDaemonThread();
+ Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = initSqlDatabase();
+
String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT);
long timeout = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t);
- alreadyApplied = new ConcurrentHashMap<>();
ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
+
+ initTxDaemonThread();
}
protected String cleanSqlUrl(String url){
@@ -160,7 +166,12 @@ public class StateManager {
this.mdbcConnections = new HashMap<>();
}
- protected void initSqlDatabase() throws MDBCServiceException {
+ /**
+ * Do everything necessary to initialize the sql database
+ * @return the current checkpoint location of this database, if restarting
+ * @throws MDBCServiceException
+ */
+ protected Map<Range, Pair<MriReference, MusicTxDigestId>> initSqlDatabase() throws MDBCServiceException {
if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) {
try {
Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
@@ -178,16 +189,21 @@ public class StateManager {
}
}
- // Verify the tables in MUSIC match the tables in the database
- // and create triggers on any tables that need them
+ Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyAppliedToDb = null;
try {
MdbcConnection mdbcConn = (MdbcConnection) openConnection("init");
mdbcConn.initDatabase();
+ alreadyAppliedToDb = mdbcConn.getDBInterface().getCheckpointLocations();
closeConnection("init");
} catch (QueryException e) {
- logger.error("Error syncrhonizing tables");
+ logger.error("Error initializing sql database tables");
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
}
+
+ if (alreadyAppliedToDb==null) {
+ alreadyAppliedToDb = new ConcurrentHashMap<>();
+ }
+ return alreadyAppliedToDb;
}
/**
@@ -309,11 +325,9 @@ public class StateManager {
}
mdbcConnections.remove(connectionId);
}
- if(connectionRanges.containsKey(connectionId)){
- //We relinquish all locks obtained by a given
- //relinquish(connectionRanges.get(connectionId));
- connectionRanges.remove(connectionId);
- }
+
+ connectionRanges.remove(connectionId);
+
}
/**
@@ -334,18 +348,12 @@ public class StateManager {
ErrorTypes.QUERYERROR);
sqlConnection = null;
}
- //check if a range was already created for this connection
- //TODO: later we could try to match it to some more sticky client id
- DatabasePartition ranges;
- if(connectionRanges.containsKey(id)){
- ranges=connectionRanges.get(id);
- }
- else{
- //TODO: we don't need to create a partition for each connection
- ranges=new DatabasePartition(musicInterface.generateUniqueKey());
- connectionRanges.put(id,ranges);
- }
- //Create MDBC connection
+
+ //TODO: later we could try to match it to some more sticky client id
+ DatabasePartition ranges=new DatabasePartition(musicInterface.generateUniqueKey());
+ connectionRanges.put(id,ranges);
+
+ //Create MDBC connection
try {
newConnection = new MdbcConnection(id,this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface,
transactionInfo,ranges, this);
@@ -414,7 +422,7 @@ public class StateManager {
* Close all connections for this server, relinquishing any locks/partitions owned by this server
*/
public void releaseAllPartitions() {
- for(String connection: this.connectionRanges.keySet()) {
+ for(String connection: this.mdbcConnections.keySet()) {
closeConnection(connection);
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
index 496f48d..3dcfaf0 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
@@ -34,6 +34,7 @@ import java.io.InputStream;
import java.util.*;
import org.onap.music.mdbc.mixins.MusicInterface;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.service.impl.MusicCassaCore;
public class TestUtils {
@@ -48,14 +49,14 @@ public class TestUtils {
new MusicRangeInformationRow(dbPartition, new ArrayList<>(), true);
MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), true);
DatabasePartition partition=null;
- partition = mixin.createLockedMRIRow(newRow);
+ partition = mixin.createLockedMRIRow(newRow, "");
return partition;
}
public static void unlockRow(String keyspace, String mriTableName, DatabasePartition partition)
throws MusicLockingException {
String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString();
- MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId());
+ MusicLockState musicLockState = MusicCassaCore.getInstance().voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId());
}
public static void createKeyspace(String keyspace, Session session) {
@@ -142,7 +143,7 @@ public class TestUtils {
}
}
-
+/*
public static void populateMusicUtilsWithProperties(Properties prop){
//TODO: Learn how to do this properly within music
String[] propKeys = MusicUtil.getPropkeys();
@@ -207,6 +208,6 @@ public class TestUtils {
}
}
-
}
+*/
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
index 7a09dca..f4f4820 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+
+import org.onap.music.exceptions.MusicDeadlockException;
import org.onap.music.logging.EELFLoggerDelegate;
public class Utils {
@@ -77,4 +79,12 @@ public class Utils {
}
}
}
+
+ public static MusicDeadlockException getDeadlockException(Throwable t) {
+ while (t!=null) {
+ if (t instanceof MusicDeadlockException) return (MusicDeadlockException)t;
+ t = t.getCause();
+ }
+ return null;
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestMultiClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestMultiClient.java
index 7320d34..02b7c7c 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestMultiClient.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestMultiClient.java
@@ -49,6 +49,14 @@ public class MdbcTestMultiClient implements Runnable {
private int selectInsteadOfUpdatePct = 25;
private int rollbackChancePct = 15;
private int maxTables = 0;
+ public static boolean[] threadsDone;
+
+
+ private boolean sequentialIds = false;
+ private static Integer currentId = -1;
+
+ private boolean sequentialFirsts = false;
+ private static Integer currentFirstFirst = 0, currentFirstSecond = 0;
private Long randomSeed = null;
@@ -56,6 +64,7 @@ public class MdbcTestMultiClient implements Runnable {
private static final List<String> defaultTableNames = Arrays.asList(new String[] {"persons", "persons2"});
private boolean explainConnection = true;
+ private boolean endInSelect = true;
public static class Employee {
public final int empid;
@@ -164,12 +173,12 @@ public class MdbcTestMultiClient implements Runnable {
break;
case "-u":
case "--update":
- currState = 'u';
+ doUpdate = false;
break;
case "-x":
case "--delete":
- currState = 'x';
- break;
+ doDelete = false;
+ break;
case "-l":
case "--closeChance":
currState = 'l';
@@ -192,6 +201,12 @@ public class MdbcTestMultiClient implements Runnable {
case "--randomSeed":
currState = '?';
break;
+ case "--sequentialId":
+ sequentialIds = true;
+ break;
+ case "--sequentialFirst":
+ sequentialFirsts = true;
+ break;
default:
System.out.println("Didn't understand switch " + arg);
}
@@ -225,12 +240,6 @@ public class MdbcTestMultiClient implements Runnable {
case 'a':
additionalDelayBetweenTestsMs = Integer.parseInt(arg);
break;
- case 'u':
- doUpdate = arg.toUpperCase().startsWith("Y");
- break;
- case 'x':
- doDelete = arg.toUpperCase().startsWith("Y");
- break;
case 'l':
connectionCloseChancePct = Integer.parseInt(arg);
break;
@@ -265,23 +274,25 @@ public class MdbcTestMultiClient implements Runnable {
private void showHelp() {
System.out.println(
"-?; --help: Show help\n" +
- "-c; --connection: MDBC connection string, may appear multiple times\n" +
- "-e; --tableName: Table name, may appear multiple times\n" +
- "-n; --name: Last name in persons table, default \"Lastname\"\n" +
- "-b; --baseId: Base ID, default 700\n" +
- "-r; --baseRange: Range of ID, default 50\n" +
- "-m; --maxCalls: Max number of commits (each may be 1+ updates), default 50\n" +
- "-t; --maxTime: Max time in ms test will run, default 60000\n" +
- "-d; --minDelay: Min delay between tests in ms, default 1000\n" +
- "-a; --addDelay: Max randomized additional delay between tests in ms, default 1000\n" +
- "-u; --update: Generate update statements; default Y\n" +
- "-x; --delete: Generate delete statements; default Y\n" +
- "-l; --closeChance: Percent chance of closing connection after each commit, default 50\n" +
- "-s; --skipInitialSelect: Percent chance of skipping each initial select in a transaction, default 25\n" +
- "-i; --selectNotUpdate: Percent chance of each action in a transaction being a select instead of an update, default 25\n" +
- "-o; --rollbackChance: Percent chance of rolling back each transaction instead of committing, default 15\n" +
- " --maxTables: Maximum number of tables per transaction, default 0 (no limit)\n" +
- " --randomSeed: Seed for the initial random number generator, default none\n" +
+ "-c; --connection [string]: MDBC connection string, may appear multiple times\n" +
+ "-e; --tableName [string]: Table name, may appear multiple times\n" +
+ "-n; --name [string]: Last name in persons table, default \"Lastname\"\n" +
+ "-b; --baseId [int]: Base ID, default 700\n" +
+ "-r; --baseRange [int]: Range of ID, default 50\n" +
+ "-m; --maxCalls [int]: Max number of commits (each may be 1+ updates), default 50\n" +
+ "-t; --maxTime [int]: Max time in ms test will run, default 60000\n" +
+ "-d; --minDelay [int]: Min delay between tests in ms, default 1000\n" +
+ "-a; --addDelay [int]: Max randomized additional delay between tests in ms, default 1000\n" +
+ "-u; --update: Don't generate update statements; default do\n" +
+ "-x; --delete: Don't generate delete statements; default do\n" +
+ "-l; --closeChance [int]: Percent chance of closing connection after each commit, default 50\n" +
+ "-s; --skipInitialSelect [int]: Percent chance of skipping each initial select in a transaction, default 25\n" +
+ "-i; --selectNotUpdate [int]: Percent chance of each action in a transaction being a select instead of an update, default 25\n" +
+ "-o; --rollbackChance [int]: Percent chance of rolling back each transaction instead of committing, default 15\n" +
+ " --maxTables [int]: Maximum number of tables per transaction, default 0 (no limit)\n" +
+ " --randomSeed [long]: Seed for the initial random number generator, default none (generate random random seed)\n" +
+ " --sequentialId: Generate sequential IDs instead of random ones (default random)\n" +
+ " --sequentialFirst: Generate alphabetically sequential first names (default completely random) \n" +
""
);
@@ -307,6 +318,8 @@ public class MdbcTestMultiClient implements Runnable {
this.selectInsteadOfUpdatePct = that.selectInsteadOfUpdatePct;
this.rollbackChancePct = that.rollbackChancePct;
this.maxTables = that.maxTables;
+ this.sequentialIds = that.sequentialIds;
+ this.sequentialFirsts = that.sequentialFirsts;
}
private void setRandomSeed(Long randomSeed) {
@@ -350,6 +363,8 @@ public class MdbcTestMultiClient implements Runnable {
// doLog("PersonId = " + rs.getInt("personId") + ", lastname = " + rs.getString("lastname") + ", firstname = " + rs.getString("firstname"));
Employee emp = new Employee(rs.getInt("personId"), rs.getString("lastname"), rs.getString("firstname"), rs.getString("address"), rs.getString("city"));
employeeMap.put(rs.getInt("personId"), emp);
+ if (sequentialIds) updateId(rs.getInt("personId"));
+ if (sequentialFirsts) updateFirst(rs.getString("firstname"));
doLog("Found: " + emp);
}
querySt.close();
@@ -388,7 +403,32 @@ public class MdbcTestMultiClient implements Runnable {
insertStmt.close();
}
- private List<String> chooseTableNames(Random r) {
+ private void updateFirst(String firstName) {
+ if (firstName==null || firstName.length()<2) return;
+ synchronized(currentFirstFirst) {
+// return (char)(65+currentFirstFirst) + "" + (char)(97+currentFirstSecond) + generateLetters(r, 4+r.nextInt(4));
+ int ff = ((int)firstName.charAt(0))-65;
+ int fs = ((int)firstName.charAt(1))-97;
+ if (ff>=26 || ff<0 || fs>=26 || fs<0) return;
+ if ( (ff>currentFirstFirst) || (ff==currentFirstFirst && fs>currentFirstSecond) ) {
+ currentFirstFirst = ff;
+ currentFirstSecond = fs;
+ doLog("Saw " + firstName + ", updating currentFirstName to " + currentFirstFirst + ", " + currentFirstSecond);
+ }
+ }
+
+ }
+
+ private void updateId(int id) {
+ synchronized(currentId) {
+ if (currentId<=id) {
+ currentId = id+1;
+ doLog ("Saw " + id + ", updating current id");
+ }
+ }
+ }
+
+ private List<String> chooseTableNames(Random r) {
if (maxTables<=0 || maxTables>=tableNames.size()) return tableNames;
boolean[] useTable = new boolean[tableNames.size()];
for (int i=0; i<tableNames.size(); i++) useTable[i] = false;
@@ -462,33 +502,63 @@ public class MdbcTestMultiClient implements Runnable {
private String generateInsert(HashMap<Integer, Employee> employeeMap, Random r, String tableName) {
String toRet = null;
- Integer id = null;
- int range = baseIdRange;
- while (id==null) {
- id = baseId + r.nextInt(range);
- if (employeeMap!=null && employeeMap.containsKey(id)) id = null;
- if (employeeMap==null) id+=baseIdRange;
- range+=(baseIdRange/5);
- }
- Employee newEmp = new Employee(id, lastName, Character.toUpperCase(randomLetter(r)) + generateLetters(r, 4+r.nextInt(4)), generateLetters(r, 4).toUpperCase(), generateLetters(r, 4).toUpperCase());
+ Integer id = generateId(employeeMap, r);
+ Employee newEmp = new Employee(id, lastName, generateFirstName(r), generateLetters(r, 4).toUpperCase(), generateLetters(r, 4).toUpperCase());
+// Employee newEmp = new Employee(id, lastName, Character.toUpperCase(randomLetter(r)) + generateLetters(r, 4+r.nextInt(4)), generateLetters(r, 4).toUpperCase(), generateLetters(r, 4).toUpperCase());
toRet = "insert into " + tableName + " values (" + id + ", '" + newEmp.getLastname() + "', '" + newEmp.getFirstname() + "', '" + newEmp.getAddress() + "', '" + newEmp.getCity() + "')";
if (employeeMap!=null) employeeMap.put(id, newEmp);
return toRet;
}
+ private String generateFirstName(Random r) {
+ if (sequentialFirsts) {
+ synchronized(currentFirstFirst) {
+ currentFirstSecond++;
+ if (currentFirstSecond==26) {
+ currentFirstSecond = 0;
+ currentFirstFirst++;
+ if (currentFirstFirst==26) currentFirstFirst=0;
+ }
+ return (char)(65+currentFirstFirst) + "" + (char)(97+currentFirstSecond) + generateLetters(r, 4+r.nextInt(4));
+ }
+ } else {
+ return Character.toUpperCase(randomLetter(r)) + generateLetters(r, 4+r.nextInt(4));
+ }
+ }
+
+ private Integer generateId(HashMap<Integer, Employee> employeeMap, Random r) {
+ Integer toRet = null;
+ if (currentId<0 && baseId>0) currentId = baseId; // setup, only matters if sequentialIds is true
+ int range = baseIdRange; // setup, only matters if sequentialIds is false
+ while (toRet==null) {
+ if (sequentialIds) {
+ synchronized(currentId) {
+ toRet = currentId++;
+ }
+ } else {
+ toRet = baseId + r.nextInt(range);
+ if (employeeMap==null) toRet+=baseIdRange;
+ range+=(baseIdRange/5);
+ }
+ if (employeeMap!=null && employeeMap.containsKey(toRet)) toRet = null;
+ }
+ return toRet;
+ }
+
private String generateUpdate(HashMap<Integer, Employee> employeeMap, Random r, String tableName) {
String toRet = null;
Employee toUpd = chooseTarget(employeeMap, r);
if (toUpd!=null) {
String newFirst = null;
- if (toUpd.getFirstname().length()<=3 || r.nextBoolean()) {
+ if (sequentialFirsts) {
+ newFirst = generateFirstName(r);
+ } else if (toUpd.getFirstname().length()<=3 || r.nextBoolean()) {
newFirst = toUpd.getFirstname() + randomLetter(r);
} else {
newFirst = toUpd.getFirstname().substring(0, toUpd.getFirstname().length()-1);
}
-// toRet = "update " + tableName + " set firstname = '" + newFirst + "' where personid = " + toUpd.getEmpid();
toRet = "update " + tableName + " set firstname = '" + newFirst + "' where personid = " + toUpd.getEmpid() + " and lastname = '" + toUpd.getLastname() + "'";
toUpd.setFirstname(newFirst);
}
@@ -649,7 +719,44 @@ public class MdbcTestMultiClient implements Runnable {
doLog("");
}
+ threadsDone[threadId] = true;
+ if (endInSelect) {
+ doLog("Ending in select to ensure all db's are in sync");
+ while (!allThreadsDone()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ continue;
+ }
+ }
+ doLog("All threads are done. Ending in select");
+ if (connection==null) {
+ try {
+ doLog("Opening new connection");
+ connection = DriverManager.getConnection(connectionString);
+ connection.setAutoCommit(false);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ for (String tableName : tableNames) {
+ try {
+ Statement querySt = connection.createStatement();
+ ResultSet rs = executeQueryTimed("select * from " + tableName, querySt, false);
+ while (rs.next()) {
+ // doLog("PersonId = " + rs.getInt("personId") + ", lastname = " + rs.getString("lastname") + ", firstname = " + rs.getString("firstname"));
+ Employee emp = new Employee(rs.getInt("personId"), rs.getString("lastname"), rs.getString("firstname"), rs.getString("address"), rs.getString("city"));
+ doLog("Found: " + emp);
+ }
+ querySt.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
if (connection!=null) {
try {
doLog("Closing connection at end");
@@ -662,27 +769,39 @@ public class MdbcTestMultiClient implements Runnable {
doLog("All done.");
}
- private void doLog(String string) {
+ private boolean allThreadsDone() {
+ for (Boolean b: this.threadsDone) {
+ if (!b) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void doLog(String string) {
System.out.println(">> Thread " + threadId + " " + sdf.format(new java.util.Date()) + " >> " + string);
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws InterruptedException {
MdbcTestMultiClient mtc = new MdbcTestMultiClient(args);
mtc.runTests();
}
- private void runTests() {
+ private void runTests() throws InterruptedException {
if (randomSeed==null) {
randomSeed = new Random().nextLong();
}
doLog("Using random seed = " + randomSeed);
Random seedRandom = new Random(randomSeed);
+ this.threadsDone = new boolean[connectionStrings.size()];
+
for (int i=0; i<connectionStrings.size(); i++) {
MdbcTestMultiClient mt = new MdbcTestMultiClient(this, i);
mt.setRandomSeed(seedRandom.nextLong());
Thread t = new Thread(mt);
t.start();
}
+
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index 745307c..cba699f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -31,6 +31,8 @@ import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.tables.MriReference;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
/**
@@ -134,13 +136,13 @@ public interface DBInterface {
* @throws SQLException if replay cannot occur correctly
* @throws MDBCServiceException
*/
- void replayTransaction(StagingTable digest, Set<Range> ranges) throws SQLException, MDBCServiceException;
+ void replayTransaction(StagingTable digest) throws SQLException, MDBCServiceException;
void disableForeignKeyChecks() throws SQLException;
void enableForeignKeyChecks() throws SQLException;
- void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException;
+ void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException;
Connection getSQLConnection();
@@ -151,7 +153,12 @@ public interface DBInterface {
* @param r
* @param playbackPointer
*/
- public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer);
+ public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer);
+ /**
+ * Get current locations of this database's already applied locations
+ * @return
+ */
+ public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations();
/**
* Initialize the SQL database by creating any tables necessary
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 637cb15..b8ac563 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -45,19 +45,19 @@ import org.onap.music.mdbc.tables.*;
*/
public interface MusicInterface {
class OwnershipReturn{
- private final UUID ownershipId;
+ private final UUID ownershipOpId;
private final String lockId;
private final UUID rangeId;
private final Set<Range> ranges;
private final Dag dag;
- public OwnershipReturn(UUID ownershipId, String ownerId, UUID rangeId, Set<Range> ranges, Dag dag){
- this.ownershipId=ownershipId;
- this.lockId=ownerId;
+ public OwnershipReturn(UUID ownershipOpId, String lockId, UUID rangeId, Set<Range> ranges, Dag dag){
+ this.ownershipOpId=ownershipOpId;
+ this.lockId=lockId;
this.rangeId=rangeId;
this.ranges=ranges;
this.dag=dag;
}
- public String getOwnerId(){
+ public String getLockId(){
return lockId;
}
public UUID getRangeId(){
@@ -65,7 +65,7 @@ public interface MusicInterface {
}
public Set<Range> getRanges(){ return ranges; }
public Dag getDag(){return dag;}
- public UUID getOwnershipId() { return ownershipId; }
+ public UUID getOwnershipId() { return ownershipOpId; }
}
/**
* Get the name of this MusicInterface mixin object.
@@ -181,15 +181,21 @@ public interface MusicInterface {
/**
* Commits the corresponding REDO-log into MUSIC
+ * Transaction is committed -- add all the updates into the REDO-Log in MUSIC
+ *
+ * This officially commits the transaction globally
+ *
+ *
*
* @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx
* @param eventualRanges
* @param transactionDigest digest of the transaction that is being committed into the Redo log in music.
* @param txId id associated with the log being send
* @param progressKeeper data structure that is used to handle to detect failures, and know what to do
+ * @return digest that was created for this transaction commit
* @throws MDBCServiceException
*/
- void commitLog(DatabasePartition partition, Set<Range> eventualRanges, StagingTable transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
+ public MusicTxDigestId commitLog(DatabasePartition partition, Set<Range> eventualRanges, StagingTable transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
/**
@@ -213,10 +219,11 @@ public interface MusicInterface {
/**
* This function is used to create a new locked row in the MRI table
* @param info the information used to create the row
+ * @param owner owner of the lock for deadlock detection
* @return the new partition object that contain the new information used to create the row
* @throws MDBCServiceException
*/
- DatabasePartition createLockedMRIRow(MusicRangeInformationRow info) throws MDBCServiceException;
+ DatabasePartition createLockedMRIRow(MusicRangeInformationRow info, String owner) throws MDBCServiceException;
/**
* This function is used to create all the required music dependencies
@@ -320,10 +327,13 @@ public interface MusicInterface {
void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
String createLock(LockRequest request) throws MDBCServiceException;
+ String createLock(LockRequest request, String ownerId) throws MDBCServiceException;
LockResult acquireLock(LockRequest request, String lockId) throws MDBCServiceException;
void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException;
+ public void releaseAllLocksForOwner(String owner, String keyspace, String table) throws MDBCServiceException;
+
/**
* Combine previous musicrangeinformation rows for new partition, if necessary
*
@@ -331,13 +341,26 @@ public interface MusicInterface {
*
* @param currentlyOwned
* @param locksForOwnership
- * @param ownershipId
+ * @param ownershipOpId
+ * @param ownerId
* @return
* @throws MDBCServiceException
*/
- OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, UUID ownershipId)
- throws MDBCServiceException;
-
+ OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership,
+ UUID ownershipOpId, String ownerId) throws MDBCServiceException;
+
+ /**
+ * If this connection is using fewer ranges than what is owned in the current partition, split
+ * the partition to avoid a universal partition being passed around.
+ *
+ * This will follow "most recently used" policy
+ * @param partition2 partition that this transaction currently owns
+ * @param rangesUsed set of ranges that is the minimal required for this transaction
+ * @throws MDBCServiceException
+ */
+ public DatabasePartition splitPartitionIfNecessary(DatabasePartition partition, Set<Range> rangesUsed,
+ String ownerId) throws MDBCServiceException;
+
/**
* Create ranges in MRI table, if not already present
* @param range to add into mri table
@@ -349,8 +372,9 @@ public interface MusicInterface {
* This is an eventual operation for minimal performance hits
* @param r
* @param playbackPointer
+ * @throws MDBCServiceException
*/
- public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer);
+ public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index 5581573..a24ada2 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -51,12 +51,15 @@ import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.lockingservice.cassandra.LockType;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.main.MusicCore;
+import org.onap.music.main.CorePropertiesLoader;
import org.onap.music.main.ResultType;
import org.onap.music.main.ReturnType;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.MdbcConnection;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.TableInfo;
@@ -69,6 +72,7 @@ import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.RangeDependency;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
+import org.onap.music.service.impl.MusicCassaCore;
/**
* This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence
@@ -104,6 +108,8 @@ public class MusicMixin implements MusicInterface {
public static final String KEY_TIMEOUT = "mdbc_timeout";
/** The property name to use to provide a flag indicating if compression is required */
public static final String KEY_COMPRESSION = "mdbc_compression";
+ /** The property name to use to provide a flag indicating if mri row splits is allowable */
+ public static final String KEY_SPLIT = "partition_splitting";
/** Namespace for the tables in MUSIC (Cassandra) */
public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
/** The default property value to use for the Cassandra IP address. */
@@ -197,9 +203,9 @@ public class MusicMixin implements MusicInterface {
private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
private StateManager stateManager;
private boolean useCompression;
+ private boolean splitAllowed;
public MusicMixin() {
-
//this.logger = null;
this.musicAddress = null;
this.music_ns = null;
@@ -209,6 +215,8 @@ public class MusicMixin implements MusicInterface {
}
public MusicMixin(StateManager stateManager, String mdbcServerName, Properties info) throws MDBCServiceException {
+ CorePropertiesLoader.loadProperties(info);
+
// Default values -- should be overridden in the Properties
// Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
@@ -237,6 +245,9 @@ public class MusicMixin implements MusicInterface {
String s = info.getProperty(KEY_MUSIC_RFACTOR);
this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s);
+ String split = info.getProperty(KEY_SPLIT);
+ this.splitAllowed = (split == null) ? true: Boolean.parseBoolean(split);
+
initializeMetricTables();
commitExecutorThreads = Executors.newFixedThreadPool(4);
}
@@ -283,6 +294,8 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException("Error creating namespace: "+keyspace+". Internal error:"+e.getErrorMessage(),
e);
}
+ } catch (MusicQueryException e) {
+ throw new MDBCServiceException(e);
}
}
@@ -1118,22 +1131,19 @@ public class MusicMixin implements MusicInterface {
* Build a preparedQueryObject that appends a transaction to the mriTable
* @param mriTable
* @param uuid
- * @param table
* @param redoUuid
* @return
*/
- private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){
+ private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, UUID redoUuid){
PreparedQueryObject query = new PreparedQueryObject();
StringBuilder appendBuilder = new StringBuilder();
appendBuilder.append("UPDATE ")
.append(music_ns)
.append(".")
.append(mriTable)
- .append(" SET txredolog = txredolog +[('")
- .append(table)
- .append("',")
+ .append(" SET txredolog = txredolog +[")
.append(redoUuid)
- .append(")] WHERE rangeid = ")
+ .append("] WHERE rangeid = ")
.append(uuid)
.append(";");
query.appendQueryString(appendBuilder.toString());
@@ -1198,44 +1208,19 @@ public class MusicMixin implements MusicInterface {
return pendingRows;
}
- private List<Range> lockRow(LockRequest request,Map.Entry<UUID, Set<Range>> pending,Map<UUID, String> currentLockRef,
- String fullyQualifiedKey, String lockId, List<Range> pendingToLock,
- Map<UUID, LockResult> alreadyHeldLocks)
- throws MDBCServiceException{
- List<Range> newRanges = new ArrayList<>();
- String newFullyQualifiedKey = music_ns + "." + musicRangeInformationTableName + "." + pending.getKey().toString();
- String newLockId;
- boolean success;
- if (currentLockRef.containsKey(pending.getKey())) {
- newLockId = currentLockRef.get(pending.getKey());
- success = (MusicCore.whoseTurnIsIt(newFullyQualifiedKey) == newLockId);
- } else {
- newLockId = MusicCore.createLockReference(newFullyQualifiedKey);
- ReturnType newLockReturn = acquireLock(fullyQualifiedKey, lockId);
- success = newLockReturn.getResult().compareTo(ResultType.SUCCESS) == 0;
- }
- if (!success) {
- pendingToLock.addAll(pending.getValue());
- currentLockRef.put(pending.getKey(), newLockId);
- } else {
- if(alreadyHeldLocks.containsKey(pending.getKey())){
- throw new MDBCServiceException("Adding key that already exist");
- }
- alreadyHeldLocks.put(pending.getKey(),new LockResult(pending.getKey(), newLockId, true,
- pending.getValue()));
- newRanges.addAll(pending.getValue());
- }
- return newRanges;
- }
-
private boolean isDifferent(NavigableMap<UUID, List<Range>> previous, NavigableMap<UUID, List<Range>> current){
return previous.keySet().equals(current.keySet());
}
- protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
+ protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition, String ownerId)
+ throws MDBCServiceException {
UUID mriIndex = partition.getMRIIndex();
String lockId;
- lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ try {
+ lockId = MusicCore.createLockReference(fullyQualifiedKey, ownerId);
+ } catch (MusicLockingException e1) {
+ throw new MDBCServiceException(e1);
+ }
if(lockId==null) {
throw new MDBCServiceException("lock reference is null");
}
@@ -1263,15 +1248,15 @@ public class MusicMixin implements MusicInterface {
return lockId;
}
- protected void changeIsLatestToMRI(MusicRangeInformationRow row, boolean isLatest, LockResult lock) throws MDBCServiceException{
+ protected void changeIsLatestToMRI(UUID mrirow, boolean isLatest, String lockref) throws MDBCServiceException{
- if(lock == null)
+ if(lockref == null)
return;
- PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, row.getPartitionIndex(),
+ PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, mrirow,
musicTxDigestTableName, isLatest);
- ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(),
+ ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mrirow.toString(),
appendQuery,
- lock.getLockId()
+ lockref
, null);
if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
logger.error(EELFLoggerDelegate.errorLogger, "Error when executing change isLatest operation with return type: "+returnType.getMessage());
@@ -1289,12 +1274,8 @@ public class MusicMixin implements MusicInterface {
addTxDigest(digestId, serializedTransactionDigest);
}
- /**
- * Writes the transaction information to metric's txDigest and musicRangeInformation table
- * This officially commits the transaction globally
- */
@Override
- public void commitLog(DatabasePartition partition,Set<Range> eventualRanges, StagingTable transactionDigest,
+ public MusicTxDigestId commitLog(DatabasePartition partition,Set<Range> eventualRanges, StagingTable transactionDigest,
String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException {
// first deal with commit for eventually consistent tables
@@ -1302,18 +1283,18 @@ public class MusicMixin implements MusicInterface {
if(partition==null){
logger.warn("Trying tcommit log with null partition");
- return;
+ return null;
}
Set<Range> snapshot = partition.getSnapshot();
if(snapshot==null || snapshot.isEmpty()){
logger.warn("Trying to commit log with empty ranges");
- return;
+ return null;
}
//Add creation type of transaction digest
if(transactionDigest == null || transactionDigest.isEmpty()) {
- return;
+ return null;
}
UUID mriIndex = partition.getMRIIndex();
@@ -1325,7 +1306,7 @@ public class MusicMixin implements MusicInterface {
}
- final MusicTxDigestId digestId = new MusicTxDigestId(MDBCUtils.generateUniqueKey(), -1);
+ final MusicTxDigestId digestId = new MusicTxDigestId(mriIndex, MDBCUtils.generateUniqueKey(), -1);
Callable<Boolean> insertDigestCallable =()-> {
try {
createAndAddTxDigest(transactionDigest,digestId.transactionId);
@@ -1337,8 +1318,7 @@ public class MusicMixin implements MusicInterface {
};
Callable<Boolean> appendCallable=()-> {
try {
- appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicTxDigestTableName,
- musicRangeInformationTableName);
+ appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicRangeInformationTableName);
return true;
} catch (MDBCServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, "Error creating and pushing tx digest to music",e);
@@ -1349,9 +1329,7 @@ public class MusicMixin implements MusicInterface {
Future<Boolean> appendResultFuture = commitExecutorThreads.submit(appendCallable);
Future<Boolean> digestFuture = commitExecutorThreads.submit(insertDigestCallable);
try {
- //Boolean appendResult = appendResultFuture.get();
- Boolean digestResult = digestFuture.get();
- if(/*!appendResult ||*/ !digestResult){
+ if(!appendResultFuture.get() || !digestFuture.get()){
logger.error(EELFLoggerDelegate.errorLogger, "Error appending to log or adding tx digest");
throw new MDBCServiceException("Error appending to log or adding tx digest");
}
@@ -1364,21 +1342,8 @@ public class MusicMixin implements MusicInterface {
if (progressKeeper != null) {
progressKeeper.setRecordId(txId, digestId);
}
- Set<Range> ranges = partition.getSnapshot();
- for(Range r : ranges) {
- Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
- if(!alreadyApplied.containsKey(r)){
- throw new MDBCServiceException("already applied data structure was not updated correctly and range "
- +r+" is not contained");
- }
- Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r);
- MriReference key = rowAndIndex.getKey();
- if(!mriIndex.equals(key.index)){
- throw new MDBCServiceException("already applied data structure was not updated correctly and range "+
- r+" is not pointing to row: "+mriIndex.toString());
- }
- alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1));
- }
+
+ return digestId;
}
private void filterAndAddEventualTxDigest(Set<Range> eventualRanges,
@@ -1477,20 +1442,18 @@ public class MusicMixin implements MusicInterface {
static public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){
UUID partitionIndex = newRow.getUUID("rangeid");
- List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
+ List<UUID> log = newRow.getList("txredolog",UUID.class);
List<MusicTxDigestId> digestIds = new ArrayList<>();
int index=0;
- for(TupleValue t: log){
- //final String tableName = t.getString(0);
- final UUID id = t.getUUID(1);
- digestIds.add(new MusicTxDigestId(partitionIndex,id,index++));
+ for(UUID u: log){
+ digestIds.add(new MusicTxDigestId(partitionIndex,u,index++));
}
Set<Range> partitions = new HashSet<>();
Set<String> tables = newRow.getSet("keys",String.class);
for (String table:tables){
partitions.add(new Range(table));
}
- return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""),
+ return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex),
digestIds, newRow.getBool("islatest"), newRow.getSet("prevmrirows", UUID.class));
}
@@ -1564,7 +1527,7 @@ public class MusicMixin implements MusicInterface {
fields.append("prevmrirows set<uuid>, ");
fields.append("islatest boolean, ");
//TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
- fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
+ fields.append("txredolog list<uuid> ");
String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
namespace, tableName, fields, priKey);
try {
@@ -1577,14 +1540,15 @@ public class MusicMixin implements MusicInterface {
@Override
- public DatabasePartition createLockedMRIRow(MusicRangeInformationRow info) throws MDBCServiceException {
+ public DatabasePartition createLockedMRIRow(MusicRangeInformationRow info, String ownerId)
+ throws MDBCServiceException {
DatabasePartition newPartition = info.getDBPartition();
String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMRIIndex().toString();
String lockId;
int counter=0;
do {
- lockId = createAndAssignLock(fullyQualifiedMriKey, newPartition);
+ lockId = createAndAssignLock(fullyQualifiedMriKey, newPartition, ownerId);
//TODO: fix this retry logic
} while ((lockId ==null||lockId.isEmpty())&&(counter++<3));
if (lockId == null || lockId.isEmpty()) {
@@ -1698,15 +1662,12 @@ public class MusicMixin implements MusicInterface {
@Override
public void appendToRedoLog(UUID MRIIndex, String lockId, MusicTxDigestId newRecord) throws MDBCServiceException {
logger.debug("Appending to redo log for partition " + MRIIndex + " txId=" + newRecord.transactionId);
- appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicTxDigestTableName,
- musicRangeInformationTableName);
+ appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicRangeInformationTableName);
}
- public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId,
- String musicTxDigestTableName, String musicRangeInformationTableName)
+ public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId, String musicRangeInformationTableName)
throws MDBCServiceException{
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex,
- musicTxDigestTableName, transactionId);
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex, transactionId);
ReturnType returnType = MusicCore.criticalPut(musicNamespace, musicRangeInformationTableName, MRIIndex.toString(),
appendQuery, lockId, null);
//returnType.getExecutionInfo()
@@ -1805,13 +1766,9 @@ public class MusicMixin implements MusicInterface {
}
public static void createMusicMdbcCheckpointTable(String namespace, String checkpointTable) throws MDBCServiceException {
- String priKey = "txid";
- StringBuilder fields = new StringBuilder();
- fields.append("txid uuid, ");
- fields.append("compressed boolean, ");
- fields.append("transactiondigest blob ");//notice lack of ','
String cql =
- String.format("CREATE TABLE IF NOT EXISTS %s.%s (mdbcnode UUID, mridigest UUID, digestindex int, PRIMARY KEY (mdbcnode));",
+ String.format("CREATE TABLE IF NOT EXISTS %s.%s (mdbcnode text, range text, mridigest UUID,"
+ + "digestid UUID, PRIMARY KEY (mdbcnode, range));",
namespace, checkpointTable);
try {
executeMusicWriteQuery(namespace,checkpointTable,cql);
@@ -1844,6 +1801,8 @@ public class MusicMixin implements MusicInterface {
} catch (MusicServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for digest id "+digestId.toString()+ "with error "+e.getErrorMessage());
throw new MDBCServiceException("Transaction Digest serialization for digest id "+digestId.toString(), e);
+ } catch (MusicQueryException e) {
+ throw new MDBCServiceException(e);
}
}
@@ -1870,6 +1829,8 @@ public class MusicMixin implements MusicInterface {
} catch (MusicServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.transactionId.toString()+ "with error "+e.getErrorMessage());
throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.transactionId.toString(), e);
+ } catch (MusicQueryException e) {
+ throw new MDBCServiceException(e);
}
}
@@ -2027,7 +1988,7 @@ public class MusicMixin implements MusicInterface {
private void unlockKeyInMusic(String table, String key, String lockref) throws MDBCServiceException {
String fullyQualifiedKey= music_ns+"."+ table+"."+key;
try {
- MusicCore.voluntaryReleaseLock(fullyQualifiedKey,lockref);
+ MusicCassaCore.getInstance().voluntaryReleaseLock(fullyQualifiedKey,lockref);
} catch (MusicLockingException e) {
throw new MDBCServiceException(e.getMessage(), e);
}
@@ -2066,6 +2027,15 @@ public class MusicMixin implements MusicInterface {
}
}
+ @Override
+ public void releaseAllLocksForOwner(String ownerId, String keyspace, String table) throws MDBCServiceException {
+ try {
+ MusicCore.releaseAllLocksForOwner(ownerId, keyspace, table);
+ } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
+ throw new MDBCServiceException(e);
+ }
+ }
+
/**
* Get a list of ranges and their range dependencies
* @param range
@@ -2087,9 +2057,19 @@ public class MusicMixin implements MusicInterface {
@Override
public String createLock(LockRequest request) throws MDBCServiceException{
+ return createLock(request, null);
+ }
+
+ @Override
+ public String createLock(LockRequest request, String ownerId) throws MDBCServiceException{
String fullyQualifiedKey= music_ns+"."+ musicRangeInformationTableName + "." + request.getId();
boolean isWrite = (request.getLockType()==SQLOperationType.WRITE);
- String lockId = MusicCore.createLockReference(fullyQualifiedKey, isWrite);
+ String lockId;
+ try {
+ lockId = MusicCore.createLockReference(fullyQualifiedKey, isWrite?LockType.WRITE:LockType.READ, ownerId);
+ } catch (MusicLockingException e) {
+ throw new MDBCServiceException(e);
+ }
return lockId;
}
@@ -2117,7 +2097,8 @@ public class MusicMixin implements MusicInterface {
* @param locks
* @throws MDBCServiceException
*/
- private void recoverFromFailureAndUpdateDag(Dag latestDag, Map<UUID,LockResult> locks) throws MDBCServiceException {
+ private void recoverFromFailureAndUpdateDag(Dag latestDag, Map<UUID, LockResult> locks, String ownerId)
+ throws MDBCServiceException {
Pair<Set<Range>, Set<DagNode>> rangesAndDependents = latestDag.getIncompleteRangesAndDependents();
if(rangesAndDependents.getKey()==null || rangesAndDependents.getKey().size()==0 ||
rangesAndDependents.getValue()==null || rangesAndDependents.getValue().size() == 0){
@@ -2129,8 +2110,9 @@ public class MusicMixin implements MusicInterface {
prevPartitions.add(dagnode.getId());
}
- MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey(), prevPartitions);
- locks.put(r.getPartitionIndex(),new LockResult(r.getPartitionIndex(),r.getDBPartition().getLockId(),true,rangesAndDependents.getKey()));
+ MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey(), prevPartitions, ownerId);
+ locks.put(r.getPartitionIndex(), new LockResult(true, r.getPartitionIndex(), r.getDBPartition().getLockId(),
+ true, rangesAndDependents.getKey()));
latestDag.addNewNode(r,new ArrayList<>(rangesAndDependents.getValue()));
}
@@ -2140,7 +2122,10 @@ public class MusicMixin implements MusicInterface {
List<MusicRangeInformationRow> returnInfo = new ArrayList<>();
List<DagNode> toDisable = latestDag.getOldestDoubles();
for(DagNode node : toDisable){
- changeIsLatestToMRI(node.getRow(),false,locks.get(node.getId()));
+ LockResult lockToDisable = locks.get(node.getId());
+ if (lockToDisable!=null) {
+ changeIsLatestToMRI(node.getRow().getPartitionIndex(),false,lockToDisable.getLockId());
+ }
latestDag.setIsLatest(node.getId(),false);
returnInfo.add(node.getRow());
}
@@ -2161,13 +2146,14 @@ public class MusicMixin implements MusicInterface {
}
@Override
- public OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, UUID ownershipId) throws MDBCServiceException {
- recoverFromFailureAndUpdateDag(currentlyOwned,locksForOwnership);
+ public OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership,
+ UUID ownershipOpId, String ownerId) throws MDBCServiceException {
+ recoverFromFailureAndUpdateDag(currentlyOwned,locksForOwnership, ownerId);
if (locksForOwnership.keySet().size()==1) {
//reuse if overlapping single partition, no merge necessary
for (UUID uuid: locksForOwnership.keySet()) {
- return new OwnershipReturn(ownershipId, locksForOwnership.get(uuid).getLockId(), uuid,
+ return new OwnershipReturn(ownershipOpId, locksForOwnership.get(uuid).getLockId(), uuid,
currentlyOwned.getNode(uuid).getRangeSet(), currentlyOwned);
}
}
@@ -2178,19 +2164,70 @@ public class MusicMixin implements MusicInterface {
Set<Range> ranges = extractRangesToOwn(currentlyOwned, locksForOwnership.keySet());
- MusicRangeInformationRow createdRow = createAndAssignLock(ranges, locksForOwnership.keySet());
+ MusicRangeInformationRow createdRow = createAndAssignLock(ranges, locksForOwnership.keySet(), ownerId);
currentlyOwned.addNewNodeWithSearch(createdRow, ranges);
changed = setReadOnlyAnyDoubleRow(currentlyOwned, locksForOwnership);
releaseLocks(locksForOwnership);
- return new OwnershipReturn(ownershipId, createdRow.getDBPartition().getLockId(), createdRow.getPartitionIndex(),
+ return new OwnershipReturn(ownershipOpId, createdRow.getDBPartition().getLockId(), createdRow.getPartitionIndex(),
createdRow.getDBPartition().getSnapshot(), currentlyOwned);
}
+
+
+ @Override
+ public DatabasePartition splitPartitionIfNecessary(DatabasePartition partition, Set<Range> rangesUsed,
+ String ownerId) throws MDBCServiceException {
+ if (!this.splitAllowed) {
+ return partition;
+ }
+ Set<Range> rangesOwned = partition.getSnapshot();
+ if (rangesOwned==null || rangesUsed==null) {
+ return partition;
+ }
+ if (!rangesOwned.containsAll(rangesUsed)) {
+ throw new MDBCServiceException("Transaction was unable to acquire all necessary ranges.");
+ }
+
+ if (rangesUsed.containsAll(rangesOwned)) {
+ //using all ranges in this partition
+ return partition;
+ }
+
+ //split partition
+ logger.info(EELFLoggerDelegate.applicationLogger, "Full partition not being used need (" + rangesUsed
+ +") and own (" + rangesOwned + ", splitting the partition");
+ Set<UUID> prevPartitions = new HashSet<>();
+ prevPartitions.add(partition.getMRIIndex());
+ MusicRangeInformationRow usedRow = createAndAssignLock(rangesUsed, prevPartitions, ownerId);
+ rangesOwned.removeAll(rangesUsed);
+ Set<Range> rangesNotUsed = rangesOwned;
+ MusicRangeInformationRow unusedRow = createAndAssignLock(rangesNotUsed, prevPartitions, ownerId);
+
+ changeIsLatestToMRI(partition.getMRIIndex(), false, partition.getLockId());
+
+ /*
+ Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
+ for (Range range: rangesUsed) {
+ alreadyApplied.put(range, Pair.of(new MriReference(usedRow.getPartitionIndex()), -1));
+ }
+ for (Range range: rangesNotUsed) {
+ alreadyApplied.put(range, Pair.of(new MriReference(unusedRow.getPartitionIndex()), -1));
+ }
+ */
- private MusicRangeInformationRow createAndAssignLock(Set<Range> ranges, Set<UUID> prevPartitions) throws MDBCServiceException {
+ //release/update old partition info
+ relinquish(unusedRow.getDBPartition());
+ relinquish(partition);
+
+ return usedRow.getDBPartition();
+ }
+
+
+ private MusicRangeInformationRow createAndAssignLock(Set<Range> ranges, Set<UUID> prevPartitions, String ownerId)
+ throws MDBCServiceException {
UUID newUUID = MDBCUtils.generateTimebasedUniqueKey();
DatabasePartition newPartition = new DatabasePartition(ranges,newUUID,null);
MusicRangeInformationRow row = new MusicRangeInformationRow(newPartition, true, prevPartitions);
- createLockedMRIRow(row);
+ createLockedMRIRow(row, ownerId);
return row;
}
@@ -2269,7 +2306,7 @@ public class MusicMixin implements MusicInterface {
} catch (MDBCServiceException e) {
logger.error("Error relinquishing lock, will use timeout to solve");
}
- partition.setLockId("");
+ partition.setLockId(null);
}
}
@@ -2509,25 +2546,27 @@ public class MusicMixin implements MusicInterface {
}
MusicRangeInformationRow mriRow =
- createAndAssignLock(new HashSet<Range>(Arrays.asList(rangeToCreate)), new HashSet<UUID>());
+ createAndAssignLock(new HashSet<Range>(Arrays.asList(rangeToCreate)), new HashSet<UUID>(), "");
//TODO: should make sure we didn't create 2 new rows simultaneously, while we still own the lock
unlockKeyInMusic(musicRangeInformationTableName, mriRow.getPartitionIndex().toString(),
mriRow.getDBPartition().getLockId());
}
@Override
- public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) {
- String cql = String.format("INSERT INTO %s.%s (mdbcnode, mridigest, digestindex) VALUES ("
- + this.myId + ", " + playbackPointer.getLeft() + ", " + playbackPointer.getRight() + ");",
- music_ns, this.musicMdbcCheckpointsTableName);
+ public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) {
+ String cql = String.format("INSERT INTO %s.%s (mdbcnode, range, mridigest, digestid) VALUES ('%s', '%s', %s, %s);",
+ music_ns, this.musicMdbcCheckpointsTableName, this.stateManager.getMdbcServerName(), r.getTable(),
+ playbackPointer.getLeft().getIndex(), playbackPointer.getRight().transactionId);
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
try {
MusicCore.nonKeyRelatedPut(pQueryObject,"eventual");
} catch (MusicServiceException e) {
logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location", e);
+ } catch (MusicQueryException e) {
+ logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location with query", e);
}
}
-
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index ec91ceb..b544b94 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -35,9 +35,9 @@ import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
-import org.json.JSONTokener;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.Configuration;
@@ -45,7 +45,8 @@ import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.query.SQLOperation;
-import org.onap.music.mdbc.query.SQLOperationType;
+import org.onap.music.mdbc.tables.MriReference;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.Operation;
import org.onap.music.mdbc.tables.StagingTable;
import net.sf.jsqlparser.JSQLParserException;
@@ -87,7 +88,7 @@ public class MySQLMixin implements DBInterface {
+ "CONNECTION_ID INT, PRIMARY KEY (IX));";
private static final String CKPT_TBL = "MDBC_CHECKPOINT";
private static final String CREATE_CKPT_SQL =
- "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTINDEX INT);";
+ "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTID VARCHAR(36));";
private final MusicInterface mi;
private final int connId;
@@ -187,7 +188,7 @@ public class MySQLMixin implements DBInterface {
String dbname = "mdbc"; // default name
try {
Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery("SELECT DATABASE() AS DB");
+ ResultSet rs = stmt.executeQuery("SELECT UPPER(DATABASE()) AS DB");
if (rs.next()) {
dbname = rs.getString("DB");
}
@@ -214,7 +215,7 @@ public class MySQLMixin implements DBInterface {
public Set<String> getSQLTableSet() {
Set<String> set = new TreeSet<String>();
String sql =
- "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+ "SELECT CONCAT(UPPER(TABLE_SCHEMA), '.', UPPER(TABLE_NAME)) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE';";
try {
Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
@@ -234,7 +235,7 @@ public class MySQLMixin implements DBInterface {
public Set<Range> getSQLRangeSet() {
Set<String> set = new TreeSet<String>();
String sql =
- "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+ "SELECT CONCAT(UPPER(TABLE_SCHEMA), '.', UPPER(TABLE_NAME)) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE';";
try {
Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
@@ -249,7 +250,10 @@ public class MySQLMixin implements DBInterface {
logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
Set<Range> rangeSet = new HashSet<>();
for (String table : set) {
- rangeSet.add(new Range(table));
+ if (!getReservedTblNames().contains(table)) {
+ // Don't create triggers for the table the triggers write into!!!
+ rangeSet.add(new Range(table));
+ }
}
return rangeSet;
}
@@ -814,15 +818,31 @@ public class MySQLMixin implements DBInterface {
private ArrayList<String> getMusicKey(String tbl, String cmd, String sql) {
ArrayList<String> musicKeys = new ArrayList<String>();
/*
- * if (cmd.equalsIgnoreCase("insert")) { //create key, return key musicKeys.add(msm.generatePrimaryKey()); }
- * else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) { try {
- * net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); String where; if (stmt instanceof
- * Update) { where = ((Update) stmt).getWhere().toString(); } else if (stmt instanceof Delete) { where =
- * ((Delete) stmt).getWhere().toString(); } else { System.err.println("Unknown type: " +stmt.getClass()); where
- * = ""; } ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where); musicKeys =
- * msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs)); } catch (JSQLParserException e) {
- *
- * e.printStackTrace(); } catch (SQLException e) { //Not a valid sql query e.printStackTrace(); } }
+ if (cmd.equalsIgnoreCase("insert")) {
+ //create key, return key
+ musicKeys.add(msm.generatePrimaryKey());
+ } else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) {
+ try {
+ net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql);
+ String where;
+ if (stmt instanceof Update) {
+ where = ((Update) stmt).getWhere().toString();
+ } else if (stmt instanceof Delete) {
+ where = ((Delete) stmt).getWhere().toString();
+ } else {
+ System.err.println("Unknown type: " +stmt.getClass());
+ where = "";
+ }
+ ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where);
+ musicKeys = msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs));
+ } catch (JSQLParserException e) {
+
+ e.printStackTrace();
+ } catch (SQLException e) {
+ //Not a valid sql query
+ e.printStackTrace();
+ }
+ }
*/
return musicKeys;
}
@@ -877,7 +897,7 @@ public class MySQLMixin implements DBInterface {
* @param transaction - base 64 encoded, serialized digest
* @throws MDBCServiceException
*/
- public void replayTransaction(StagingTable transaction, Set<Range> ranges)
+ public void replayTransaction(StagingTable transaction)
throws SQLException, MDBCServiceException {
boolean autocommit = jdbcConn.getAutoCommit();
jdbcConn.setAutoCommit(false);
@@ -885,7 +905,6 @@ public class MySQLMixin implements DBInterface {
ArrayList<Operation> opList = transaction.getOperationList();
for (Operation op : opList) {
- if (Range.overlaps(ranges, op.getTable())) {
try {
replayOperationIntoDB(jdbcStmt, op);
} catch (SQLException | MDBCServiceException e) {
@@ -895,7 +914,6 @@ public class MySQLMixin implements DBInterface {
jdbcConn.rollback();
throw e;
}
- }
}
clearReplayedOperations(jdbcStmt);
@@ -920,8 +938,8 @@ public class MySQLMixin implements DBInterface {
}
@Override
- public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException {
- replayTransaction(txDigest, ranges);
+ public void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException {
+ replayTransaction(txDigest);
}
/**
@@ -939,17 +957,7 @@ public class MySQLMixin implements DBInterface {
ArrayList<String> cols = new ArrayList<String>();
ArrayList<Object> vals = new ArrayList<Object>();
- Iterator<String> colIterator = jsonOp.keys();
- while (colIterator.hasNext()) {
- String col = colIterator.next();
- // FIXME: should not explicitly refer to cassandramixin
- if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
- // reserved name
- continue;
- }
- cols.add(col);
- vals.add(jsonOp.get(col));
- }
+ constructColValues(jsonOp, cols, vals);
// build and replay the queries
StringBuilder sql = constructSQL(op, cols, vals);
@@ -965,7 +973,11 @@ public class MySQLMixin implements DBInterface {
logger.warn("Error Replaying operation: " + sql.toString()
+ "; Replacing insert/replace/viceversa and replaying ");
- buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);
+ try {
+ buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);
+ } catch (Exception e) {
+ logger.warn(" Error replaying inverse operation; " + sql + "Ignore the exception");
+ }
}
} catch (SQLException sqlE) {
// This applies for replaying transactions involving Eventually Consistent tables
@@ -977,6 +989,20 @@ public class MySQLMixin implements DBInterface {
}
}
+ public void constructColValues(JSONObject jsonOp, ArrayList<String> cols,
+ ArrayList<Object> vals) {
+ Iterator<String> colIterator = jsonOp.keys();
+ while(colIterator.hasNext()) {
+ String col = colIterator.next();
+ //FIXME: should not explicitly refer to cassandramixin
+ if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
+ //reserved name
+ continue;
+ }
+ cols.add(col);
+ vals.add(jsonOp.get(col));
+ }
+ }
protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, ArrayList<String> cols,
ArrayList<Object> vals) throws SQLException, MDBCServiceException {
@@ -999,7 +1025,7 @@ public class MySQLMixin implements DBInterface {
* @throws MDBCServiceException
*/
- protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, ArrayList<Object> vals)
+ public StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, ArrayList<Object> vals)
throws MDBCServiceException {
StringBuilder sqlInverse = null;
switch (op.getOperationType()) {
@@ -1015,7 +1041,7 @@ public class MySQLMixin implements DBInterface {
return sqlInverse;
}
- protected StringBuilder constructSQL(Operation op, ArrayList<String> cols, ArrayList<Object> vals)
+ public StringBuilder constructSQL(Operation op, ArrayList<String> cols, ArrayList<Object> vals)
throws MDBCServiceException {
StringBuilder sql = null;
switch (op.getOperationType()) {
@@ -1059,7 +1085,7 @@ public class MySQLMixin implements DBInterface {
sql.append(") VALUES (");
sep = "";
for (Object val : vals) {
- sql.append(sep + "\"" + val + "\"");
+ sql.append(sep + (val!=JSONObject.NULL?"\"" + val +"\"":"null"));
sep = ", ";
}
sql.append(");");
@@ -1074,7 +1100,7 @@ public class MySQLMixin implements DBInterface {
sql.append(r + " SET ");
sep = "";
for (int i = 0; i < cols.size(); i++) {
- sql.append(sep + cols.get(i) + "=\"" + vals.get(i) + "\"");
+ sql.append(sep + cols.get(i) + (vals.get(i)!=JSONObject.NULL?"=\"" + vals.get(i) +"\"":"=null"));
sep = ", ";
}
sql.append(" WHERE ");
@@ -1095,7 +1121,7 @@ public class MySQLMixin implements DBInterface {
String and = "";
for (String key : primaryKeys.keySet()) {
// We cannot use the default primary key for the sql table and operations
- if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
+ if(!key.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
Object val = primaryKeys.get(key);
keyCondStmt.append(and + key + "=\"" + val + "\"");
and = " AND ";
@@ -1122,12 +1148,12 @@ public class MySQLMixin implements DBInterface {
}
@Override
- public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) {
- String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTINDEX=? where RANGENAME=?;";
+ public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) {
+ String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTID=? where RANGENAME=?;";
try {
PreparedStatement stmt = jdbcConn.prepareStatement(query);
- stmt.setString(1, playbackPointer.getLeft().toString());
- stmt.setInt(2, playbackPointer.getRight());
+ stmt.setString(1, playbackPointer.getLeft().getIndex().toString());
+ stmt.setString(2, playbackPointer.getRight().transactionId.toString());
stmt.setString(3, r.getTable());
stmt.execute();
stmt.close();
@@ -1137,6 +1163,30 @@ public class MySQLMixin implements DBInterface {
}
@Override
+ public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations() {
+ Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = new ConcurrentHashMap<>();
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT * FROM " + CKPT_TBL + ";");
+ while (rs.next()) {
+ Range r = new Range(rs.getString("RANGENAME"));
+ String mrirow = rs.getString("MRIROW");
+ String txId = rs.getString("DIGESTID");
+ if (mrirow!=null) {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Previously checkpointed: " + r.getTable() + " at (" + mrirow + ", " + txId + ")");
+ alreadyApplied.put(r, Pair.of(new MriReference(mrirow), new MusicTxDigestId(mrirow, txId, -1)));
+ }
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Unable to get replay checkpoint location", e);
+ }
+
+ return alreadyApplied;
+ }
+
+ @Override
public void initTables() {
try {
Statement stmt = jdbcConn.createStatement();
@@ -1161,5 +1211,4 @@ public class MySQLMixin implements DBInterface {
logger.error(EELFLoggerDelegate.errorLogger, "initTables: problem creating th mdbc tables!");
}
}
-
-}
+} \ No newline at end of file
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
index 4afaa71..15c7620 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
@@ -33,7 +33,7 @@ import net.sf.jsqlparser.statement.delete.Delete;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.update.Update;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.KeeperException.UnimplementedException;
+//import org.apache.zookeeper.KeeperException.UnimplementedException;
import org.json.JSONObject;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
@@ -42,6 +42,8 @@ import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.mixins.MySQLMixin.StagingTableUpdateRunnable;
+import org.onap.music.mdbc.tables.MriReference;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.Operation;
import org.onap.music.mdbc.query.SQLOperation;
import org.onap.music.mdbc.tables.StagingTable;
@@ -817,7 +819,7 @@ public class PostgresMixin implements DBInterface {
* @param transaction - base 64 encoded, serialized digest
*/
@Override
- public void replayTransaction(StagingTable transaction, Set<Range> ranges)
+ public void replayTransaction(StagingTable transaction)
throws SQLException, MDBCServiceException {
boolean autocommit = jdbcConn.getAutoCommit();
jdbcConn.setAutoCommit(false);
@@ -825,7 +827,6 @@ public class PostgresMixin implements DBInterface {
final ArrayList<Operation> opList = transaction.getOperationList();
for (Operation op : opList) {
- if (Range.overlaps(ranges, op.getTable())) {
try {
replayOperationIntoDB(jdbcStmt, op);
} catch (SQLException | MDBCServiceException e) {
@@ -835,7 +836,6 @@ public class PostgresMixin implements DBInterface {
jdbcConn.rollback();
throw e;
}
- }
}
clearReplayedOperations(jdbcStmt);
@@ -859,8 +859,8 @@ public class PostgresMixin implements DBInterface {
}
@Override
- public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException {
- replayTransaction(txDigest, ranges);
+ public void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException {
+ replayTransaction(txDigest);
}
/**
@@ -1067,7 +1067,12 @@ public class PostgresMixin implements DBInterface {
}
@Override
- public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) {
+ public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) {
+ throw new org.apache.commons.lang.NotImplementedException();
+ }
+
+ @Override
+ public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations() {
throw new org.apache.commons.lang.NotImplementedException();
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
index 9d1685c..142cb34 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
@@ -32,6 +32,7 @@ import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MriRowComparator;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
public class Dag {
@@ -145,7 +146,6 @@ public class Dag {
if(!readyInit){
initApplyDatastructures();
}
- Set<Range> rangesSet = new HashSet<>(ranges);
while(!toApplyNodes.isEmpty()){
DagNode nextNode = toApplyNodes.poll();
List<DagNode> outgoing = nextNode.getOutgoingEdges();
@@ -155,7 +155,7 @@ public class Dag {
toApplyNodes.add(out);
}
}
- if(!nextNode.wasApplied(rangesSet)){
+ if(!nextNode.wasApplied(ranges)){
return nextNode;
}
}
@@ -233,23 +233,23 @@ public class Dag {
return toApplyNodes.isEmpty();
}
- public void setAlreadyApplied(Map<Range, Pair<MriReference,Integer>> alreadyApplied, Set<Range> ranges)
+ public void setAlreadyApplied(Map<Range, Pair<MriReference,MusicTxDigestId>> alreadyApplied, Set<Range> ranges)
throws MDBCServiceException {
- for(Map.Entry<UUID,DagNode> node : nodes.entrySet()){
+ for (DagNode node: nodes.values()) {
Set<Range> intersection = new HashSet<>(ranges);
- intersection.retainAll(node.getValue().getRangeSet());
+ intersection.retainAll(node.getRangeSet());
for(Range r : intersection){
if(alreadyApplied.containsKey(r)){
- final Pair<MriReference, Integer> appliedPair = alreadyApplied.get(r);
+ final Pair<MriReference, MusicTxDigestId> appliedPair = alreadyApplied.get(r);
final MriReference appliedRow = appliedPair.getKey();
- final int index = appliedPair.getValue();
+ final int index = appliedPair.getValue().index;
final long appliedTimestamp = appliedRow.getTimestamp();
- final long nodeTimestamp = node.getValue().getTimestamp();
+ final long nodeTimestamp = node.getTimestamp();
if(appliedTimestamp > nodeTimestamp){
- setReady(node.getValue(),r);
+ setReady(node,r);
}
else if(appliedTimestamp == nodeTimestamp){
- setPartiallyReady(node.getValue(),r,index);
+ setPartiallyReady(node,r,index);
}
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
index 78c68e1..5e4c899 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
@@ -73,6 +74,10 @@ public class DagNode {
return owned;
}
+ /**
+ *
+ * @return the row's MRI Index represented by this dagnode
+ */
public UUID getId(){
return row.getPartitionIndex();
}
@@ -149,20 +154,25 @@ public class DagNode {
currentIndex = currentIndex+1;
}
- public synchronized Pair<MusicTxDigestId, List<Range>> nextNotAppliedTransaction(Set<Range> ranges){
+ /**
+ *
+ * @param ranges
+ * @return the index of the next transaction to replay and the ranges needed for this transaction
+ */
+ public synchronized Pair<MusicTxDigestId, Set<Range>> nextNotAppliedTransaction(Set<Range> ranges){
if(row.getRedoLog().isEmpty()) return null;
if(!applyInit){
initializeApply(ranges);
}
final List<MusicTxDigestId> redoLog = row.getRedoLog();
if(currentIndex < redoLog.size()){
- List<Range> responseRanges= new ArrayList<>();
+ Set<Range> responseRanges= new HashSet<>();
startIndex.forEach((r, index) -> {
if(index < currentIndex){
responseRanges.add(r);
}
});
- return Pair.of(redoLog.get(currentIndex++),responseRanges);
+ return Pair.of(row.getRedoLog().get(currentIndex++),responseRanges);
}
return null;
}
@@ -179,7 +189,7 @@ public class DagNode {
if(row.getRedoLog().isEmpty()) return true;
if(!applyInit){
initializeApply(ranges);
- }
+ }
return currentIndex >= row.getRedoLog().size();
}
@@ -194,11 +204,13 @@ public class DagNode {
if(o == null) return false;
if(!(o instanceof DagNode)) return false;
DagNode other = (DagNode) o;
- return other.row.getPartitionIndex().equals(this.row.getPartitionIndex());
+ return other.row.equals(this.row);
}
@Override
public int hashCode(){
return row.getPartitionIndex().hashCode();
}
+
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index 933e000..0898e5d 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -27,9 +27,11 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.exceptions.MusicDeadlockException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.Utils;
import org.onap.music.mdbc.mixins.DBInterface;
import org.onap.music.mdbc.mixins.LockRequest;
import org.onap.music.mdbc.mixins.LockResult;
@@ -45,8 +47,7 @@ public class OwnershipAndCheckpoint{
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class);
private Lock checkpointLock;
- private AtomicBoolean change;
- private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
+ private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied;
private Map<UUID,Long> ownershipBeginTime;
private long timeoutInMs;
@@ -54,8 +55,7 @@ public class OwnershipAndCheckpoint{
this(new HashMap<>(),Long.MAX_VALUE);
}
- public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, Integer>> alreadyApplied, long timeoutInMs){
- change = new AtomicBoolean(true);
+ public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied, long timeoutInMs){
checkpointLock = new ReentrantLock();
this.alreadyApplied = alreadyApplied;
ownershipBeginTime = new HashMap<>();
@@ -130,20 +130,17 @@ public class OwnershipAndCheckpoint{
* @param di
* @param extendedDag
* @param ranges
- * @param locks
* @param ownOpId
* @throws MDBCServiceException
*/
- public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges,
- Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException {
+ public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges, UUID ownOpId)
+ throws MDBCServiceException {
if(ranges.isEmpty()){
return;
}
try {
checkpointLock.lock();
- change.set(true);
- Set<Range> rangesSet = new HashSet<>(ranges);
- extendedDag.setAlreadyApplied(alreadyApplied, rangesSet);
+ extendedDag.setAlreadyApplied(alreadyApplied, ranges);
applyRequiredChanges(mi, di, extendedDag, ranges, ownOpId);
}
catch(MDBCServiceException e){
@@ -163,18 +160,18 @@ public class OwnershipAndCheckpoint{
}
}
- private void disableForeignKeys(DBInterface di) throws MDBCServiceException {
+ private void disableForeignKeys(DBInterface dbi) throws MDBCServiceException {
try {
- di.disableForeignKeyChecks();
+ dbi.disableForeignKeyChecks();
} catch (SQLException e) {
throw new MDBCServiceException("Error disable foreign keys checks",e);
}
}
- private void applyTxDigest(Set<Range> ranges, DBInterface di, StagingTable txDigest)
+ private void applyTxDigest(DBInterface dbi, StagingTable txDigest)
throws MDBCServiceException {
try {
- di.applyTxDigest(txDigest,ranges);
+ dbi.applyTxDigest(txDigest);
} catch (SQLException e) {
throw new MDBCServiceException("Error applying tx digest in local SQL",e);
}
@@ -191,69 +188,82 @@ public class OwnershipAndCheckpoint{
if(rangesToWarmup.isEmpty()){
return;
}
- boolean ready = false;
- change.set(true);
- Set<Range> rangeSet = new HashSet<Range>(rangesToWarmup);
Dag dag = new Dag(false);
- while(!ready){
- if(change.get()){
- change.set(false);
- final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false);
- dag = Dag.getDag(rows,rangesToWarmup);
- }
- else if(!dag.applied()){
- DagNode node = dag.nextToApply(rangesToWarmup);
- if(node!=null) {
- Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
- while (pair != null) {
+ final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false);
+ dag = Dag.getDag(rows,rangesToWarmup);
+ dag.setAlreadyApplied(alreadyApplied, rangesToWarmup);
+ while(!dag.applied()){
+ DagNode node = dag.nextToApply(rangesToWarmup);
+ if(node!=null) {
+ Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesToWarmup);
+ while (pair != null) {
+ checkpointLock.lock();
+ try {
disableForeignKeys(di);
- checkpointLock.lock();
- if (change.get()) {
- enableForeignKeys(di);
- checkpointLock.unlock();
- break;
- } else {
- applyDigestAndUpdateDataStructures(mi, di, rangesToWarmup, node, pair);
- }
- pair = node.nextNotAppliedTransaction(rangeSet);
+ applyDigestAndUpdateDataStructures(mi, di, node, pair.getLeft(), pair.getRight());
+ pair = node.nextNotAppliedTransaction(rangesToWarmup);
enableForeignKeys(di);
+ } catch (MDBCServiceException e) {
checkpointLock.unlock();
+ throw e;
}
+ checkpointLock.unlock();
}
}
- else{
- ready = true;
- }
}
}
/**
- * Apply tx digest for ranges, update checkpoint location (alreadyApplied)
+ * Apply tx digest for dagnode update checkpoint location (alreadyApplied)
* @param mi
* @param di
- * @param ranges
* @param node
* @param pair
* @throws MDBCServiceException
*/
- private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, Set<Range> ranges, DagNode node,
- Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException {
+ private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface dbi, DagNode node,
+ MusicTxDigestId digestId, Set<Range> ranges) throws MDBCServiceException {
+ if (alreadyReplayed(node, digestId)) {
+ return;
+ }
+
final StagingTable txDigest;
try {
- txDigest = mi.getTxDigest(pair.getKey());
+ txDigest = mi.getTxDigest(digestId);
} catch (MDBCServiceException e) {
logger.warn("Transaction digest was not found, this could be caused by a failure of the previous owner"
+"And would normally only happen as the last ID of the corresponding redo log. Please check that this is the"
- +" case for txID "+pair.getKey().transactionId.toString());
+ +" case for txID "+digestId.transactionId.toString());
return;
}
- applyTxDigest(ranges,di, txDigest);
- for (Range r : pair.getValue()) {
- MusicRangeInformationRow row = node.getRow();
- alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
-
- updateCheckpointLocations(mi, di, r, row.getPartitionIndex(), pair.getKey().index);
+ applyTxDigest(dbi, txDigest);
+ MusicRangeInformationRow row = node.getRow();
+ updateAlreadyApplied(mi, dbi, ranges, row.getPartitionIndex(), digestId);
+ }
+
+ /**
+ * Determine if this musictxdigest id has already been replayed
+ * @param node
+ * @param redoLogIndex
+ * @return true if alreadyApplied is past this node/redolog, false if it hasn't been replayed
+ */
+ public boolean alreadyReplayed(DagNode node, MusicTxDigestId txdigest) {
+ int index = node.getRow().getRedoLog().indexOf(txdigest);
+ for (Range range: node.getRangeSet()) {
+ Pair<MriReference, MusicTxDigestId> applied = alreadyApplied.get(range);
+ if (applied==null) {
+ return false;
+ }
+ MriReference appliedMriRef = applied.getLeft();
+ MusicTxDigestId appliedDigest = applied.getRight();
+ appliedDigest.index = node.getRow().getRedoLog().indexOf(appliedDigest);
+ if (appliedMriRef==null || appliedMriRef.getTimestamp() < node.getTimestamp()
+ || (appliedMriRef.getTimestamp() == node.getTimestamp()
+ && appliedDigest.index < index)) {
+ return false;
+ }
}
+ return true;
}
/**
@@ -261,12 +271,13 @@ public class OwnershipAndCheckpoint{
* @param mi
* @param di
* @param r
- * @param partitionIndex
+ * @param mriRef
* @param index
+ * @throws MDBCServiceException
*/
- private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, int index) {
- dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
- mi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
+ private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, MriReference mriRef, MusicTxDigestId txdigest) {
+ dbi.updateCheckpointLocations(r, Pair.of(mriRef, txdigest));
+ mi.updateCheckpointLocations(r, Pair.of(mriRef, txdigest));
}
/**
@@ -280,15 +291,14 @@ public class OwnershipAndCheckpoint{
*/
private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, Set<Range> ranges, UUID ownOpId)
throws MDBCServiceException {
- Set<Range> rangeSet = new HashSet<Range>(ranges);
disableForeignKeys(db);
while(!extendedDag.applied()){
DagNode node = extendedDag.nextToApply(ranges);
if(node!=null) {
- Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
+ Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(ranges);
while (pair != null) {
- applyDigestAndUpdateDataStructures(mi, db, ranges, node, pair);
- pair = node.nextNotAppliedTransaction(rangeSet);
+ applyDigestAndUpdateDataStructures(mi, db, node, pair.getLeft(), pair.getRight());
+ pair = node.nextNotAppliedTransaction(ranges);
if (timeout(ownOpId)) {
enableForeignKeys(db);
throw new MDBCServiceException("Timeout apply changes to local dbi");
@@ -311,7 +321,11 @@ public class OwnershipAndCheckpoint{
*/
public OwnershipReturn own(MusicInterface mi, Set<Range> ranges,
DatabasePartition currPartition, UUID opId, SQLOperationType lockType) throws MDBCServiceException {
-
+ return own(mi, ranges, currPartition, opId, lockType, null);
+ }
+
+ public OwnershipReturn own(MusicInterface mi, Set<Range> ranges,
+ DatabasePartition currPartition, UUID opId, SQLOperationType lockType, String ownerId) throws MDBCServiceException {
if (ranges == null || ranges.isEmpty()) {
return null;
}
@@ -331,7 +345,18 @@ public class OwnershipAndCheckpoint{
while ( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) &&
!timeout(opId)
) {
- takeOwnershipOfDag(mi, currPartition, opId, locksForOwnership, toOwn, lockType);
+ try {
+ takeOwnershipOfDag(mi, currPartition, opId, locksForOwnership, toOwn, lockType, ownerId);
+ } catch (MDBCServiceException e) {
+ MusicDeadlockException de = Utils.getDeadlockException(e);
+ if (de!=null) {
+ locksForOwnership.remove(currPartition.getMRIIndex());
+ mi.releaseLocks(locksForOwnership);
+ stopOwnershipTimeoutClock(opId);
+ logger.error("Error when owning a range: Deadlock detected");
+ }
+ throw e;
+ }
currentlyOwn=toOwn;
//TODO instead of comparing dags, compare rows
rangesToOwnRows = extractRowsForRange(mi, rangesToOwn, false);
@@ -347,9 +372,9 @@ public class OwnershipAndCheckpoint{
}
Set<Range> allRanges = currentlyOwn.getAllRanges();
//TODO: we shouldn't need to go back to music at this point
- List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, new HashSet<>(allRanges), true);
+ List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, allRanges, true);
currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,latestRows));
- return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId);
+ return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId, ownerId);
}
/**
@@ -362,7 +387,8 @@ public class OwnershipAndCheckpoint{
* @throws MDBCServiceException
*/
private void takeOwnershipOfDag(MusicInterface mi, DatabasePartition partition, UUID opId,
- Map<UUID, LockResult> ownershipLocks, Dag toOwn, SQLOperationType lockType) throws MDBCServiceException {
+ Map<UUID, LockResult> ownershipLocks, Dag toOwn, SQLOperationType lockType, String ownerId)
+ throws MDBCServiceException {
while(toOwn.hasNextToOwn()){
DagNode node = toOwn.nextToOwn();
@@ -374,10 +400,16 @@ public class OwnershipAndCheckpoint{
false, partition.getSnapshot()));
} else if ( ownershipLocks.containsKey(uuidToOwn) || !row.getIsLatest() ) {
toOwn.setOwn(node);
+ if (ownershipLocks.containsKey(uuidToOwn) && !row.getIsLatest()) {
+ //previously owned partition that is no longer latest, don't need anymore
+ LockResult result = ownershipLocks.get(uuidToOwn);
+ ownershipLocks.remove(uuidToOwn);
+ mi.relinquish(result.getLockId(), uuidToOwn.toString());
+ }
} else {
LockRequest request = new LockRequest(uuidToOwn,
new ArrayList<>(node.getRangeSet()), lockType);
- String lockId = mi.createLock(request);
+ String lockId = mi.createLock(request, ownerId);
LockResult result = null;
boolean owned = false;
while(!owned && !timeout(opId)){
@@ -457,15 +489,6 @@ public class OwnershipAndCheckpoint{
}
-
- public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
- Set<Range> snapshot = partition.getSnapshot();
- UUID row = partition.getMRIIndex();
- for(Range r : snapshot){
- alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
- }
- }
-
// \TODO merge with dag code
private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException {
Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>();
@@ -490,8 +513,20 @@ public class OwnershipAndCheckpoint{
}
- public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() {
+ public Map<Range, Pair<MriReference, MusicTxDigestId>> getAlreadyApplied() {
return this.alreadyApplied;
- }
+ }
+ public void updateAlreadyApplied(MusicInterface mi, DBInterface dbi, Set<Range> ranges, UUID mriIndex, MusicTxDigestId digestId) {
+ for (Range r: ranges) {
+ updateAlreadyApplied(mi, dbi, r, mriIndex, digestId);
+ }
+ }
+
+ public void updateAlreadyApplied(MusicInterface mi, DBInterface dbi, Range r, UUID mriIndex, MusicTxDigestId digestId) {
+ MriReference mriRef = new MriReference(mriIndex);
+ alreadyApplied.put(r, Pair.of(mriRef, digestId));
+ updateCheckpointLocations(mi, dbi, r, mriRef, digestId);
+ }
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
index 6d6c661..27ea6ea 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
@@ -30,6 +30,7 @@ import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlDelete;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlJoin;
@@ -120,12 +121,15 @@ public class QueryProcessor {
case UPDATE:
parseUpdate((SqlUpdate) sqlNode, tableOpsMap);
break;
+ case DELETE:
+ parseDelete((SqlDelete) sqlNode, tableOpsMap);
+ break;
case SELECT:
parseSelect((SqlSelect) sqlNode, tableOpsMap);
break;
case ORDER_BY:
parseSelect((SqlSelect)((SqlOrderBy) sqlNode).query, tableOpsMap);
- break;
+ break;
default:
logger.error("Unhandled sql query type " + sqlNode.getKind() +" for query " + query);
}
@@ -144,7 +148,7 @@ public class QueryProcessor {
Ops.add(SQLOperation.INSERT);
tableOpsMap.put(tableName, Ops);
}
-
+
private static void parseUpdate(SqlUpdate sqlUpdate, Map<String, List<SQLOperation>> tableOpsMap) {
SqlNode targetTable = sqlUpdate.getTargetTable();
switch (targetTable.getKind()) {
@@ -155,7 +159,18 @@ public class QueryProcessor {
logger.error("Unable to process: " + targetTable.getKind() + " query");
}
}
-
+
+ private static void parseDelete(SqlDelete sqlDelete, Map<String, List<SQLOperation>> tableOpsMap) {
+ SqlNode targetTable = sqlDelete.getTargetTable();
+ switch (targetTable.getKind()) {
+ case IDENTIFIER:
+ addIdentifierToMap(tableOpsMap, (SqlIdentifier) targetTable, SQLOperation.DELETE);
+ break;
+ default:
+ logger.error("Unable to process: " + targetTable.getKind() + " query");
+ }
+ }
+
private static void parseSelect(SqlSelect sqlSelect, Map<String, List<SQLOperation>> tableOpsMap ) {
SqlNode from = sqlSelect.getFrom();
switch (from.getKind()) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
index 8aad335..3c15487 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
@@ -28,6 +28,19 @@ public final class MriReference {
this.index= index;
}
- public long getTimestamp() { return index.timestamp();}
+ public MriReference(String mrirow) {
+ index = UUID.fromString(mrirow);
+ }
+ public long getTimestamp() {
+ return index.timestamp();
+ }
+
+ public UUID getIndex() {
+ return this.index;
+ }
+
+ public String toString() {
+ return index.toString();
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
index de711ef..8c95047 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
@@ -96,7 +96,7 @@ public final class MusicRangeInformationRow implements Comparable<MusicRangeInfo
if(o == null) return false;
if(!(o instanceof MusicRangeInformationRow)) return false;
MusicRangeInformationRow other = (MusicRangeInformationRow) o;
- return other.getPartitionIndex().equals(this.getPartitionIndex());
+ return other.getPartitionIndex().equals(this.getPartitionIndex()) && other.getRedoLog().equals(this.getRedoLog());
}
@Override
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
index 6f95d3c..a1ef346 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
@@ -60,8 +60,7 @@ public class MusicTxDigestDaemon implements Runnable {
for (UUID txTimeID : keys) {
transaction = ecDigestInformation.get(txTimeID);
try {
- dbi.replayTransaction(transaction,
- ranges); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
+ dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
} catch (SQLException e) {
logger.error("EC:Rolling back the entire digest replay.");
return;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
index db9e455..59eb97e 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
@@ -24,13 +24,19 @@ import java.util.UUID;
public final class MusicTxDigestId {
public final UUID mriId;
public final UUID transactionId;
- public final int index;
+ public int index;
public MusicTxDigestId(UUID mriRowId, UUID digestId, int index) {
this.mriId=mriRowId;
this.transactionId= digestId;
this.index=index;
}
+
+ public MusicTxDigestId(String mriRowId, String digestId, int index) {
+ this.mriId = UUID.fromString(mriRowId);
+ this.transactionId = UUID.fromString(digestId);
+ this.index = index;
+ }
public MusicTxDigestId(UUID digestId, int index) {
this.mriId = null;
@@ -55,4 +61,8 @@ public final class MusicTxDigestId {
public int hashCode(){
return transactionId.hashCode();
}
+
+ public String toString() {
+ return this.transactionId.toString();
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/TxDigestDecompression.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/TxDigestDecompression.java
index 0b422fa..6b7b7be 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/TxDigestDecompression.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/TxDigestDecompression.java
@@ -21,13 +21,15 @@
package org.onap.music.mdbc.tools;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
-import org.onap.music.mdbc.StateManager;
+import org.onap.music.mdbc.mixins.MusicInterface;
import org.onap.music.mdbc.mixins.MusicMixin;
+import org.onap.music.mdbc.mixins.MySQLMixin;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.Operation;
@@ -42,6 +44,12 @@ import org.onap.music.mdbc.tables.StagingTable;
public class TxDigestDecompression {
public static final EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TxDigestDecompression.class);
MusicMixin mi;
+ MySQLMixin ms;
+
+ public TxDigestDecompression(MusicInterface _mi) {
+ mi = (MusicMixin) _mi;
+ ms = new MySQLMixin();
+ }
public TxDigestDecompression() {
Properties prop = new Properties();
@@ -52,6 +60,7 @@ public class TxDigestDecompression {
}
try {
mi = new MusicMixin(null, "mdbcservername", prop);
+ ms = new MySQLMixin();
} catch (MDBCServiceException e) {
e.printStackTrace();
return;
@@ -64,16 +73,7 @@ public class TxDigestDecompression {
List<MusicRangeInformationRow> rows = mi.getAllMriRows();
for (MusicRangeInformationRow row: rows) {
UUID mriId = row.getPartitionIndex();
- for (MusicTxDigestId id: row.getRedoLog()) {
- StagingTable st = mi.getTxDigest(id);
- System.out.print(id.transactionId + ": [");
- String sep = "";
- for (Operation op: st.getOperationList()) {
- System.out.print(sep + op.getOperationType() + "-" + op.getTable() + "->" + op.getVal());
- sep =", ";
- }
- System.out.println("]");
- }
+ extractedRedoLog(row);
}
} catch (MDBCServiceException e) {
e.printStackTrace();
@@ -81,6 +81,25 @@ public class TxDigestDecompression {
}
System.exit(0);
}
+
+ public void extractedRedoLog(MusicRangeInformationRow row) throws MDBCServiceException {
+ for (MusicTxDigestId id: row.getRedoLog()) {
+ StagingTable st = mi.getTxDigest(id);
+ System.out.print(id.transactionId + ": [");
+ String sep = ", ";
+ for (Operation op: st.getOperationList()) {
+
+ ArrayList<String> cols = new ArrayList<String>();
+ ArrayList<Object> vals = new ArrayList<Object>();
+ ms.constructColValues(op.getVal(), cols, vals);
+ StringBuilder sql = ms.constructSQL(op, cols, vals);
+
+ System.out.print(sql + sep);
+
+ }
+ System.out.println("]");
+ }
+ }
public static void main(String[] args) {
TxDigestDecompression txDecompress = new TxDigestDecompression();
diff --git a/mdbc-server/src/main/resources/key.properties b/mdbc-server/src/main/resources/key.properties
new file mode 100644
index 0000000..bd5d472
--- /dev/null
+++ b/mdbc-server/src/main/resources/key.properties
@@ -0,0 +1 @@
+cipher.enc.key=AAECAwQFBgcICQoLDA0ODw==
diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties
index 49fdfd2..60adfae 100755
--- a/mdbc-server/src/main/resources/mdbc.properties
+++ b/mdbc-server/src/main/resources/mdbc.properties
@@ -15,4 +15,10 @@ DEFAULT_DRIVERS=\
org.mariadb.jdbc.Driver \
org.postgresql.Driver
-txdaemonsleeps=15
+# whether or not to split the partitions
+partition_splitting=true
+
+write_locks_only=true
+
+#time, in seconds, between when the daemon catches up
+txdaemonsleeps=15 \ No newline at end of file
diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties
index 1aaf7fd..23908ad 100755
--- a/mdbc-server/src/main/resources/music.properties
+++ b/mdbc-server/src/main/resources/music.properties
@@ -3,6 +3,7 @@ cassandra.host =\
cassandra.user =\
cassandra
cassandra.password =\
- cassandra
+ OB06GaQG8BJOts8diB1jXS+LZrNUkplCt1XW5XwMAes=
+# password "cassandra" encrypted with trivial key found in key.properties
music_namespace =\
mdbc_namespace
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java
index 72ec8d3..626b6ca 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java
@@ -40,9 +40,11 @@ import org.junit.rules.TemporaryFolder;
import org.onap.music.datastore.MusicDataStore;
import org.onap.music.datastore.MusicDataStoreHandle;
import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.lockingservice.cassandra.CassaLockStore;
import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.mixins.PostgresMixin;
+import org.powermock.reflect.Whitebox;
public class MdbcTestUtils {
@@ -68,7 +70,7 @@ public class MdbcTestUtils {
final private static String nodeInfoTableName = "nodeinfo";
//Mariadb variables
static DB db=null;
- final public static String mariaDBDatabaseName="test";
+ final public static String mariaDBDatabaseName="TEST";
final static Integer mariaDbPort=13306;
@@ -197,13 +199,14 @@ public class MdbcTestUtils {
static void stopMySql(){
try {
db.stop();
+ db=null;
} catch (ManagedProcessException e) {
e.printStackTrace();
fail("Error closing mysql");
}
}
- public static void cleanDatabase(DBType type){
+ public static void stopDatabase(DBType type){
switch(type) {
case MySQL:
stopMySql();
@@ -216,7 +219,7 @@ public class MdbcTestUtils {
}
}
- public static void initCassandra(){
+ public static void initCassandra() throws MDBCServiceException {
try {
EmbeddedCassandraServerHelper.startEmbeddedCassandra(EmbeddedCassandraServerHelper.CASSANDRA_RNDPORT_YML_FILE);
} catch (Exception e) {
@@ -230,8 +233,11 @@ public class MdbcTestUtils {
session = EmbeddedCassandraServerHelper.getSession();
assertNotNull("Invalid configuration for cassandra", session);
- MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session);
- CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle);
+
+ MusicDataStore mds = new MusicDataStore(cluster, session);
+ Whitebox.setInternalState(MusicDataStoreHandle.class, "mDstoreHandle", mds);
+ CassaLockStore store = new CassaLockStore(mds);
+
assertNotNull("Invalid configuration for music", store);
}
@@ -249,7 +255,6 @@ public class MdbcTestUtils {
public static MusicMixin getMusicMixin() throws MDBCServiceException {
initNamespaces();
- initTables();
MusicMixin mixin=null;
try {
Properties properties = new Properties();
@@ -268,13 +273,4 @@ public class MdbcTestUtils {
MusicMixin.createKeyspace("music_internal",1);
MusicMixin.createKeyspace(keyspace,1);
}
-
- public static void initTables() throws MDBCServiceException{
- MusicMixin.createMusicRangeInformationTable(keyspace, mriTableName);
- MusicMixin.createMusicTxDigest(mtdTableName,keyspace, -1);
- MusicMixin.createMusicEventualTxDigest(eventualMtxdTableName,keyspace, -1);
- MusicMixin.createMusicNodeInfoTable(nodeInfoTableName,keyspace,-1);
- MusicMixin.createMusicRangeDependencyTable(keyspace,rangeDependencyTableName);
- }
-
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
index ef26cb6..bf27ea8 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
@@ -67,6 +67,8 @@ import org.onap.music.mdbc.query.SQLOperationType;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
+import org.onap.music.service.impl.MusicCassaCore;
+
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -77,6 +79,7 @@ import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.DagNode;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+@Ignore
public class MusicMixinTest {
@@ -107,7 +110,7 @@ public class MusicMixinTest {
public void initTest() throws MDBCServiceException {
session = MdbcTestUtils.getSession();
session.execute("DROP KEYSPACE IF EXISTS "+ MdbcTestUtils.getKeyspace());
- mixin=MdbcTestUtils.getMusicMixin();
+ mixin = MdbcTestUtils.getMusicMixin();
}
//@Test(timeout=10000)
@@ -141,17 +144,17 @@ public class MusicMixinTest {
private DatabasePartition addRow(Set<Range> ranges,boolean isLatest){
final UUID uuid = MDBCUtils.generateTimebasedUniqueKey();
- DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null);
+ DatabasePartition dbPartition = new DatabasePartition(ranges,uuid);
MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), isLatest);
DatabasePartition partition=null;
try {
- partition = mixin.createLockedMRIRow(newRow);
+ partition = mixin.createLockedMRIRow(newRow, "");
} catch (MDBCServiceException e) {
fail("failure when creating new row");
}
String fullyQualifiedMriKey = MdbcTestUtils.getKeyspace()+"."+ MdbcTestUtils.getMriTableName()+"."+partition.getMRIIndex().toString();
try {
- MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId());
+ MusicLockState musicLockState = MusicCassaCore.getInstance().voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId());
} catch (MusicLockingException e) {
fail("failure when releasing lock");
}
@@ -253,21 +256,7 @@ public class MusicMixinTest {
mixin.addEventualTxDigest(digestId, compressed);
LinkedHashMap<UUID, StagingTable> digest = mixin.getEveTxDigest("n1");
-
- Consumer<Map.Entry<UUID,StagingTable>> consumer = new Consumer<Map.Entry<UUID,StagingTable>>() {
-
- @Override
- public void accept(Entry<UUID, StagingTable> mapEntry) {
- assertNotNull(mapEntry.getValue());
- }
-
- };
-
- digest.entrySet().forEach(consumer);
-
-
-
-
+ digest.entrySet().forEach(e -> assertNotNull(e.getValue()));
}
protected ByteBuffer mockCompressedProtoByteBuff() throws MDBCServiceException, InvalidProtocolBufferException {
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MySQLMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MySQLMixinTest.java
index 1f2c1dd..cf23305 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MySQLMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MySQLMixinTest.java
@@ -20,16 +20,25 @@
package org.onap.music.mdbc.mixins;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
+import java.sql.Statement;
+import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.MdbcTestUtils;
import org.onap.music.mdbc.MdbcTestUtils.DBType;
+import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.mixins.MySQLMixin;
-
+import org.onap.music.mdbc.tables.MriReference;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
import ch.vorburger.mariadb4j.DB;
public class MySQLMixinTest {
@@ -60,7 +69,7 @@ public class MySQLMixinTest {
@AfterClass
public static void close() throws Exception {
-
+ MdbcTestUtils.stopDatabase(DBType.MySQL);
}
@Before
@@ -70,9 +79,81 @@ public class MySQLMixinTest {
this.mysqlMixin = new MySQLMixin(null, null, conn, info);
}
- @Test
- public void testGetDataBaseName() throws SQLException {
- Assert.assertEquals(MdbcTestUtils.getMariaDBDBName(), mysqlMixin.getDatabaseName());
- }
+
+ @After
+ public void afterEachTest() throws SQLException {
+ clearTables();
+ }
+
+ @Test
+ public void testGetDataBaseName() throws SQLException {
+ assertEquals(MdbcTestUtils.getMariaDBDBName(), mysqlMixin.getDatabaseName());
+ }
+
+ @Test
+ public void testGetTableSet() throws SQLException {
+ Set<Range> rangesAdded = new HashSet<>();
+ rangesAdded.add(new Range("TEST.RANGER"));
+ rangesAdded.add(new Range("TEST.RANGES"));
+ Statement st = conn.createStatement();
+ for (Range r: rangesAdded) {
+ st.execute("CREATE TABLE " + r + " (name VARCHAR(20));");
+ }
+ st.close();
+ Set<Range> ranges = mysqlMixin.getSQLRangeSet();
+
+ assertTrue(ranges.containsAll(rangesAdded));
+ assertTrue(rangesAdded.containsAll(ranges));
+ }
+
+ @Test
+ public void testCkpt() throws SQLException {
+ createTables();
+
+ Range r1 = new Range(MdbcTestUtils.mariaDBDatabaseName + ".RANGER");
+ MriReference mri1 = new MriReference(MDBCUtils.generateUniqueKey());
+ MusicTxDigestId i1 = new MusicTxDigestId(mri1.index, MDBCUtils.generateUniqueKey(), 1);
+ Pair<MriReference, MusicTxDigestId> digestId = Pair.of(mri1, i1);
+ mysqlMixin.updateCheckpointLocations(r1, digestId);
+
+ Range r2 = new Range(MdbcTestUtils.mariaDBDatabaseName + ".RANGES");
+ MriReference mri2 = new MriReference(MDBCUtils.generateUniqueKey());
+ MusicTxDigestId i2 = new MusicTxDigestId(mri2.index, MDBCUtils.generateUniqueKey(), 2);
+ Pair<MriReference, MusicTxDigestId> p2 = Pair.of(mri2, i2);
+ mysqlMixin.updateCheckpointLocations(r2, p2);
+
+ Map<Range, Pair<MriReference, MusicTxDigestId>> ckptmap = mysqlMixin.getCheckpointLocations();
+ assertTrue(ckptmap.containsKey(r1));
+ assertEquals(mri1.getIndex(), ckptmap.get(r1).getLeft().getIndex());
+ assertEquals(i1.transactionId, ckptmap.get(r1).getRight().transactionId);
+
+ assertTrue(ckptmap.containsKey(r2));
+ assertEquals(mri2.getIndex(), ckptmap.get(r2).getLeft().getIndex());
+ assertEquals(i2.transactionId, ckptmap.get(r2).getRight().transactionId);
+ }
+
+ private void createTables() throws SQLException {
+ Statement st = conn.createStatement();
+ st.execute("CREATE TABLE RANGER (name VARCHAR(20));");
+ st.execute("CREATE TABLE RANGES (name VARCHAR(20));");
+ st.close();
+ //need to re-initiate the tables
+ this.mysqlMixin.initTables();
+ }
+
+
+ private void clearTables() throws SQLException {
+ Set<Range> ranges = mysqlMixin.getSQLRangeSet();
+ Statement st = conn.createStatement();
+ for (Range r: ranges) {
+ try {
+ st.execute("DROP TABLE " + r + ";");
+ } catch (SQLException e) {
+ System.out.println("Trouble dropping: " + r);
+ e.printStackTrace();
+ }
+ }
+ st.close();
+ }
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
index a1cf2b1..1ee8de7 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
@@ -30,6 +30,7 @@ import java.util.Properties;
import java.util.Set;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.MdbcTestUtils;
@@ -38,6 +39,7 @@ import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.tables.StagingTable;
+@Ignore
public class PostgresMixinTest {
final private static String keyspace="metricmusictest";
final private static String mdbcServerName = "name";
@@ -65,7 +67,7 @@ public class PostgresMixinTest {
public static void close(){
//TODO: shutdown cassandra
mixin=null;
- MdbcTestUtils.cleanDatabase(DBType.POSTGRES);
+ MdbcTestUtils.stopDatabase(DBType.POSTGRES);
MdbcTestUtils.stopCassandra();
}
@@ -211,7 +213,7 @@ public class PostgresMixinTest {
Set<Range> ranges = new HashSet<>();
ranges.add(new Range("public.testtable"));
try {
- mixin.applyTxDigest(st,ranges);
+ mixin.applyTxDigest(st);
} catch (SQLException|MDBCServiceException e) {
e.printStackTrace();
fail();
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
index ee50dca..afe378e 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
@@ -170,7 +170,7 @@ public class DagTest {
HashSet<Range> rangesSet = new HashSet<>(ranges);
while(!dag.applied()){
DagNode node = dag.nextToApply(ranges);
- Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
+ Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
int transactionCounter = 0;
while(pair!=null) {
assertNotEquals(1,transactionCounter);
@@ -178,9 +178,10 @@ public class DagTest {
MusicTxDigestId id = row.getRedoLog().get(transactionCounter);
assertEquals(id,pair.getKey());
assertEquals(0,pair.getKey().index);
- List<Range> value = pair.getValue();
+ Set<Range> value = pair.getValue();
assertEquals(1,value.size());
- assertEquals(new Range("schema.range1"),value.get(0));
+ assertTrue(value.contains(new Range("schema.range1")));
+ //assertEquals(new Range("schema.range1"),value.get(0));
pair = node.nextNotAppliedTransaction(rangesSet);
transactionCounter++;
}
@@ -192,7 +193,7 @@ public class DagTest {
@Test
public void nextToApply2() throws InterruptedException, MDBCServiceException {
- Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
+ Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = new HashMap<>();
List<MusicRangeInformationRow> rows = new ArrayList<>();
Set<Range> ranges = new HashSet<>( Arrays.asList(
new Range("schema.range1")
@@ -207,7 +208,7 @@ public class DagTest {
new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1)
));
MusicRangeInformationRow newRow = createNewRow(new HashSet<>(ranges), "", false, redo2);
- alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
+ alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), redo1.get(0)));
rows.add(newRow);
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
@@ -220,7 +221,7 @@ public class DagTest {
int nodeCounter = 1;
while(!dag.applied()){
DagNode node = dag.nextToApply(ranges);
- Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
+ Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
int transactionCounter = 0;
while(pair!=null) {
assertNotEquals(1,transactionCounter);
@@ -228,9 +229,10 @@ public class DagTest {
MusicTxDigestId id = row.getRedoLog().get(2-nodeCounter);
assertEquals(id,pair.getKey());
assertEquals(2-nodeCounter,pair.getKey().index);
- List<Range> value = pair.getValue();
+ Set<Range> value = pair.getValue();
assertEquals(1,value.size());
- assertEquals(new Range("schema.range1"),value.get(0));
+ assertTrue(value.contains(new Range("schema.range1")));
+ //assertEquals(new Range("schema.range1"),value.get(0));
pair = node.nextNotAppliedTransaction(rangesSet);
transactionCounter++;
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
index 2443d1e..c0e7c50 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
@@ -34,6 +34,7 @@ import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -60,6 +61,7 @@ import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
+@Ignore
public class OwnershipAndCheckpointTest {
public static final String DATABASE = MdbcTestUtils.mariaDBDatabaseName;
public static final String TABLE= MdbcTestUtils.mariaDBDatabaseName+".PERSONS";
@@ -83,7 +85,7 @@ public class OwnershipAndCheckpointTest {
@BeforeClass
- public static void init() throws MusicServiceException, ClassNotFoundException, ManagedProcessException {
+ public static void init() throws MusicServiceException, ClassNotFoundException, ManagedProcessException, MDBCServiceException {
MdbcTestUtils.initCassandra();
Class.forName("org.mariadb.jdbc.Driver");
//start embedded mariadb
@@ -94,7 +96,7 @@ public class OwnershipAndCheckpointTest {
public static void close() throws MusicServiceException, MusicQueryException, ManagedProcessException {
//TODO: shutdown cassandra
musicMixin=null;
- MdbcTestUtils.cleanDatabase(DBType.MySQL);
+ MdbcTestUtils.stopDatabase(DBType.MySQL);
MdbcTestUtils.stopCassandra();
}
@@ -151,7 +153,6 @@ public class OwnershipAndCheckpointTest {
String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+
"(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');";
StagingTable stagingTable = new StagingTable();
- ownAndCheck.reloadAlreadyApplied(partition);
final Statement executeStatement = this.conn.createStatement();
executeStatement.execute(sqlOperation);
this.conn.commit();
@@ -222,9 +223,9 @@ public class OwnershipAndCheckpointTest {
Map<MusicRangeInformationRow, LockResult> locks = new HashMap<>();
if(own.getDag()!=null) {
locks.put(own.getDag().getNode(own.getRangeId()).getRow(),
- new LockResult(own.getRangeId(), own.getOwnerId(), true,
+ new LockResult(own.getRangeId(), own.getLockId(), true,
ranges));
- ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, locks, ownOpId);
+ ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, ownOpId);
}
checkData();
@@ -247,7 +248,7 @@ public class OwnershipAndCheckpointTest {
Map<MusicRangeInformationRow, LockResult> locks = new HashMap<>();
if(own.getDag()!=null) {
locks.put(own.getDag().getNode(own.getRangeId()).getRow(),
- new LockResult(own.getRangeId(), own.getOwnerId(), true,
+ new LockResult(own.getRangeId(), own.getLockId(), true,
ranges));
}
ownAndCheck.warmup(musicMixin,mysqlMixin,ranges);
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java
index 8d851c7..99e8244 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java
@@ -98,6 +98,16 @@ public class QueryProcessorTest {
}
@Test
+ public void deleteQuery() throws SQLException {
+ String sqlQuery = "delete from db.employees where personid = 721 and lastname = 'Lastname'";
+ HashMap<String, List<SQLOperation>> expectedOut = new HashMap<>();
+ List<SQLOperation> t1op = new ArrayList<>();
+ t1op.add(SQLOperation.DELETE);
+ expectedOut.put("DB.EMPLOYEES", t1op);
+ assertEquals(expectedOut, QueryProcessor.parseSqlQuery(sqlQuery, null));
+ }
+
+ @Test
public void insertSelect() throws SQLException {
String sqlQuery =
"INSERT INTO table1 (CustomerName, City, Country) SELECT SupplierName, City, Country FROM table2";
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/test/CrossSiteTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/test/CrossSiteTest.java
index d4a7a27..57cbcd6 100755
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/test/CrossSiteTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/test/CrossSiteTest.java
@@ -31,12 +31,11 @@ import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Random;
-import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-
+import org.onap.music.logging.EELFLoggerDelegate;
/**
* This test tests a copy of data from DB1 to DB2. It tests the following H2 data types:
@@ -46,7 +45,7 @@ public class CrossSiteTest extends TestCommon {
private static final String DB_CONNECTION1 = "avatica://" + "mem:db1";
private static final String DB_CONNECTION2 = "avatica://" + "mem:db2";
private static final String KEYSPACE = "CrossSite_Test";
- private final static Logger logger = Logger.getLogger(CrossSiteTest.class);
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CrossSiteTest.class);
private Connection db1, db2;
@@ -96,7 +95,7 @@ public class CrossSiteTest extends TestCommon {
fail("SELECT COUNT(*) produced no result");
}
} catch (Exception e) {
- logger.error(e);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", e);
e.printStackTrace();
fail("2: " + e.toString());
}
@@ -121,7 +120,7 @@ public class CrossSiteTest extends TestCommon {
fail("SELECT COUNT(*) produced no result");
}
} catch (Exception e) {
- logger.error(e);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", e);
e.printStackTrace();
fail("2: " + e.toString());
}
@@ -158,7 +157,7 @@ public class CrossSiteTest extends TestCommon {
fail("SELECT COUNT(*) produced no result");
}
} catch (Exception e) {
- logger.error(e);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", e);
e.printStackTrace();
fail("2: " + e.toString());
}
@@ -214,7 +213,7 @@ public class CrossSiteTest extends TestCommon {
fail("SELECT * FROM DATATYPES");
}
} catch (Exception ex) {
- logger.error(ex);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", ex);
ex.printStackTrace();
fail("2: " + ex.toString());
}
@@ -252,7 +251,7 @@ public class CrossSiteTest extends TestCommon {
fail("SELECT * FROM DATATYPES");
}
} catch (Exception ex) {
- logger.error(ex);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", ex);
ex.printStackTrace();
fail("testIdentity 2: " + ex.toString());
}
@@ -320,7 +319,7 @@ public class CrossSiteTest extends TestCommon {
fail("SELECT * FROM BLOBTEST");
}
} catch (Exception ex) {
- logger.error(ex);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", ex);
ex.printStackTrace();
fail("testBLOBColumn 2: " + ex.toString());
}
@@ -358,7 +357,7 @@ public class CrossSiteTest extends TestCommon {
fail("SELECT COUNT(*) produced no result");
}
} catch (Exception e) {
- logger.error(e);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", e);
e.printStackTrace();
fail("2: " + e.toString());
}
@@ -383,7 +382,7 @@ public class CrossSiteTest extends TestCommon {
fail("SELECT COUNT(*) produced no result");
}
} catch (Exception e) {
- logger.error(e);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", e);
e.printStackTrace();
fail("2: " + e.toString());
}
@@ -417,7 +416,7 @@ public class CrossSiteTest extends TestCommon {
fail("SELECT COUNT(*) produced no result");
}
} catch (Exception e) {
- logger.error(e);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", e);
e.printStackTrace();
fail("2: " + e.toString());
}
@@ -442,7 +441,7 @@ public class CrossSiteTest extends TestCommon {
fail("SELECT OTHER produced no result");
}
} catch (Exception e) {
- logger.error(e);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", e);
e.printStackTrace();
fail("2: " + e.toString());
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/test/TransactionTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/test/TransactionTest.java
index 55b0f09..6a2f954 100755
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/test/TransactionTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/test/TransactionTest.java
@@ -29,15 +29,15 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
-import org.apache.log4j.Logger;
import org.junit.Test;
+import org.onap.music.logging.EELFLoggerDelegate;
public class TransactionTest extends TestCommon {
private static final String DB_CONNECTION1 = "avatica://" + "mem:db1";
private static final String DB_CONNECTION2 = "avatica://" + "mem:db2";
private static final String KEYSPACE = "CrossSite_Test";
- private final static Logger logger = Logger.getLogger(CrossSiteTest.class);
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TransactionTest.class);
//@Test
public void testWithAutocommitTrue() {
@@ -175,7 +175,7 @@ public class TransactionTest extends TestCommon {
fail("missing element: "+t);
}
} catch (Exception e) {
- logger.error(e);
+ logger.error(EELFLoggerDelegate.errorLogger, "Error", e);
e.printStackTrace();
fail("2: " + e.toString());
}
diff --git a/pom.xml b/pom.xml
index 14f132c..4238ee4 100755
--- a/pom.xml
+++ b/pom.xml
@@ -228,17 +228,6 @@
<version>1.1.7</version>
</dependency>
<dependency>
- <groupId>org.onap.music</groupId>
- <artifactId>dev-MUSIC-cassandra</artifactId>
- <version>3.2.1</version>
- <exclusions>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.30.Final</version>