aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/onap/music/datastore/MusicDataStore.java24
-rw-r--r--src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java6
-rw-r--r--src/main/java/org/onap/music/service/impl/MusicCassaCore.java70
3 files changed, 72 insertions, 28 deletions
diff --git a/src/main/java/org/onap/music/datastore/MusicDataStore.java b/src/main/java/org/onap/music/datastore/MusicDataStore.java
index a10f31e3..7b4e6700 100644
--- a/src/main/java/org/onap/music/datastore/MusicDataStore.java
+++ b/src/main/java/org/onap/music/datastore/MusicDataStore.java
@@ -52,6 +52,7 @@ public class MusicDataStore {
public static final String CONSISTENCY_LEVEL_ONE = "ONE";
public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
+ public static final String CONSISTENCY_LEVEL_SERIAL = "SERIAL";
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
@@ -391,6 +392,9 @@ public class MusicDataStore {
else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
}
+ else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_SERIAL)) {
+ statement.setConsistencyLevel(ConsistencyLevel.SERIAL);
+ }
results = session.execute(statement);
@@ -403,11 +407,11 @@ public class MusicDataStore {
/**
* This method performs DDL operations on Cassandra using consistency level ONE.
- *
+ *
* @param queryObject Object containing cassandra prepared query and values.
*/
public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
- throws MusicServiceException, MusicQueryException {
+ throws MusicServiceException, MusicQueryException {
TimeMeasureInstance.instance().enter("executeOneConsistencyGet");
try {
return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
@@ -418,6 +422,22 @@ public class MusicDataStore {
}
/**
+ * This method performs DDL operations on Cassandra using consistency level ONE.
+ *
+ * @param queryObject Object containing cassandra prepared query and values.
+ */
+ public ResultSet executeSerialConsistencyGet(PreparedQueryObject queryObject)
+ throws MusicServiceException, MusicQueryException {
+ TimeMeasureInstance.instance().enter("executeOneConsistencyGet");
+ try {
+ return executeGet(queryObject, CONSISTENCY_LEVEL_SERIAL);
+ }
+ finally {
+ TimeMeasureInstance.instance().exit();
+ }
+ }
+
+ /**
*
* This method performs DDL operation on Cassandra using consistency level QUORUM.
*
diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
index 51a78264..58163a8f 100644
--- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
+++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
@@ -201,7 +201,7 @@ public class CassaLockStore {
* @param keyspace of the application.
* @param table of the application.
* @param key is the primary key of the application table
- * @return the UUID lock reference.
+ * @return the lock reference.
* @throws MusicServiceException
* @throws MusicQueryException
*/
@@ -217,11 +217,13 @@ public class CassaLockStore {
queryObject.appendQueryString(selectQuery);
ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
Row row = results.one();
+ if (row == null)
+ return null;
String lockReference = "" + row.getLong("lockReference");
String createTime = row.getString("createTime");
String acquireTime = row.getString("acquireTime");
- return new LockObject(lockReference, createTime,acquireTime);
+ return new LockObject(lockReference, createTime, acquireTime);
}
finally {
TimeMeasureInstance.instance().exit();
diff --git a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
index 79662822..253fe51f 100644
--- a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
+++ b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
@@ -141,9 +141,13 @@ public class MusicCassaCore implements MusicCoreService {
String primaryKeyValue = splitString[2];
LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
-
- /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
- * reference*/
+
+ // No information about lock holder
+ if (currentLockHolderObject == null)
+ return;
+
+ // Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
+ // reference
long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
@@ -158,24 +162,33 @@ public class MusicCassaCore implements MusicCoreService {
TimeMeasureInstance.instance().enter("isTopOfLockStore");
try {
// return failure to lock holders too early or already evicted from the lock store
- String topOfLockStoreS =
- getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+ LockObject lockObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
+
+ // TODO: differentiate between "not yet" and "no longer" in return value, so that the caller knows
+ // whether it should retry or not
+ if (lockObject == null) {
+ logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not in local lock queue yet");
+ return new ReturnType(ResultType.FAILURE, lockReference+" is not in local lock queue yet");
+ }
+
+ String topOfLockStoreS = lockObject.lockRef;
long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
long lockReferenceL = Long.parseLong(lockReference);
+ // TODO: differentiate between "not yet" and "no longer" in return value, so that the caller knows
+ // whether it should retry or not
if (lockReferenceL > topOfLockStoreL) {
- // only need to check if this is a read lock....
- if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue,
- lockReference)) {
- return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values");
- }
+ // only need to check if this is a read lock....
+ if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue,
+ lockReference)) {
+ return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values");
+ }
logger.info(EELFLoggerDelegate.applicationLogger,
lockReference + " is not the lock holder yet");
return new ReturnType(ResultType.FAILURE,
lockReference + " is not the lock holder yet");
}
-
if (lockReferenceL < topOfLockStoreL) {
logger.info(EELFLoggerDelegate.applicationLogger,
lockReference + " is no longer/or was never in the lock store queue");
@@ -201,9 +214,9 @@ public class MusicCassaCore implements MusicCoreService {
ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
if (result.getResult().equals(ResultType.FAILURE))
- return result;//not top of the lock store q
+ return result; // not top of the lock store q
- //check to see if the value of the key has to be synced in case there was a forceful release
+ // check to see if the value of the key has to be synced in case there was a forceful release
String syncTable = keyspace + ".unsyncedKeys_" + table;
String query = "select * from " + syncTable + " where key='" + fullyQualifiedKey + "';";
PreparedQueryObject readQueryObject = new PreparedQueryObject();
@@ -353,13 +366,14 @@ public class MusicCassaCore implements MusicCoreService {
* @param fullyQualifiedKey lockName
* @return
*/
- public String whoseTurnIsIt(String fullyQualifiedKey) {
+ public String whoseTurnIsIt(String fullyQualifiedKey) {
String[] splitString = fullyQualifiedKey.split("\\.");
String keyspace = splitString[0];
String table = splitString[1];
String primaryKeyValue = splitString[2];
try {
- return getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+ LockObject lockObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
+ return lockObject != null ? lockObject.lockRef : null;
} catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
}
@@ -476,7 +490,7 @@ public class MusicCassaCore implements MusicCoreService {
try {
ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
if (result.getResult().equals(ResultType.FAILURE))
- return result;//not top of the lock store q
+ return result; // not top of the lock store q
if (conditionInfo != null)
try {
@@ -573,7 +587,7 @@ public class MusicCassaCore implements MusicCoreService {
try {
ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
if(result.getResult().equals(ResultType.FAILURE))
- return null;//not top of the lock store q
+ return null; // not top of the lock store q
results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
} catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
@@ -601,16 +615,24 @@ public class MusicCassaCore implements MusicCoreService {
String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
String lockReference = createLockReference(fullyQualifiedKey);
long lockCreationTime = System.currentTimeMillis();
- ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockReference);
+ ReturnType lockAcqResult;
+
+ int tries = 0;
+ do {
+ lockAcqResult = acquireLock(fullyQualifiedKey, lockReference);
+ tries++;
+ } while (!lockAcqResult.getResult().equals(ResultType.SUCCESS));
+
long lockAcqTime = System.currentTimeMillis();
- if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
- logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockReference);
- voluntaryReleaseLock(fullyQualifiedKey, lockReference);
- return lockAcqResult;
- }
+// if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+// logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockReference);
+// voluntaryReleaseLock(fullyQualifiedKey, lockReference);
+// return lockAcqResult;
+// }
- logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockReference);
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "acquired lock with id " + lockReference + " after " + tries + "tries");
ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
queryObject, lockReference, conditionInfo);
long criticalPutTime = System.currentTimeMillis();