diff options
author | Tschaen, Brendan <ctschaen@att.com> | 2019-09-25 14:54:46 -0400 |
---|---|---|
committer | Tschaen, Brendan <ctschaen@att.com> | 2019-09-26 13:52:01 -0400 |
commit | 90d35b7f55d1ea3eb6ccf8218d9ac42412fd0d90 (patch) | |
tree | 972d5110063f0f56405ba1c29f0c8534f17f01e8 /src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java | |
parent | c8adfc5ea25d6ffd45edd5213195ce0c4568b57f (diff) |
Read lock promotion
Change-Id: Ib2515c728503fb729e6ecc2e09973bbfa9e2e317
Issue-ID: MUSIC-508
Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
Diffstat (limited to 'src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java')
-rw-r--r-- | src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java | 118 |
1 files changed, 106 insertions, 12 deletions
diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java index 0ec85077..10898476 100644 --- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java +++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java @@ -36,9 +36,12 @@ import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.DeadlockDetectionUtil; import org.onap.music.main.DeadlockDetectionUtil.OwnershipType; import org.onap.music.main.MusicUtil; - +import org.onap.music.main.ResultType; +import org.onap.music.main.ReturnType; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.extras.codecs.enums.EnumNameCodec; /* * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. @@ -126,7 +129,7 @@ public class CassaLockStore { table = table_prepend_name+table; String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, " - + "writeLock boolean, owner text, PRIMARY KEY ((key), lockReference) ) " + + "lockType text, owner text, PRIMARY KEY ((key), lockReference) ) " + "WITH CLUSTERING ORDER BY (lockReference ASC);"; PreparedQueryObject queryObject = new PreparedQueryObject(); @@ -176,13 +179,14 @@ public class CassaLockStore { logger.info(EELFLoggerDelegate.applicationLogger, "Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef); - + queryObject = new PreparedQueryObject(); + String insQuery = "BEGIN BATCH" + " UPDATE " + keyspace + "." + lockTable + " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" + " INSERT INTO " + keyspace + "." + lockTable + - "(key, lockReference, createTime, acquireTime, writeLock, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; + "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; queryObject.addValue(lockRef); queryObject.addValue(lockName); @@ -193,7 +197,7 @@ public class CassaLockStore { queryObject.addValue(lockRef); queryObject.addValue(String.valueOf(lockEpochMillis)); queryObject.addValue("0"); - queryObject.addValue(locktype==LockType.WRITE ? true : false ); + queryObject.addValue(locktype); queryObject.addValue(owner); queryObject.appendQueryString(insQuery); boolean pResult = dsHandle.executePut(queryObject, "critical"); @@ -285,7 +289,7 @@ public class CassaLockStore { String lockReference = "" + row.getLong("lockReference"); String createTime = row.getString("createTime"); String acquireTime = row.getString("acquireTime"); - LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ; + LockType locktype = row.get("lockType", LockType.class); String owner = row.getString("owner"); return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner); @@ -313,7 +317,7 @@ public class CassaLockStore { return lockHolders; } lockReference.append(lock).append(row.getLong("lockReference")); - if (row.isNull("writeLock") || row.getBool("writeLock")) { + if (row.get("lockType", LockType.class)!=LockType.WRITE) { if (topOfQueue) { lockHolders.add(lockReference.toString()); break; @@ -356,7 +360,7 @@ public class CassaLockStore { boolean topOfQueue = true; for (Row row : rs) { String lockReference = "" + row.getLong("lockReference"); - if (row.isNull("writeLock") || row.getBool("writeLock")) { + if (row.get("lockType", LockType.class)==LockType.WRITE) { if (topOfQueue && lockRef.equals(lockReference)) { return true; } else { @@ -404,7 +408,7 @@ public class CassaLockStore { String lockReference = "" + row.getLong("lockReference"); String createTime = row.getString("createTime"); String acquireTime = row.getString("acquireTime"); - LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ; + LockType locktype = row.get("lockType", LockType.class); boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef); String owner = row.getString("owner"); @@ -456,11 +460,9 @@ public class CassaLockStore { public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) { table = table_prepend_name + table; - PreparedQueryObject queryObject = new PreparedQueryObject(); Long lockReferenceL = Long.parseLong(lockReference); String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis() + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;"; - queryObject.appendQueryString(updateQuery); //cannot use executePut because we need to ignore music timestamp adjustments for lock store dsHandle.getSession().execute(updateQuery); @@ -473,7 +475,8 @@ public class CassaLockStore { String lockTable = table_prepend_name + table; PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable); - queryObject.appendQueryString(" WHERE writelock = True ALLOW FILTERING"); + queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING"); + queryObject.addValue(LockType.WRITE); DeadlockDetectionUtil ddu = new DeadlockDetectionUtil(); @@ -506,5 +509,96 @@ public class CassaLockStore { return toRet; } + public ReturnType promoteLock(String keyspace, String table, String key, String lockRef) + throws MusicLockingException, MusicServiceException, MusicQueryException { + String lockqtable = table_prepend_name + table; + String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;"; + + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString(selectQuery); + queryObject.addValue(key); + ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject); + + long refToPromote = Long.parseLong(lockRef); + + boolean topOfQueue = true; + boolean readBlock = false; + boolean seenLockToPromote = false; + boolean promotionOngoing = false; + long readBlockStart = 0; + long readBlockEnd = 0; + + + for (Row row : rs) { + long ref = row.getLong("lockreference"); + LockType lockType = row.get("lockType", LockType.class); + + if (refToPromote==ref) { + if (promotionOngoing) { + return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref."); + } + seenLockToPromote = true; + if (!topOfQueue) { + readBlockStart = ref; + readBlockEnd = ref; + break; + } + } else if (!seenLockToPromote && refToPromote<ref) { + return new ReturnType(ResultType.FAILURE, "Lockref does not exist."); + } + + if (lockType==LockType.READ || lockType==LockType.PROMOTING) { + if (!readBlock) { + readBlockStart = ref; + readBlock = true; + } + if (readBlock) { + readBlockEnd = ref; + } + if (lockType==LockType.PROMOTING) { + promotionOngoing = true; + } + } + + if (lockType==LockType.WRITE) { + if (refToPromote==ref) { + return new ReturnType(ResultType.FAILURE, "Lockref is already write."); + } + if (readBlock) { + readBlock = false; + promotionOngoing = false; + if (seenLockToPromote) { + break; + } + //can no longer be lock holder after this + topOfQueue = false; + } + } + } + + if (readBlockStart<=refToPromote && refToPromote<=readBlockEnd) { + if (readBlockStart==refToPromote && refToPromote==readBlockEnd) { + promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.WRITE); + return new ReturnType(ResultType.SUCCESS, "Lock has successfully been upgraded."); + } + promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.PROMOTING); + return new ReturnType(ResultType.FAILURE, "Your lock upgrade is in progress. Check again to see if successful."); + } + + //shouldn't reach here? + return new ReturnType(ResultType.FAILURE,"Promotion failed."); + } + + private void promoteLockTo(String keyspace, String table, String key, String lockRef, LockType newLockType) + throws MusicServiceException, MusicQueryException { + PreparedQueryObject queryObject = + new PreparedQueryObject("UPDATE " + keyspace + "." + table + " SET lockType=? WHERE key='" + key + + "' AND lockReference = " + lockRef + " IF EXISTS;", newLockType); + + //cannot use executePut because we need to ignore music timestamp adjustments for lock store + dsHandle.executePut(queryObject, MusicUtil.QUORUM); + } + + } |