aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java')
-rw-r--r--src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java118
1 files changed, 106 insertions, 12 deletions
diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
index 0ec85077..10898476 100644
--- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
+++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
@@ -36,9 +36,12 @@ import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.main.DeadlockDetectionUtil;
import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
import org.onap.music.main.MusicUtil;
-
+import org.onap.music.main.ResultType;
+import org.onap.music.main.ReturnType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
/*
* This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state.
@@ -126,7 +129,7 @@ public class CassaLockStore {
table = table_prepend_name+table;
String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
+ " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
- + "writeLock boolean, owner text, PRIMARY KEY ((key), lockReference) ) "
+ + "lockType text, owner text, PRIMARY KEY ((key), lockReference) ) "
+ "WITH CLUSTERING ORDER BY (lockReference ASC);";
PreparedQueryObject queryObject = new PreparedQueryObject();
@@ -176,13 +179,14 @@ public class CassaLockStore {
logger.info(EELFLoggerDelegate.applicationLogger,
"Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
-
+
queryObject = new PreparedQueryObject();
+
String insQuery = "BEGIN BATCH" +
" UPDATE " + keyspace + "." + lockTable +
" SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
" INSERT INTO " + keyspace + "." + lockTable +
- "(key, lockReference, createTime, acquireTime, writeLock, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
+ "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
queryObject.addValue(lockRef);
queryObject.addValue(lockName);
@@ -193,7 +197,7 @@ public class CassaLockStore {
queryObject.addValue(lockRef);
queryObject.addValue(String.valueOf(lockEpochMillis));
queryObject.addValue("0");
- queryObject.addValue(locktype==LockType.WRITE ? true : false );
+ queryObject.addValue(locktype);
queryObject.addValue(owner);
queryObject.appendQueryString(insQuery);
boolean pResult = dsHandle.executePut(queryObject, "critical");
@@ -285,7 +289,7 @@ public class CassaLockStore {
String lockReference = "" + row.getLong("lockReference");
String createTime = row.getString("createTime");
String acquireTime = row.getString("acquireTime");
- LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+ LockType locktype = row.get("lockType", LockType.class);
String owner = row.getString("owner");
return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
@@ -313,7 +317,7 @@ public class CassaLockStore {
return lockHolders;
}
lockReference.append(lock).append(row.getLong("lockReference"));
- if (row.isNull("writeLock") || row.getBool("writeLock")) {
+ if (row.get("lockType", LockType.class)!=LockType.WRITE) {
if (topOfQueue) {
lockHolders.add(lockReference.toString());
break;
@@ -356,7 +360,7 @@ public class CassaLockStore {
boolean topOfQueue = true;
for (Row row : rs) {
String lockReference = "" + row.getLong("lockReference");
- if (row.isNull("writeLock") || row.getBool("writeLock")) {
+ if (row.get("lockType", LockType.class)==LockType.WRITE) {
if (topOfQueue && lockRef.equals(lockReference)) {
return true;
} else {
@@ -404,7 +408,7 @@ public class CassaLockStore {
String lockReference = "" + row.getLong("lockReference");
String createTime = row.getString("createTime");
String acquireTime = row.getString("acquireTime");
- LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+ LockType locktype = row.get("lockType", LockType.class);
boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
String owner = row.getString("owner");
@@ -456,11 +460,9 @@ public class CassaLockStore {
public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
table = table_prepend_name + table;
- PreparedQueryObject queryObject = new PreparedQueryObject();
Long lockReferenceL = Long.parseLong(lockReference);
String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
+ "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
- queryObject.appendQueryString(updateQuery);
//cannot use executePut because we need to ignore music timestamp adjustments for lock store
dsHandle.getSession().execute(updateQuery);
@@ -473,7 +475,8 @@ public class CassaLockStore {
String lockTable = table_prepend_name + table;
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
- queryObject.appendQueryString(" WHERE writelock = True ALLOW FILTERING");
+ queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING");
+ queryObject.addValue(LockType.WRITE);
DeadlockDetectionUtil ddu = new DeadlockDetectionUtil();
@@ -506,5 +509,96 @@ public class CassaLockStore {
return toRet;
}
+ public ReturnType promoteLock(String keyspace, String table, String key, String lockRef)
+ throws MusicLockingException, MusicServiceException, MusicQueryException {
+ String lockqtable = table_prepend_name + table;
+ String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;";
+
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(selectQuery);
+ queryObject.addValue(key);
+ ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+
+ long refToPromote = Long.parseLong(lockRef);
+
+ boolean topOfQueue = true;
+ boolean readBlock = false;
+ boolean seenLockToPromote = false;
+ boolean promotionOngoing = false;
+ long readBlockStart = 0;
+ long readBlockEnd = 0;
+
+
+ for (Row row : rs) {
+ long ref = row.getLong("lockreference");
+ LockType lockType = row.get("lockType", LockType.class);
+
+ if (refToPromote==ref) {
+ if (promotionOngoing) {
+ return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref.");
+ }
+ seenLockToPromote = true;
+ if (!topOfQueue) {
+ readBlockStart = ref;
+ readBlockEnd = ref;
+ break;
+ }
+ } else if (!seenLockToPromote && refToPromote<ref) {
+ return new ReturnType(ResultType.FAILURE, "Lockref does not exist.");
+ }
+
+ if (lockType==LockType.READ || lockType==LockType.PROMOTING) {
+ if (!readBlock) {
+ readBlockStart = ref;
+ readBlock = true;
+ }
+ if (readBlock) {
+ readBlockEnd = ref;
+ }
+ if (lockType==LockType.PROMOTING) {
+ promotionOngoing = true;
+ }
+ }
+
+ if (lockType==LockType.WRITE) {
+ if (refToPromote==ref) {
+ return new ReturnType(ResultType.FAILURE, "Lockref is already write.");
+ }
+ if (readBlock) {
+ readBlock = false;
+ promotionOngoing = false;
+ if (seenLockToPromote) {
+ break;
+ }
+ //can no longer be lock holder after this
+ topOfQueue = false;
+ }
+ }
+ }
+
+ if (readBlockStart<=refToPromote && refToPromote<=readBlockEnd) {
+ if (readBlockStart==refToPromote && refToPromote==readBlockEnd) {
+ promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.WRITE);
+ return new ReturnType(ResultType.SUCCESS, "Lock has successfully been upgraded.");
+ }
+ promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.PROMOTING);
+ return new ReturnType(ResultType.FAILURE, "Your lock upgrade is in progress. Check again to see if successful.");
+ }
+
+ //shouldn't reach here?
+ return new ReturnType(ResultType.FAILURE,"Promotion failed.");
+ }
+
+ private void promoteLockTo(String keyspace, String table, String key, String lockRef, LockType newLockType)
+ throws MusicServiceException, MusicQueryException {
+ PreparedQueryObject queryObject =
+ new PreparedQueryObject("UPDATE " + keyspace + "." + table + " SET lockType=? WHERE key='" + key
+ + "' AND lockReference = " + lockRef + " IF EXISTS;", newLockType);
+
+ //cannot use executePut because we need to ignore music timestamp adjustments for lock store
+ dsHandle.executePut(queryObject, MusicUtil.QUORUM);
+ }
+
+
}