aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main/java/org/onap/music/mdbc
diff options
context:
space:
mode:
Diffstat (limited to 'mdbc-server/src/main/java/org/onap/music/mdbc')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java20
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java1
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java7
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java6
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java52
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java2
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java29
8 files changed, 108 insertions, 19 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 42864ea..f662207 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -44,6 +44,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.exceptions.MusicDeadlockException;
import org.onap.music.exceptions.QueryException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
@@ -89,6 +90,7 @@ public class MdbcConnection implements Connection {
private DatabasePartition partition;
/** ranges needed for this transaction */
private Set<Range> rangesUsed;
+ private String ownerId = UUID.randomUUID().toString();
public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi,
TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException {
@@ -609,7 +611,7 @@ public class MdbcConnection implements Connection {
OwnershipAndCheckpoint ownAndCheck = statemanager.getOwnAndCheck();
UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
try {
- final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId, lockType);
+ final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId, lockType, ownerId);
if(ownershipReturn==null){
return null;
}
@@ -624,8 +626,20 @@ public class MdbcConnection implements Connection {
newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(),
ownershipReturn.getOwnerId());
}
- }
- finally{
+ } catch (MDBCServiceException e) {
+ MusicDeadlockException de = Utils.getDeadlockException(e);
+ if (de!=null) {
+ //release all partitions
+ mi.releaseAllLocksForOwner(de.getOwner(), de.getKeyspace(), de.getTable());
+ //rollback transaction
+ try {
+ rollback();
+ } catch (SQLException e1) {
+ throw new MDBCServiceException("Failed to rollback transaction after detecting deadlock while taking ownership of table, which, wow", e1);
+ }
+ }
+ throw e;
+ } finally {
ownAndCheck.stopOwnershipTimeoutClock(ownOpId);
}
return newPartition;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 5b2b8df..04ac789 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -115,6 +115,7 @@ public class StateManager {
//\fixme this might not be used, delete?
try {
info.load(this.getClass().getClassLoader().getResourceAsStream("music.properties"));
+ info.load(this.getClass().getClassLoader().getResourceAsStream("key.properties"));
info.putAll(MDBCUtils.getMdbcProperties());
} catch (IOException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
index 496f48d..736e579 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
@@ -34,6 +34,7 @@ import java.io.InputStream;
import java.util.*;
import org.onap.music.mdbc.mixins.MusicInterface;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.service.impl.MusicCassaCore;
public class TestUtils {
@@ -55,7 +56,7 @@ public class TestUtils {
public static void unlockRow(String keyspace, String mriTableName, DatabasePartition partition)
throws MusicLockingException {
String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString();
- MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId());
+ MusicLockState musicLockState = MusicCassaCore.getInstance().voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId());
}
public static void createKeyspace(String keyspace, Session session) {
@@ -142,7 +143,7 @@ public class TestUtils {
}
}
-
+/*
public static void populateMusicUtilsWithProperties(Properties prop){
//TODO: Learn how to do this properly within music
String[] propKeys = MusicUtil.getPropkeys();
@@ -207,6 +208,6 @@ public class TestUtils {
}
}
-
}
+*/
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
index 7a09dca..f4f4820 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+
+import org.onap.music.exceptions.MusicDeadlockException;
import org.onap.music.logging.EELFLoggerDelegate;
public class Utils {
@@ -77,4 +79,12 @@ public class Utils {
}
}
}
+
+ public static MusicDeadlockException getDeadlockException(Throwable t) {
+ while (t!=null) {
+ if (t instanceof MusicDeadlockException) return (MusicDeadlockException)t;
+ t = t.getCause();
+ }
+ return null;
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 3afc726..a4e3e08 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -321,10 +321,13 @@ public interface MusicInterface {
void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
String createLock(LockRequest request) throws MDBCServiceException;
+ String createLock(LockRequest request, String ownerId) throws MDBCServiceException;
LockResult acquireLock(LockRequest request, String lockId) throws MDBCServiceException;
void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException;
+ public void releaseAllLocksForOwner(String owner, String keyspace, String table) throws MDBCServiceException;
+
/**
* Combine previous musicrangeinformation rows for new partition, if necessary
*
@@ -362,8 +365,9 @@ public interface MusicInterface {
* This is an eventual operation for minimal performance hits
* @param r
* @param playbackPointer
+ * @throws MDBCServiceException
*/
- public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer);
+ public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) throws MDBCServiceException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index 20e1d5d..264b320 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -51,8 +51,10 @@ import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.lockingservice.cassandra.LockType;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.main.MusicCore;
+import org.onap.music.main.CorePropertiesLoader;
import org.onap.music.main.ResultType;
import org.onap.music.main.ReturnType;
import org.onap.music.mdbc.DatabasePartition;
@@ -70,6 +72,7 @@ import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.RangeDependency;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
+import org.onap.music.service.impl.MusicCassaCore;
/**
* This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence
@@ -212,6 +215,8 @@ public class MusicMixin implements MusicInterface {
}
public MusicMixin(StateManager stateManager, String mdbcServerName, Properties info) throws MDBCServiceException {
+ CorePropertiesLoader.loadProperties(info);
+
// Default values -- should be overridden in the Properties
// Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
@@ -289,6 +294,8 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException("Error creating namespace: "+keyspace+". Internal error:"+e.getErrorMessage(),
e);
}
+ } catch (MusicQueryException e) {
+ throw new MDBCServiceException(e);
}
}
@@ -1213,7 +1220,11 @@ public class MusicMixin implements MusicInterface {
newLockId = currentLockRef.get(pending.getKey());
success = (MusicCore.whoseTurnIsIt(newFullyQualifiedKey) == newLockId);
} else {
- newLockId = MusicCore.createLockReference(newFullyQualifiedKey);
+ try {
+ newLockId = MusicCore.createLockReference(newFullyQualifiedKey);
+ } catch (MusicLockingException e) {
+ throw new MDBCServiceException(e);
+ }
ReturnType newLockReturn = acquireLock(fullyQualifiedKey, lockId);
success = newLockReturn.getResult().compareTo(ResultType.SUCCESS) == 0;
}
@@ -1238,7 +1249,11 @@ public class MusicMixin implements MusicInterface {
protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
UUID mriIndex = partition.getMRIIndex();
String lockId;
- lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ try {
+ lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ } catch (MusicLockingException e1) {
+ throw new MDBCServiceException(e1);
+ }
if(lockId==null) {
throw new MDBCServiceException("lock reference is null");
}
@@ -1833,6 +1848,8 @@ public class MusicMixin implements MusicInterface {
} catch (MusicServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for digest id "+digestId.toString()+ "with error "+e.getErrorMessage());
throw new MDBCServiceException("Transaction Digest serialization for digest id "+digestId.toString(), e);
+ } catch (MusicQueryException e) {
+ throw new MDBCServiceException(e);
}
}
@@ -1859,6 +1876,8 @@ public class MusicMixin implements MusicInterface {
} catch (MusicServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.transactionId.toString()+ "with error "+e.getErrorMessage());
throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.transactionId.toString(), e);
+ } catch (MusicQueryException e) {
+ throw new MDBCServiceException(e);
}
}
@@ -2016,7 +2035,7 @@ public class MusicMixin implements MusicInterface {
private void unlockKeyInMusic(String table, String key, String lockref) throws MDBCServiceException {
String fullyQualifiedKey= music_ns+"."+ table+"."+key;
try {
- MusicCore.voluntaryReleaseLock(fullyQualifiedKey,lockref);
+ MusicCassaCore.getInstance().voluntaryReleaseLock(fullyQualifiedKey,lockref);
} catch (MusicLockingException e) {
throw new MDBCServiceException(e.getMessage(), e);
}
@@ -2055,6 +2074,15 @@ public class MusicMixin implements MusicInterface {
}
}
+ @Override
+ public void releaseAllLocksForOwner(String ownerId, String keyspace, String table) throws MDBCServiceException {
+ try {
+ MusicCore.releaseAllLocksForOwner(ownerId, keyspace, table);
+ } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
+ throw new MDBCServiceException(e);
+ }
+ }
+
/**
* Get a list of ranges and their range dependencies
* @param range
@@ -2076,9 +2104,19 @@ public class MusicMixin implements MusicInterface {
@Override
public String createLock(LockRequest request) throws MDBCServiceException{
+ return createLock(request, null);
+ }
+
+ @Override
+ public String createLock(LockRequest request, String ownerId) throws MDBCServiceException{
String fullyQualifiedKey= music_ns+"."+ musicRangeInformationTableName + "." + request.getId();
boolean isWrite = (request.getLockType()==SQLOperationType.WRITE);
- String lockId = MusicCore.createLockReference(fullyQualifiedKey, isWrite);
+ String lockId;
+ try {
+ lockId = MusicCore.createLockReference(fullyQualifiedKey, isWrite?LockType.WRITE:LockType.READ, ownerId);
+ } catch (MusicLockingException e) {
+ throw new MDBCServiceException(e);
+ }
return lockId;
}
@@ -2558,7 +2596,7 @@ public class MusicMixin implements MusicInterface {
}
@Override
- public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) {
+ public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) throws MDBCServiceException {
String cql = String.format("INSERT INTO %s.%s (mdbcnode, mridigest, digestindex) VALUES ("
+ this.myId + ", " + playbackPointer.getLeft() + ", " + playbackPointer.getRight() + ");",
music_ns, this.musicMdbcCheckpointsTableName);
@@ -2568,8 +2606,10 @@ public class MusicMixin implements MusicInterface {
MusicCore.nonKeyRelatedPut(pQueryObject,"eventual");
} catch (MusicServiceException e) {
logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location", e);
+ } catch (MusicQueryException e) {
+ throw new MDBCServiceException(e);
}
}
-
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
index 6d2d4cf..a4706fd 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
@@ -33,7 +33,7 @@ import net.sf.jsqlparser.statement.delete.Delete;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.update.Update;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.KeeperException.UnimplementedException;
+//import org.apache.zookeeper.KeeperException.UnimplementedException;
import org.json.JSONObject;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index 00180a0..ec32210 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -27,9 +27,11 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.exceptions.MusicDeadlockException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.Utils;
import org.onap.music.mdbc.mixins.DBInterface;
import org.onap.music.mdbc.mixins.LockRequest;
import org.onap.music.mdbc.mixins.LockResult;
@@ -275,8 +277,9 @@ public class OwnershipAndCheckpoint{
* @param r
* @param partitionIndex
* @param index
+ * @throws MDBCServiceException
*/
- private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, MusicTxDigestId txdigest) {
+ private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, MusicTxDigestId txdigest) throws MDBCServiceException {
dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index));
mi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index));
}
@@ -322,7 +325,11 @@ public class OwnershipAndCheckpoint{
*/
public OwnershipReturn own(MusicInterface mi, Set<Range> ranges,
DatabasePartition currPartition, UUID opId, SQLOperationType lockType) throws MDBCServiceException {
-
+ return own(mi, ranges, currPartition, opId, lockType, null);
+ }
+
+ public OwnershipReturn own(MusicInterface mi, Set<Range> ranges,
+ DatabasePartition currPartition, UUID opId, SQLOperationType lockType, String ownerId) throws MDBCServiceException {
if (ranges == null || ranges.isEmpty()) {
return null;
}
@@ -342,7 +349,19 @@ public class OwnershipAndCheckpoint{
while ( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) &&
!timeout(opId)
) {
- takeOwnershipOfDag(mi, currPartition, opId, locksForOwnership, toOwn, lockType);
+ try {
+ takeOwnershipOfDag(mi, currPartition, opId, locksForOwnership, toOwn, lockType, ownerId);
+ } catch (MDBCServiceException e) {
+ MusicDeadlockException de = Utils.getDeadlockException(e);
+ if (de!=null) {
+// System.out.println("IN O&C.OWN, DETECTED DEADLOCK, REMOVING " + currPartition + ", RELEASING " + locksForOwnership);
+ locksForOwnership.remove(currPartition.getMRIIndex());
+ mi.releaseLocks(locksForOwnership);
+ stopOwnershipTimeoutClock(opId);
+ logger.error("Error when owning a range: Deadlock detected");
+ }
+ throw e;
+ }
currentlyOwn=toOwn;
//TODO instead of comparing dags, compare rows
rangesToOwnRows = extractRowsForRange(mi, rangesToOwn, false);
@@ -373,7 +392,7 @@ public class OwnershipAndCheckpoint{
* @throws MDBCServiceException
*/
private void takeOwnershipOfDag(MusicInterface mi, DatabasePartition partition, UUID opId,
- Map<UUID, LockResult> ownershipLocks, Dag toOwn, SQLOperationType lockType) throws MDBCServiceException {
+ Map<UUID, LockResult> ownershipLocks, Dag toOwn, SQLOperationType lockType, String ownerId) throws MDBCServiceException {
while(toOwn.hasNextToOwn()){
DagNode node = toOwn.nextToOwn();
@@ -394,7 +413,7 @@ public class OwnershipAndCheckpoint{
} else {
LockRequest request = new LockRequest(uuidToOwn,
new ArrayList<>(node.getRangeSet()), lockType);
- String lockId = mi.createLock(request);
+ String lockId = mi.createLock(request, ownerId);
LockResult result = null;
boolean owned = false;
while(!owned && !timeout(opId)){