From 85662cac4cfd2276ced5777c2547ad6df1d67eac Mon Sep 17 00:00:00 2001 From: "Tschaen, Brendan" Date: Wed, 12 Dec 2018 13:47:54 -0500 Subject: Read/Write locking implementation Change-Id: I31fedd52e138c848bf12ed0be27c348f4f96bcb5 Issue-ID: MUSIC-262 Signed-off-by: Tschaen, Brendan --- .../lockingservice/cassandra/CassaLockStore.java | 56 ++++++++++++++++++++-- src/main/java/org/onap/music/main/MusicCore.java | 4 ++ .../org/onap/music/service/MusicCoreService.java | 3 ++ .../onap/music/service/impl/MusicCassaCore.java | 47 +++++++++++------- .../onap/music/unittests/MusicLockStoreTest.java | 41 ++++++++++++++++ 5 files changed, 131 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java index 9b5793fa..8065bf61 100644 --- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java +++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java @@ -56,7 +56,7 @@ public class CassaLockStore { "Create lock queue/table for " + keyspace+"."+table); table = table_prepend_name+table; String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table - + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) " + + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, writeLock boolean, PRIMARY KEY ((key), lockReference) ) " + "WITH CLUSTERING ORDER BY (lockReference ASC);"; PreparedQueryObject queryObject = new PreparedQueryObject(); @@ -75,7 +75,24 @@ public class CassaLockStore { * @throws MusicServiceException * @throws MusicQueryException */ - public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException { + public String genLockRefandEnQueue(String keyspace, String table, String lockName) + throws MusicServiceException, MusicQueryException { + return genLockRefandEnQueue(keyspace, table, lockName, true); + } + + + /** + * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps. + * @param keyspace of the locks. + * @param table of the locks. + * @param lockName is the primary key of the lock table + * @param isWriteLock true if this lock needs to be a write lock + * @return the UUID lock reference. + * @throws MusicServiceException + * @throws MusicQueryException + */ + public String genLockRefandEnQueue(String keyspace, String table, String lockName, boolean isWriteLock) + throws MusicServiceException, MusicQueryException { logger.info(EELFLoggerDelegate.applicationLogger, "Create lock reference for " + keyspace + "." + table + "." + lockName); table = table_prepend_name+table; @@ -107,7 +124,7 @@ public class CassaLockStore { " UPDATE " + keyspace + "." + table + " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" + " INSERT INTO " + keyspace + "." + table + - "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; + "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; queryObject.addValue(lockRef); queryObject.addValue(lockName); @@ -118,6 +135,7 @@ public class CassaLockStore { queryObject.addValue(lockRef); queryObject.addValue(String.valueOf(lockEpochMillis)); queryObject.addValue("0"); + queryObject.addValue(isWriteLock); queryObject.appendQueryString(insQuery); boolean pResult = dsHandle.executePut(queryObject, "critical"); return String.valueOf(lockRef); @@ -180,7 +198,8 @@ public class CassaLockStore { * @throws MusicServiceException * @throws MusicQueryException */ - public LockObject peekLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException{ + public LockObject peekLockQueue(String keyspace, String table, String key) + throws MusicServiceException, MusicQueryException { logger.info(EELFLoggerDelegate.applicationLogger, "Peek in lock table for " + keyspace+"."+table+"."+key); table = table_prepend_name+table; @@ -196,6 +215,35 @@ public class CassaLockStore { return new LockObject(lockReference, createTime,acquireTime); } + public boolean isTopOfLockQueue(String keyspace, String table, String key, String lockRef) + throws MusicServiceException, MusicQueryException { + logger.info(EELFLoggerDelegate.applicationLogger, + "Checking in lock table for " + keyspace + "." + table + "." + key); + table = table_prepend_name + table; + String selectQuery = + "select * from " + keyspace + "." + table + " where key='" + key + "';"; + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString(selectQuery); + ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject); + + boolean topOfQueue = true; + for (Row row : rs) { + if (row.getBool("writeLock")) { + if (topOfQueue && lockRef.equals("" + row.getLong("lockReference"))) { + return true; + } else { + return false; + } + } + if (lockRef.equals("" + row.getLong("lockReference"))) { + return true; + } + topOfQueue = false; + } + logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef + + " in the lock queue. It has expired and no longer exists."); + return false; + } /** * This method removes the lock ref from the lock table/queue for the key. diff --git a/src/main/java/org/onap/music/main/MusicCore.java b/src/main/java/org/onap/music/main/MusicCore.java index e0a8ce6e..5a77df70 100644 --- a/src/main/java/org/onap/music/main/MusicCore.java +++ b/src/main/java/org/onap/music/main/MusicCore.java @@ -79,6 +79,10 @@ private static MusicCoreService musicCore = MusicCassaCore.getInstance(); return musicCore.createLockReference(fullyQualifiedKey); } + public static String createLockReference(String fullyQualifiedKey, boolean isWriteLock) { + return musicCore.createLockReference(fullyQualifiedKey, isWriteLock); + } + public static MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicServiceException, MusicQueryException{ return musicCore.forciblyReleaseLock(fullyQualifiedKey, lockReference); } diff --git a/src/main/java/org/onap/music/service/MusicCoreService.java b/src/main/java/org/onap/music/service/MusicCoreService.java index 3efda274..7074c6d9 100644 --- a/src/main/java/org/onap/music/service/MusicCoreService.java +++ b/src/main/java/org/onap/music/service/MusicCoreService.java @@ -72,6 +72,8 @@ public interface MusicCoreService { // Core Music Locking Service Methods public String createLockReference(String fullyQualifiedKey); // lock name + + public String createLockReference(String fullyQualifiedKey, boolean writeLock); public ReturnType acquireLockWithLease(String key, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException; // key,lock id,time @@ -100,4 +102,5 @@ public interface MusicCoreService { public long getLockQueueSize(String fullyQualifiedKey) throws MusicServiceException, MusicQueryException, MusicLockingException; + } diff --git a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java index 9bbb3a9a..fcf02807 100644 --- a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java +++ b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java @@ -97,8 +97,11 @@ public class MusicCassaCore implements MusicCoreService { } - public String createLockReference(String fullyQualifiedKey) { + return createLockReference(fullyQualifiedKey, true); + } + + public String createLockReference(String fullyQualifiedKey, boolean isWriteLock) { String[] splitString = fullyQualifiedKey.split("\\."); String keyspace = splitString[0]; String table = splitString[1]; @@ -108,7 +111,7 @@ public class MusicCassaCore implements MusicCoreService { long start = System.currentTimeMillis(); String lockReference = null; try { - lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName); + lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, isWriteLock); } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { e.printStackTrace(); } @@ -142,25 +145,37 @@ public class MusicCassaCore implements MusicCoreService { } } - private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException { - - //return failure to lock holders too early or already evicted from the lock store - String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef; + private static ReturnType isTopOfLockStore(String keyspace, String table, + String primaryKeyValue, String lockReference) + throws MusicLockingException, MusicQueryException, MusicServiceException { + + // return failure to lock holders too early or already evicted from the lock store + String topOfLockStoreS = + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef; long topOfLockStoreL = Long.parseLong(topOfLockStoreS); long lockReferenceL = Long.parseLong(lockReference); - if(lockReferenceL > topOfLockStoreL) { - logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet"); - return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet"); + if (lockReferenceL > topOfLockStoreL) { + // only need to check if this is a read lock.... + if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue, + lockReference)) { + return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values"); + } + logger.info(EELFLoggerDelegate.applicationLogger, + lockReference + " is not the lock holder yet"); + return new ReturnType(ResultType.FAILURE, + lockReference + " is not the lock holder yet"); + } + + + if (lockReferenceL < topOfLockStoreL) { + logger.info(EELFLoggerDelegate.applicationLogger, + lockReference + " is no longer/or was never in the lock store queue"); + return new ReturnType(ResultType.FAILURE, + lockReference + " is no longer/or was never in the lock store queue"); } - - if(lockReferenceL < topOfLockStoreL) { - logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue"); - return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue"); - } - - return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store"); + return new ReturnType(ResultType.SUCCESS, lockReference + " is top of lock store"); } public ReturnType acquireLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException { diff --git a/src/test/java/org/onap/music/unittests/MusicLockStoreTest.java b/src/test/java/org/onap/music/unittests/MusicLockStoreTest.java index e57b32ea..f8186278 100644 --- a/src/test/java/org/onap/music/unittests/MusicLockStoreTest.java +++ b/src/test/java/org/onap/music/unittests/MusicLockStoreTest.java @@ -22,7 +22,9 @@ package org.onap.music.unittests; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import java.util.List; @@ -125,4 +127,43 @@ public class MusicLockStoreTest { } assertEquals(21, lockStore.getLockQueueSize(CassandraCQL.keyspace, CassandraCQL.table, "test")); } + + @Test + public void Test_testCreateReadLock() throws MusicServiceException, MusicQueryException { + lockStore.createLockQueue(CassandraCQL.keyspace, CassandraCQL.table); + String readLockRef1 = lockStore.genLockRefandEnQueue(CassandraCQL.keyspace, + CassandraCQL.table, "test", false); + assertEquals(readLockRef1, + lockStore.peekLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test").lockRef); + assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test", + readLockRef1)); + + String readLockRef2 = lockStore.genLockRefandEnQueue(CassandraCQL.keyspace, + CassandraCQL.table, "test", false); + assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test", + readLockRef2)); + + String writelockRef3 = + lockStore.genLockRefandEnQueue(CassandraCQL.keyspace, CassandraCQL.table, "test"); + String writelockRef4 = + lockStore.genLockRefandEnQueue(CassandraCQL.keyspace, CassandraCQL.table, "test"); + assertFalse(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test", + writelockRef3)); + + lockStore.deQueueLockRef(CassandraCQL.keyspace, CassandraCQL.table, "test", readLockRef1); + assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test", + readLockRef2)); + + lockStore.deQueueLockRef(CassandraCQL.keyspace, CassandraCQL.table, "test", readLockRef2); + assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test", + writelockRef3)); + assertFalse(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test", + writelockRef4)); + + lockStore.deQueueLockRef(CassandraCQL.keyspace, CassandraCQL.table, "test", writelockRef3); + assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test", + writelockRef4)); + assertFalse(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test", + readLockRef1)); + } } -- cgit 1.2.3-korg