aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java')
-rw-r--r--src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java300
1 files changed, 229 insertions, 71 deletions
diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
index 7ff6a4d5..3c3f7160 100644
--- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
+++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
@@ -45,19 +45,8 @@ public class CassaLockStore {
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
private static String table_prepend_name = "lockQ_";
+ private MusicDataStore dsHandle;
- public class LockObject{
- public String lockRef;
- public String createTime;
- public String acquireTime;
- public LockObject(String lockRef, String createTime, String acquireTime) {
- this.lockRef = lockRef;
- this.acquireTime = acquireTime;
- this.createTime = createTime;
-
- }
- }
- MusicDataStore dsHandle;
public CassaLockStore() {
dsHandle = new MusicDataStore();
}
@@ -65,7 +54,50 @@ public class CassaLockStore {
public CassaLockStore(MusicDataStore dsHandle) {
this.dsHandle=dsHandle;
}
-
+ public class LockObject{
+ private boolean isLockOwner;
+ private String lockRef;
+ private String createTime;
+ private String acquireTime;
+ private LockType locktype;
+ public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype) {
+ this.setIsLockOwner(isLockOwner);
+ this.setLockRef(lockRef);
+ this.setAcquireTime(acquireTime);
+ this.setCreateTime(createTime);
+ this.setLocktype(locktype);
+ }
+ public boolean getIsLockOwner() {
+ return isLockOwner;
+ }
+ public void setIsLockOwner(boolean isLockOwner) {
+ this.isLockOwner = isLockOwner;
+ }
+ public String getAcquireTime() {
+ return acquireTime;
+ }
+ public void setAcquireTime(String acquireTime) {
+ this.acquireTime = acquireTime;
+ }
+ public String getCreateTime() {
+ return createTime;
+ }
+ public void setCreateTime(String createTime) {
+ this.createTime = createTime;
+ }
+ public String getLockRef() {
+ return lockRef;
+ }
+ public void setLockRef(String lockRef) {
+ this.lockRef = lockRef;
+ }
+ public LockType getLocktype() {
+ return locktype;
+ }
+ public void setLocktype(LockType locktype) {
+ this.locktype = locktype;
+ }
+ }
/**
*
@@ -81,7 +113,8 @@ public class CassaLockStore {
"Create lock queue/table for " + keyspace+"."+table);
table = table_prepend_name+table;
String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
- + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
+ + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
+ + "writeLock boolean, PRIMARY KEY ((key), lockReference) ) "
+ "WITH CLUSTERING ORDER BY (lockReference ASC);";
PreparedQueryObject queryObject = new PreparedQueryObject();
@@ -100,16 +133,16 @@ public class CassaLockStore {
* @throws MusicServiceException
* @throws MusicQueryException
*/
- public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException, MusicLockingException {
- return genLockRefandEnQueue(keyspace, table, lockName, 0);
+ public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype) throws MusicServiceException, MusicQueryException, MusicLockingException {
+ return genLockRefandEnQueue(keyspace, table, lockName, locktype, 0);
}
- private String genLockRefandEnQueue(String keyspace, String table, String lockName, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
+ private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
logger.info(EELFLoggerDelegate.applicationLogger,
- "Create lock reference for " + keyspace + "." + table + "." + lockName);
+ "Create " + locktype + " lock reference for " + keyspace + "." + table + "." + lockName);
String lockTable ="";
lockTable = table_prepend_name+table;
-
+
PreparedQueryObject queryObject = new PreparedQueryObject();
@@ -137,7 +170,7 @@ public class CassaLockStore {
" UPDATE " + keyspace + "." + lockTable +
" SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
" INSERT INTO " + keyspace + "." + lockTable +
- "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
+ "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
queryObject.addValue(lockRef);
queryObject.addValue(lockName);
@@ -148,23 +181,25 @@ public class CassaLockStore {
queryObject.addValue(lockRef);
queryObject.addValue(String.valueOf(lockEpochMillis));
queryObject.addValue("0");
+ queryObject.addValue(locktype==LockType.WRITE ? true : false );
queryObject.appendQueryString(insQuery);
boolean pResult = dsHandle.executePut(queryObject, "critical");
- if (!pResult) {//couldn't create lock ref, retry
+ if (!pResult) {// couldn't create lock ref, retry
count++;
- if (count>MusicUtil.getRetryCount()) {
+ if (count > MusicUtil.getRetryCount()) {
logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
throw new MusicLockingException("Unable to create lock reference");
}
- return genLockRefandEnQueue(keyspace, table, lockName, count);
+ return genLockRefandEnQueue(keyspace, table, lockName, locktype, count);
}
return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
}
-
+
/**
* Returns a result set containing the list of clients waiting for a particular lock
+ *
* @param keyspace
* @param table
* @param key
@@ -175,22 +210,23 @@ public class CassaLockStore {
public List<String> getLockQueue(String keyspace, String table, String key)
throws MusicServiceException, MusicQueryException {
logger.info(EELFLoggerDelegate.applicationLogger,
- "Getting the queue for " + keyspace+"."+table+"."+key);
- table = table_prepend_name+table;
+ "Getting the queue for " + keyspace + "." + table + "." + key);
+ table = table_prepend_name + table;
String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString(selectQuery);
ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
ArrayList<String> lockQueue = new ArrayList<>();
- for (Row row: rs) {
+ for (Row row : rs) {
lockQueue.add(Long.toString(row.getLong("lockReference")));
}
return lockQueue;
}
-
-
+
+
/**
* Returns a result set containing the list of clients waiting for a particular lock
+ *
* @param keyspace
* @param table
* @param key
@@ -201,90 +237,212 @@ public class CassaLockStore {
public long getLockQueueSize(String keyspace, String table, String key)
throws MusicServiceException, MusicQueryException {
logger.info(EELFLoggerDelegate.applicationLogger,
- "Getting the queue size for " + keyspace+"."+table+"."+key);
- table = table_prepend_name+table;
+ "Getting the queue size for " + keyspace + "." + table + "." + key);
+ table = table_prepend_name + table;
String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString(selectQuery);
- ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
- return rs.one().getLong("count");
- }
+ ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+ return rs.one().getLong("count");
+ }
/**
* This method returns the top of lock table/queue for the key.
+ *
* @param keyspace of the application.
* @param table of the application.
* @param key is the primary key of the application table
- * @return the UUID lock reference. Returns null if there is no owner or the lock doesn't exist
+ * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the
+ * lock doesn't exist
* @throws MusicServiceException
* @throws MusicQueryException
*/
- public LockObject peekLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException{
+ public LockObject peekLockQueue(String keyspace, String table, String key)
+ throws MusicServiceException, MusicQueryException {
logger.info(EELFLoggerDelegate.applicationLogger,
- "Peek in lock table for " + keyspace+"."+table+"."+key);
- table = table_prepend_name+table;
- String selectQuery = "select * from "+keyspace+"."+table+" where key='"+key+"' LIMIT 1;";
+ "Peek in lock table for " + keyspace + "." + table + "." + key);
+ table = table_prepend_name + table;
+ String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;";
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString(selectQuery);
ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
Row row = results.one();
- if (row==null || row.isNull("lockReference")) {
+ if (row == null || row.isNull("lockReference")) {
+ return new LockObject(false, null, null, null, null);
+ }
+ String lockReference = "" + row.getLong("lockReference");
+ String createTime = row.getString("createTime");
+ String acquireTime = row.getString("acquireTime");
+ LockType locktype = row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+
+ return new LockObject(true, lockReference, createTime, acquireTime, locktype);
+ }
+
+ public List<String> getCurrentLockHolders(String keyspace, String table, String key)
+ throws MusicServiceException, MusicQueryException {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
+ table = table_prepend_name + table;
+ String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(selectQuery);
+ queryObject.addValue(key);
+ ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+
+ List<String> lockHolders = new ArrayList<>();
+ boolean topOfQueue = true;
+ for (Row row : rs) {
+ String lockReference = "" + row.getLong("lockReference");
+ if (row.getBool("writeLock")) {
+ if (topOfQueue) {
+ lockHolders.add(lockReference);
+ break;
+ } else {
+ break;
+ }
+ }
+ // read lock
+ lockHolders.add(lockReference);
+
+ topOfQueue = false;
+ }
+ return lockHolders;
+ }
+
+ /**
+ * Determine if the lock is a valid current lock holder.
+ *
+ * @param keyspace
+ * @param table
+ * @param key
+ * @param lockRef
+ * @return true if lockRef is a lock owner of key
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public boolean isLockOwner(String keyspace, String table, String key, String lockRef)
+ throws MusicServiceException, MusicQueryException {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Checking in lock table for " + keyspace + "." + table + "." + key);
+ table = table_prepend_name + table;
+ String selectQuery =
+ "select * from " + keyspace + "." + table + " where key=?;";
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(selectQuery);
+ queryObject.addValue(key);
+ ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+
+ boolean topOfQueue = true;
+ for (Row row : rs) {
+ String lockReference = "" + row.getLong("lockReference");
+ if (row.getBool("writeLock")) {
+ if (topOfQueue && lockRef.equals(lockReference)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ if (lockRef.equals(lockReference)) {
+ return true;
+ }
+ topOfQueue = false;
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
+ + " in the lock queue. It has expired and no longer exists.");
+ return false;
+ }
+
+ /**
+ * Determine if the lock is a valid current lock holder.
+ *
+ * @param keyspace
+ * @param table
+ * @param key
+ * @param lockRef
+ * @return true if lockRef is a lock owner of key
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public LockObject getLockInfo(String keyspace, String table, String key, String lockRef)
+ throws MusicServiceException, MusicQueryException {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Checking in lock table for " + keyspace + "." + table + "." + key);
+ String lockQ_table = table_prepend_name + table;
+ String selectQuery =
+ "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;";
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(selectQuery);
+ queryObject.addValue(key);
+ queryObject.addValue(Long.parseLong(lockRef));
+ ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+ Row row = rs.one();
+ if (row == null || row.isNull("lockReference")) {
return null;
}
+
String lockReference = "" + row.getLong("lockReference");
String createTime = row.getString("createTime");
String acquireTime = row.getString("acquireTime");
+ LockType locktype = row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+ boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
- return new LockObject(lockReference, createTime,acquireTime);
+ return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype);
}
-
-
+
+
+
/**
- * This method removes the lock ref from the lock table/queue for the key.
- * @param keyspace of the application.
- * @param table of the application.
+ * This method removes the lock ref from the lock table/queue for the key.
+ *
+ * @param keyspace of the application.
+ * @param table of the application.
* @param key is the primary key of the application table
* @param lockReference the lock reference that needs to be dequeued.
* @throws MusicServiceException
* @throws MusicQueryException
- * @throws MusicLockingException
- */
- public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n) throws MusicServiceException, MusicQueryException, MusicLockingException{
- String prependTable = table_prepend_name+table;
+ * @throws MusicLockingException
+ */
+ public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n)
+ throws MusicServiceException, MusicQueryException, MusicLockingException {
+ String prependTable = table_prepend_name + table;
PreparedQueryObject queryObject = new PreparedQueryObject();
- Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$")+1));
- String deleteQuery = "delete from "+keyspace+"."+prependTable+" where key='"+key+"' AND lockReference ="+lockReferenceL+" IF EXISTS;";
+ Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1));
+ String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key
+ + "' AND lockReference =" + lockReferenceL + " IF EXISTS;";
queryObject.appendQueryString(deleteQuery);
logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
try {
- dsHandle.executePut(queryObject, "critical");
- logger.info(EELFLoggerDelegate.applicationLogger, "Lock removed for key: "+key+ " and reference: "+lockReference);
- }catch(MusicServiceException ex) {
- logger.error(logger, ex.getMessage(),ex);
- logger.error(EELFLoggerDelegate.applicationLogger,"Exception while deQueueLockRef for lockname: " + key + " reference:" +lockReference);
- if(n>1) {
+ dsHandle.executePut(queryObject, "critical");
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Lock removed for key: " + key + " and reference: " + lockReference);
+ } catch (MusicServiceException ex) {
+ logger.error(logger, ex.getMessage(), ex);
+ logger.error(EELFLoggerDelegate.applicationLogger,
+ "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference);
+ if (n > 1) {
logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
- deQueueLockRef(keyspace, table, key, lockReference, n-1);
- }
- else {
- logger.error(EELFLoggerDelegate.applicationLogger,"deQueueLockRef failed for lockname: " + key + " reference:" +lockReference);
- logger.error(logger, ex.getMessage(),ex);
- throw new MusicLockingException("Error while deQueueLockRef: "+ex.getMessage());
+ deQueueLockRef(keyspace, table, key, lockReference, n - 1);
+ } else {
+ logger.error(EELFLoggerDelegate.applicationLogger,
+ "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference);
+ logger.error(logger, ex.getMessage(), ex);
+ throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage());
}
}
}
-
- public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{
- table = table_prepend_name+table;
+
+ public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference)
+ throws MusicServiceException, MusicQueryException {
+ table = table_prepend_name + table;
PreparedQueryObject queryObject = new PreparedQueryObject();
Long lockReferenceL = Long.parseLong(lockReference);
- String updateQuery = "update "+keyspace+"."+table+" set acquireTime='"+ System.currentTimeMillis()+"' where key='"+key+"' AND lockReference = "+lockReferenceL+" IF EXISTS;";
+ String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
+ + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
queryObject.appendQueryString(updateQuery);
- dsHandle.executePut(queryObject, "eventual");
+ dsHandle.executePut(queryObject, "eventual");
- }
-
+ }
}