aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohammad Salehe <salehe@cs.toronto.edu>2018-12-19 22:31:11 -0500
committerMohammad Salehe <salehe@cs.toronto.edu>2018-12-22 15:07:11 -0500
commit4f0f883f3781e291fd10ad485efd5f850052cb66 (patch)
tree1e57c1362de91c0a13592699f8a4b0e219878dba
parent71336645ddc340487ca2dc0053e46a6387b4e542 (diff)
Add retry logic for acquireLock in atomicPut
acquireLock might fail right after generating a new lock reference because we are not making a SERIAL get. atomicPut should keep retrying until it acquires the lock. Change-Id: I7b0f85a0d0229e28a56cdd41ec69fcde8d8398fe Issue-ID: MUSIC-148 Signed-off-by: Mohammad Salehe <salehe@cs.toronto.edu>
-rw-r--r--src/main/java/org/onap/music/datastore/MusicDataStore.java24
-rw-r--r--src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java6
-rw-r--r--src/main/java/org/onap/music/service/impl/MusicCassaCore.java70
3 files changed, 72 insertions, 28 deletions
diff --git a/src/main/java/org/onap/music/datastore/MusicDataStore.java b/src/main/java/org/onap/music/datastore/MusicDataStore.java
index a10f31e3..7b4e6700 100644
--- a/src/main/java/org/onap/music/datastore/MusicDataStore.java
+++ b/src/main/java/org/onap/music/datastore/MusicDataStore.java
@@ -52,6 +52,7 @@ public class MusicDataStore {
public static final String CONSISTENCY_LEVEL_ONE = "ONE";
public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
+ public static final String CONSISTENCY_LEVEL_SERIAL = "SERIAL";
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
@@ -391,6 +392,9 @@ public class MusicDataStore {
else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
}
+ else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_SERIAL)) {
+ statement.setConsistencyLevel(ConsistencyLevel.SERIAL);
+ }
results = session.execute(statement);
@@ -403,11 +407,11 @@ public class MusicDataStore {
/**
* This method performs DDL operations on Cassandra using consistency level ONE.
- *
+ *
* @param queryObject Object containing cassandra prepared query and values.
*/
public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
- throws MusicServiceException, MusicQueryException {
+ throws MusicServiceException, MusicQueryException {
TimeMeasureInstance.instance().enter("executeOneConsistencyGet");
try {
return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
@@ -418,6 +422,22 @@ public class MusicDataStore {
}
/**
+ * This method performs DDL operations on Cassandra using consistency level ONE.
+ *
+ * @param queryObject Object containing cassandra prepared query and values.
+ */
+ public ResultSet executeSerialConsistencyGet(PreparedQueryObject queryObject)
+ throws MusicServiceException, MusicQueryException {
+ TimeMeasureInstance.instance().enter("executeOneConsistencyGet");
+ try {
+ return executeGet(queryObject, CONSISTENCY_LEVEL_SERIAL);
+ }
+ finally {
+ TimeMeasureInstance.instance().exit();
+ }
+ }
+
+ /**
*
* This method performs DDL operation on Cassandra using consistency level QUORUM.
*
diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
index 51a78264..58163a8f 100644
--- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
+++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
@@ -201,7 +201,7 @@ public class CassaLockStore {
* @param keyspace of the application.
* @param table of the application.
* @param key is the primary key of the application table
- * @return the UUID lock reference.
+ * @return the lock reference.
* @throws MusicServiceException
* @throws MusicQueryException
*/
@@ -217,11 +217,13 @@ public class CassaLockStore {
queryObject.appendQueryString(selectQuery);
ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
Row row = results.one();
+ if (row == null)
+ return null;
String lockReference = "" + row.getLong("lockReference");
String createTime = row.getString("createTime");
String acquireTime = row.getString("acquireTime");
- return new LockObject(lockReference, createTime,acquireTime);
+ return new LockObject(lockReference, createTime, acquireTime);
}
finally {
TimeMeasureInstance.instance().exit();
diff --git a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
index 79662822..253fe51f 100644
--- a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
+++ b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
@@ -141,9 +141,13 @@ public class MusicCassaCore implements MusicCoreService {
String primaryKeyValue = splitString[2];
LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
-
- /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
- * reference*/
+
+ // No information about lock holder
+ if (currentLockHolderObject == null)
+ return;
+
+ // Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
+ // reference
long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
@@ -158,24 +162,33 @@ public class MusicCassaCore implements MusicCoreService {
TimeMeasureInstance.instance().enter("isTopOfLockStore");
try {
// return failure to lock holders too early or already evicted from the lock store
- String topOfLockStoreS =
- getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+ LockObject lockObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
+
+ // TODO: differentiate between "not yet" and "no longer" in return value, so that the caller knows
+ // whether it should retry or not
+ if (lockObject == null) {
+ logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not in local lock queue yet");
+ return new ReturnType(ResultType.FAILURE, lockReference+" is not in local lock queue yet");
+ }
+
+ String topOfLockStoreS = lockObject.lockRef;
long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
long lockReferenceL = Long.parseLong(lockReference);
+ // TODO: differentiate between "not yet" and "no longer" in return value, so that the caller knows
+ // whether it should retry or not
if (lockReferenceL > topOfLockStoreL) {
- // only need to check if this is a read lock....
- if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue,
- lockReference)) {
- return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values");
- }
+ // only need to check if this is a read lock....
+ if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue,
+ lockReference)) {
+ return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values");
+ }
logger.info(EELFLoggerDelegate.applicationLogger,
lockReference + " is not the lock holder yet");
return new ReturnType(ResultType.FAILURE,
lockReference + " is not the lock holder yet");
}
-
if (lockReferenceL < topOfLockStoreL) {
logger.info(EELFLoggerDelegate.applicationLogger,
lockReference + " is no longer/or was never in the lock store queue");
@@ -201,9 +214,9 @@ public class MusicCassaCore implements MusicCoreService {
ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
if (result.getResult().equals(ResultType.FAILURE))
- return result;//not top of the lock store q
+ return result; // not top of the lock store q
- //check to see if the value of the key has to be synced in case there was a forceful release
+ // check to see if the value of the key has to be synced in case there was a forceful release
String syncTable = keyspace + ".unsyncedKeys_" + table;
String query = "select * from " + syncTable + " where key='" + fullyQualifiedKey + "';";
PreparedQueryObject readQueryObject = new PreparedQueryObject();
@@ -353,13 +366,14 @@ public class MusicCassaCore implements MusicCoreService {
* @param fullyQualifiedKey lockName
* @return
*/
- public String whoseTurnIsIt(String fullyQualifiedKey) {
+ public String whoseTurnIsIt(String fullyQualifiedKey) {
String[] splitString = fullyQualifiedKey.split("\\.");
String keyspace = splitString[0];
String table = splitString[1];
String primaryKeyValue = splitString[2];
try {
- return getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+ LockObject lockObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
+ return lockObject != null ? lockObject.lockRef : null;
} catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
}
@@ -476,7 +490,7 @@ public class MusicCassaCore implements MusicCoreService {
try {
ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
if (result.getResult().equals(ResultType.FAILURE))
- return result;//not top of the lock store q
+ return result; // not top of the lock store q
if (conditionInfo != null)
try {
@@ -573,7 +587,7 @@ public class MusicCassaCore implements MusicCoreService {
try {
ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
if(result.getResult().equals(ResultType.FAILURE))
- return null;//not top of the lock store q
+ return null; // not top of the lock store q
results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
} catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
@@ -601,16 +615,24 @@ public class MusicCassaCore implements MusicCoreService {
String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
String lockReference = createLockReference(fullyQualifiedKey);
long lockCreationTime = System.currentTimeMillis();
- ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockReference);
+ ReturnType lockAcqResult;
+
+ int tries = 0;
+ do {
+ lockAcqResult = acquireLock(fullyQualifiedKey, lockReference);
+ tries++;
+ } while (!lockAcqResult.getResult().equals(ResultType.SUCCESS));
+
long lockAcqTime = System.currentTimeMillis();
- if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
- logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockReference);
- voluntaryReleaseLock(fullyQualifiedKey, lockReference);
- return lockAcqResult;
- }
+// if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+// logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockReference);
+// voluntaryReleaseLock(fullyQualifiedKey, lockReference);
+// return lockAcqResult;
+// }
- logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockReference);
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "acquired lock with id " + lockReference + " after " + tries + "tries");
ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
queryObject, lockReference, conditionInfo);
long criticalPutTime = System.currentTimeMillis();