From 4f0f883f3781e291fd10ad485efd5f850052cb66 Mon Sep 17 00:00:00 2001 From: Mohammad Salehe Date: Wed, 19 Dec 2018 22:31:11 -0500 Subject: Add retry logic for acquireLock in atomicPut acquireLock might fail right after generating a new lock reference because we are not making a SERIAL get. atomicPut should keep retrying until it acquires the lock. Change-Id: I7b0f85a0d0229e28a56cdd41ec69fcde8d8398fe Issue-ID: MUSIC-148 Signed-off-by: Mohammad Salehe --- .../org/onap/music/datastore/MusicDataStore.java | 24 +++++++- .../lockingservice/cassandra/CassaLockStore.java | 6 +- .../onap/music/service/impl/MusicCassaCore.java | 70 ++++++++++++++-------- 3 files changed, 72 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/onap/music/datastore/MusicDataStore.java b/src/main/java/org/onap/music/datastore/MusicDataStore.java index a10f31e3..7b4e6700 100644 --- a/src/main/java/org/onap/music/datastore/MusicDataStore.java +++ b/src/main/java/org/onap/music/datastore/MusicDataStore.java @@ -52,6 +52,7 @@ public class MusicDataStore { public static final String CONSISTENCY_LEVEL_ONE = "ONE"; public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM"; + public static final String CONSISTENCY_LEVEL_SERIAL = "SERIAL"; private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class); @@ -391,6 +392,9 @@ public class MusicDataStore { else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) { statement.setConsistencyLevel(ConsistencyLevel.QUORUM); } + else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_SERIAL)) { + statement.setConsistencyLevel(ConsistencyLevel.SERIAL); + } results = session.execute(statement); @@ -403,11 +407,11 @@ public class MusicDataStore { /** * This method performs DDL operations on Cassandra using consistency level ONE. - * + * * @param queryObject Object containing cassandra prepared query and values. */ public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject) - throws MusicServiceException, MusicQueryException { + throws MusicServiceException, MusicQueryException { TimeMeasureInstance.instance().enter("executeOneConsistencyGet"); try { return executeGet(queryObject, CONSISTENCY_LEVEL_ONE); @@ -417,6 +421,22 @@ public class MusicDataStore { } } + /** + * This method performs DDL operations on Cassandra using consistency level ONE. + * + * @param queryObject Object containing cassandra prepared query and values. + */ + public ResultSet executeSerialConsistencyGet(PreparedQueryObject queryObject) + throws MusicServiceException, MusicQueryException { + TimeMeasureInstance.instance().enter("executeOneConsistencyGet"); + try { + return executeGet(queryObject, CONSISTENCY_LEVEL_SERIAL); + } + finally { + TimeMeasureInstance.instance().exit(); + } + } + /** * * This method performs DDL operation on Cassandra using consistency level QUORUM. diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java index 51a78264..58163a8f 100644 --- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java +++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java @@ -201,7 +201,7 @@ public class CassaLockStore { * @param keyspace of the application. * @param table of the application. * @param key is the primary key of the application table - * @return the UUID lock reference. + * @return the lock reference. * @throws MusicServiceException * @throws MusicQueryException */ @@ -217,11 +217,13 @@ public class CassaLockStore { queryObject.appendQueryString(selectQuery); ResultSet results = dsHandle.executeOneConsistencyGet(queryObject); Row row = results.one(); + if (row == null) + return null; String lockReference = "" + row.getLong("lockReference"); String createTime = row.getString("createTime"); String acquireTime = row.getString("acquireTime"); - return new LockObject(lockReference, createTime,acquireTime); + return new LockObject(lockReference, createTime, acquireTime); } finally { TimeMeasureInstance.instance().exit(); diff --git a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java index 79662822..253fe51f 100644 --- a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java +++ b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java @@ -141,9 +141,13 @@ public class MusicCassaCore implements MusicCoreService { String primaryKeyValue = splitString[2]; LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue); - - /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the - * reference*/ + + // No information about lock holder + if (currentLockHolderObject == null) + return; + + // Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the + // reference long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime)); if((System.currentTimeMillis() - referenceTime) > leasePeriod) { @@ -158,24 +162,33 @@ public class MusicCassaCore implements MusicCoreService { TimeMeasureInstance.instance().enter("isTopOfLockStore"); try { // return failure to lock holders too early or already evicted from the lock store - String topOfLockStoreS = - getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef; + LockObject lockObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue); + + // TODO: differentiate between "not yet" and "no longer" in return value, so that the caller knows + // whether it should retry or not + if (lockObject == null) { + logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not in local lock queue yet"); + return new ReturnType(ResultType.FAILURE, lockReference+" is not in local lock queue yet"); + } + + String topOfLockStoreS = lockObject.lockRef; long topOfLockStoreL = Long.parseLong(topOfLockStoreS); long lockReferenceL = Long.parseLong(lockReference); + // TODO: differentiate between "not yet" and "no longer" in return value, so that the caller knows + // whether it should retry or not if (lockReferenceL > topOfLockStoreL) { - // only need to check if this is a read lock.... - if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue, - lockReference)) { - return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values"); - } + // only need to check if this is a read lock.... + if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue, + lockReference)) { + return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values"); + } logger.info(EELFLoggerDelegate.applicationLogger, lockReference + " is not the lock holder yet"); return new ReturnType(ResultType.FAILURE, lockReference + " is not the lock holder yet"); } - if (lockReferenceL < topOfLockStoreL) { logger.info(EELFLoggerDelegate.applicationLogger, lockReference + " is no longer/or was never in the lock store queue"); @@ -201,9 +214,9 @@ public class MusicCassaCore implements MusicCoreService { ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference); if (result.getResult().equals(ResultType.FAILURE)) - return result;//not top of the lock store q + return result; // not top of the lock store q - //check to see if the value of the key has to be synced in case there was a forceful release + // check to see if the value of the key has to be synced in case there was a forceful release String syncTable = keyspace + ".unsyncedKeys_" + table; String query = "select * from " + syncTable + " where key='" + fullyQualifiedKey + "';"; PreparedQueryObject readQueryObject = new PreparedQueryObject(); @@ -353,13 +366,14 @@ public class MusicCassaCore implements MusicCoreService { * @param fullyQualifiedKey lockName * @return */ - public String whoseTurnIsIt(String fullyQualifiedKey) { + public String whoseTurnIsIt(String fullyQualifiedKey) { String[] splitString = fullyQualifiedKey.split("\\."); String keyspace = splitString[0]; String table = splitString[1]; String primaryKeyValue = splitString[2]; try { - return getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef; + LockObject lockObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue); + return lockObject != null ? lockObject.lockRef : null; } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); } @@ -476,7 +490,7 @@ public class MusicCassaCore implements MusicCoreService { try { ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference); if (result.getResult().equals(ResultType.FAILURE)) - return result;//not top of the lock store q + return result; // not top of the lock store q if (conditionInfo != null) try { @@ -573,7 +587,7 @@ public class MusicCassaCore implements MusicCoreService { try { ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference); if(result.getResult().equals(ResultType.FAILURE)) - return null;//not top of the lock store q + return null; // not top of the lock store q results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject); } catch (MusicQueryException | MusicServiceException | MusicLockingException e) { logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR); @@ -601,16 +615,24 @@ public class MusicCassaCore implements MusicCoreService { String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey; String lockReference = createLockReference(fullyQualifiedKey); long lockCreationTime = System.currentTimeMillis(); - ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockReference); + ReturnType lockAcqResult; + + int tries = 0; + do { + lockAcqResult = acquireLock(fullyQualifiedKey, lockReference); + tries++; + } while (!lockAcqResult.getResult().equals(ResultType.SUCCESS)); + long lockAcqTime = System.currentTimeMillis(); - if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { - logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockReference); - voluntaryReleaseLock(fullyQualifiedKey, lockReference); - return lockAcqResult; - } +// if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { +// logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockReference); +// voluntaryReleaseLock(fullyQualifiedKey, lockReference); +// return lockAcqResult; +// } - logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockReference); + logger.info(EELFLoggerDelegate.applicationLogger, + "acquired lock with id " + lockReference + " after " + tries + "tries"); ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockReference, conditionInfo); long criticalPutTime = System.currentTimeMillis(); -- cgit 1.2.3-korg