diff options
Diffstat (limited to 'mdbc-server/src/main/java/org/onap/music/mdbc')
8 files changed, 108 insertions, 19 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 42864ea..f662207 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -44,6 +44,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Executor; import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicDeadlockException; import org.onap.music.exceptions.QueryException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; @@ -89,6 +90,7 @@ public class MdbcConnection implements Connection { private DatabasePartition partition; /** ranges needed for this transaction */ private Set<Range> rangesUsed; + private String ownerId = UUID.randomUUID().toString(); public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException { @@ -609,7 +611,7 @@ public class MdbcConnection implements Connection { OwnershipAndCheckpoint ownAndCheck = statemanager.getOwnAndCheck(); UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey(); try { - final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId, lockType); + final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId, lockType, ownerId); if(ownershipReturn==null){ return null; } @@ -624,8 +626,20 @@ public class MdbcConnection implements Connection { newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(), ownershipReturn.getOwnerId()); } - } - finally{ + } catch (MDBCServiceException e) { + MusicDeadlockException de = Utils.getDeadlockException(e); + if (de!=null) { + //release all partitions + mi.releaseAllLocksForOwner(de.getOwner(), de.getKeyspace(), de.getTable()); + //rollback transaction + try { + rollback(); + } catch (SQLException e1) { + throw new MDBCServiceException("Failed to rollback transaction after detecting deadlock while taking ownership of table, which, wow", e1); + } + } + throw e; + } finally { ownAndCheck.stopOwnershipTimeoutClock(ownOpId); } return newPartition; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 5b2b8df..04ac789 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -115,6 +115,7 @@ public class StateManager { //\fixme this might not be used, delete? try { info.load(this.getClass().getClassLoader().getResourceAsStream("music.properties")); + info.load(this.getClass().getClassLoader().getResourceAsStream("key.properties")); info.putAll(MDBCUtils.getMdbcProperties()); } catch (IOException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java index 496f48d..736e579 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java @@ -34,6 +34,7 @@ import java.io.InputStream; import java.util.*; import org.onap.music.mdbc.mixins.MusicInterface; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.service.impl.MusicCassaCore; public class TestUtils { @@ -55,7 +56,7 @@ public class TestUtils { public static void unlockRow(String keyspace, String mriTableName, DatabasePartition partition) throws MusicLockingException { String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString(); - MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); + MusicLockState musicLockState = MusicCassaCore.getInstance().voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); } public static void createKeyspace(String keyspace, Session session) { @@ -142,7 +143,7 @@ public class TestUtils { } } - +/* public static void populateMusicUtilsWithProperties(Properties prop){ //TODO: Learn how to do this properly within music String[] propKeys = MusicUtil.getPropkeys(); @@ -207,6 +208,6 @@ public class TestUtils { } } - } +*/ } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java index 7a09dca..f4f4820 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; + +import org.onap.music.exceptions.MusicDeadlockException; import org.onap.music.logging.EELFLoggerDelegate; public class Utils { @@ -77,4 +79,12 @@ public class Utils { } } } + + public static MusicDeadlockException getDeadlockException(Throwable t) { + while (t!=null) { + if (t instanceof MusicDeadlockException) return (MusicDeadlockException)t; + t = t.getCause(); + } + return null; + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 3afc726..a4e3e08 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -321,10 +321,13 @@ public interface MusicInterface { void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException; String createLock(LockRequest request) throws MDBCServiceException; + String createLock(LockRequest request, String ownerId) throws MDBCServiceException; LockResult acquireLock(LockRequest request, String lockId) throws MDBCServiceException; void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException; + public void releaseAllLocksForOwner(String owner, String keyspace, String table) throws MDBCServiceException; + /** * Combine previous musicrangeinformation rows for new partition, if necessary * @@ -362,8 +365,9 @@ public interface MusicInterface { * This is an eventual operation for minimal performance hits * @param r * @param playbackPointer + * @throws MDBCServiceException */ - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer); + public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) throws MDBCServiceException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 20e1d5d..264b320 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -51,8 +51,10 @@ import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.lockingservice.cassandra.LockType; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.main.MusicCore; +import org.onap.music.main.CorePropertiesLoader; import org.onap.music.main.ResultType; import org.onap.music.main.ReturnType; import org.onap.music.mdbc.DatabasePartition; @@ -70,6 +72,7 @@ import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.RangeDependency; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; +import org.onap.music.service.impl.MusicCassaCore; /** * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence @@ -212,6 +215,8 @@ public class MusicMixin implements MusicInterface { } public MusicMixin(StateManager stateManager, String mdbcServerName, Properties info) throws MDBCServiceException { + CorePropertiesLoader.loadProperties(info); + // Default values -- should be overridden in the Properties // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred) this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS); @@ -289,6 +294,8 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException("Error creating namespace: "+keyspace+". Internal error:"+e.getErrorMessage(), e); } + } catch (MusicQueryException e) { + throw new MDBCServiceException(e); } } @@ -1213,7 +1220,11 @@ public class MusicMixin implements MusicInterface { newLockId = currentLockRef.get(pending.getKey()); success = (MusicCore.whoseTurnIsIt(newFullyQualifiedKey) == newLockId); } else { - newLockId = MusicCore.createLockReference(newFullyQualifiedKey); + try { + newLockId = MusicCore.createLockReference(newFullyQualifiedKey); + } catch (MusicLockingException e) { + throw new MDBCServiceException(e); + } ReturnType newLockReturn = acquireLock(fullyQualifiedKey, lockId); success = newLockReturn.getResult().compareTo(ResultType.SUCCESS) == 0; } @@ -1238,7 +1249,11 @@ public class MusicMixin implements MusicInterface { protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { UUID mriIndex = partition.getMRIIndex(); String lockId; - lockId = MusicCore.createLockReference(fullyQualifiedKey); + try { + lockId = MusicCore.createLockReference(fullyQualifiedKey); + } catch (MusicLockingException e1) { + throw new MDBCServiceException(e1); + } if(lockId==null) { throw new MDBCServiceException("lock reference is null"); } @@ -1833,6 +1848,8 @@ public class MusicMixin implements MusicInterface { } catch (MusicServiceException e) { logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for digest id "+digestId.toString()+ "with error "+e.getErrorMessage()); throw new MDBCServiceException("Transaction Digest serialization for digest id "+digestId.toString(), e); + } catch (MusicQueryException e) { + throw new MDBCServiceException(e); } } @@ -1859,6 +1876,8 @@ public class MusicMixin implements MusicInterface { } catch (MusicServiceException e) { logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.transactionId.toString()+ "with error "+e.getErrorMessage()); throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.transactionId.toString(), e); + } catch (MusicQueryException e) { + throw new MDBCServiceException(e); } } @@ -2016,7 +2035,7 @@ public class MusicMixin implements MusicInterface { private void unlockKeyInMusic(String table, String key, String lockref) throws MDBCServiceException { String fullyQualifiedKey= music_ns+"."+ table+"."+key; try { - MusicCore.voluntaryReleaseLock(fullyQualifiedKey,lockref); + MusicCassaCore.getInstance().voluntaryReleaseLock(fullyQualifiedKey,lockref); } catch (MusicLockingException e) { throw new MDBCServiceException(e.getMessage(), e); } @@ -2055,6 +2074,15 @@ public class MusicMixin implements MusicInterface { } } + @Override + public void releaseAllLocksForOwner(String ownerId, String keyspace, String table) throws MDBCServiceException { + try { + MusicCore.releaseAllLocksForOwner(ownerId, keyspace, table); + } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { + throw new MDBCServiceException(e); + } + } + /** * Get a list of ranges and their range dependencies * @param range @@ -2076,9 +2104,19 @@ public class MusicMixin implements MusicInterface { @Override public String createLock(LockRequest request) throws MDBCServiceException{ + return createLock(request, null); + } + + @Override + public String createLock(LockRequest request, String ownerId) throws MDBCServiceException{ String fullyQualifiedKey= music_ns+"."+ musicRangeInformationTableName + "." + request.getId(); boolean isWrite = (request.getLockType()==SQLOperationType.WRITE); - String lockId = MusicCore.createLockReference(fullyQualifiedKey, isWrite); + String lockId; + try { + lockId = MusicCore.createLockReference(fullyQualifiedKey, isWrite?LockType.WRITE:LockType.READ, ownerId); + } catch (MusicLockingException e) { + throw new MDBCServiceException(e); + } return lockId; } @@ -2558,7 +2596,7 @@ public class MusicMixin implements MusicInterface { } @Override - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) { + public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) throws MDBCServiceException { String cql = String.format("INSERT INTO %s.%s (mdbcnode, mridigest, digestindex) VALUES (" + this.myId + ", " + playbackPointer.getLeft() + ", " + playbackPointer.getRight() + ");", music_ns, this.musicMdbcCheckpointsTableName); @@ -2568,8 +2606,10 @@ public class MusicMixin implements MusicInterface { MusicCore.nonKeyRelatedPut(pQueryObject,"eventual"); } catch (MusicServiceException e) { logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location", e); + } catch (MusicQueryException e) { + throw new MDBCServiceException(e); } } - + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java index 6d2d4cf..a4706fd 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java @@ -33,7 +33,7 @@ import net.sf.jsqlparser.statement.delete.Delete; import net.sf.jsqlparser.statement.insert.Insert; import net.sf.jsqlparser.statement.update.Update; import org.apache.commons.lang3.tuple.Pair; -import org.apache.zookeeper.KeeperException.UnimplementedException; +//import org.apache.zookeeper.KeeperException.UnimplementedException; import org.json.JSONObject; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index 00180a0..ec32210 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -27,9 +27,11 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicDeadlockException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.Utils; import org.onap.music.mdbc.mixins.DBInterface; import org.onap.music.mdbc.mixins.LockRequest; import org.onap.music.mdbc.mixins.LockResult; @@ -275,8 +277,9 @@ public class OwnershipAndCheckpoint{ * @param r * @param partitionIndex * @param index + * @throws MDBCServiceException */ - private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, MusicTxDigestId txdigest) { + private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, MusicTxDigestId txdigest) throws MDBCServiceException { dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index)); mi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index)); } @@ -322,7 +325,11 @@ public class OwnershipAndCheckpoint{ */ public OwnershipReturn own(MusicInterface mi, Set<Range> ranges, DatabasePartition currPartition, UUID opId, SQLOperationType lockType) throws MDBCServiceException { - + return own(mi, ranges, currPartition, opId, lockType, null); + } + + public OwnershipReturn own(MusicInterface mi, Set<Range> ranges, + DatabasePartition currPartition, UUID opId, SQLOperationType lockType, String ownerId) throws MDBCServiceException { if (ranges == null || ranges.isEmpty()) { return null; } @@ -342,7 +349,19 @@ public class OwnershipAndCheckpoint{ while ( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) && !timeout(opId) ) { - takeOwnershipOfDag(mi, currPartition, opId, locksForOwnership, toOwn, lockType); + try { + takeOwnershipOfDag(mi, currPartition, opId, locksForOwnership, toOwn, lockType, ownerId); + } catch (MDBCServiceException e) { + MusicDeadlockException de = Utils.getDeadlockException(e); + if (de!=null) { +// System.out.println("IN O&C.OWN, DETECTED DEADLOCK, REMOVING " + currPartition + ", RELEASING " + locksForOwnership); + locksForOwnership.remove(currPartition.getMRIIndex()); + mi.releaseLocks(locksForOwnership); + stopOwnershipTimeoutClock(opId); + logger.error("Error when owning a range: Deadlock detected"); + } + throw e; + } currentlyOwn=toOwn; //TODO instead of comparing dags, compare rows rangesToOwnRows = extractRowsForRange(mi, rangesToOwn, false); @@ -373,7 +392,7 @@ public class OwnershipAndCheckpoint{ * @throws MDBCServiceException */ private void takeOwnershipOfDag(MusicInterface mi, DatabasePartition partition, UUID opId, - Map<UUID, LockResult> ownershipLocks, Dag toOwn, SQLOperationType lockType) throws MDBCServiceException { + Map<UUID, LockResult> ownershipLocks, Dag toOwn, SQLOperationType lockType, String ownerId) throws MDBCServiceException { while(toOwn.hasNextToOwn()){ DagNode node = toOwn.nextToOwn(); @@ -394,7 +413,7 @@ public class OwnershipAndCheckpoint{ } else { LockRequest request = new LockRequest(uuidToOwn, new ArrayList<>(node.getRangeSet()), lockType); - String lockId = mi.createLock(request); + String lockId = mi.createLock(request, ownerId); LockResult result = null; boolean owned = false; while(!owned && !timeout(opId)){ |