aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
diff options
context:
space:
mode:
authorsrupane kondreddy <sk5300@research.att.com>2018-11-28 18:20:21 -0500
committersrupane kondreddy <sk5300@research.att.com>2018-11-28 18:56:05 -0500
commit2cf0f8dfe0ac7e147614fcf6d0265c0ab64e9326 (patch)
treeb5f9bb633542285f68675d0a3ef54824ae870336 /src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
parent6e7f430bcf94d7f7bd4cd81b886d610b358c024e (diff)
music as a service impl v3
Change-Id: I7f96eaa48afed96d62ad7662139167e8a17d00d7 Issue-ID: MUSIC-204 Signed-off-by: srupane kondreddy <sk5300@research.att.com>
Diffstat (limited to 'src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java')
-rw-r--r--src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java232
1 files changed, 232 insertions, 0 deletions
diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
new file mode 100644
index 00000000..68423699
--- /dev/null
+++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
@@ -0,0 +1,232 @@
+package org.onap.music.lockingservice.cassandra;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.music.datastore.MusicDataStore;
+import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.exceptions.MusicQueryException;
+import org.onap.music.exceptions.MusicServiceException;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+
+/*
+ * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state.
+ */
+
+public class CassaLockStore {
+
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
+ private static String table_prepend_name = "lockQ_";
+
+ public class LockObject{
+ public String lockRef;
+ public String createTime;
+ public String acquireTime;
+ public LockObject(String lockRef, String createTime, String acquireTime) {
+ this.lockRef = lockRef;
+ this.acquireTime = acquireTime;
+ this.createTime = createTime;
+
+ }
+ }
+ MusicDataStore dsHandle;
+ public CassaLockStore() {
+ dsHandle = new MusicDataStore();
+ }
+
+ public CassaLockStore(MusicDataStore dsHandle) {
+ this.dsHandle=dsHandle;
+ }
+
+
+ /**
+ *
+ * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks.
+ * @param keyspace of the application.
+ * @param table of the application.
+ * @return true if the operation was successful.
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Create lock queue/table for " + keyspace+"."+table);
+ table = table_prepend_name+table;
+ String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
+ + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
+ + "WITH CLUSTERING ORDER BY (lockReference ASC);";
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+
+ queryObject.appendQueryString(tabQuery);
+ boolean result;
+ result = dsHandle.executePut(queryObject, "eventual");
+ return result;
+ }
+
+ /**
+ * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
+ * @param keyspace of the locks.
+ * @param table of the locks.
+ * @param lockName is the primary key of the lock table
+ * @return the UUID lock reference.
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Create lock reference for " + keyspace + "." + table + "." + lockName);
+ table = table_prepend_name+table;
+
+
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ String selectQuery = "SELECT guard FROM " + keyspace + "." + table + " WHERE key=?;";
+
+ queryObject.addValue(lockName);
+ queryObject.appendQueryString(selectQuery);
+ ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
+ List<Row> latestGuardRow = gqResult.all();
+
+ long prevGuard = 0;
+ long lockRef = 1;
+ if (latestGuardRow.size() > 0) {
+ prevGuard = latestGuardRow.get(0).getLong(0);
+ lockRef = prevGuard + 1;
+ }
+
+ long lockEpochMillis = System.currentTimeMillis();
+
+// System.out.println("guard(" + lockName + "): " + prevGuard + "->" + lockRef);
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Created lock reference for " + keyspace + "." + table + "." + lockName + ":" + lockRef);
+
+ queryObject = new PreparedQueryObject();
+ String insQuery = "BEGIN BATCH" +
+ " UPDATE " + keyspace + "." + table +
+ " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
+ " INSERT INTO " + keyspace + "." + table +
+ "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
+
+ queryObject.addValue(lockRef);
+ queryObject.addValue(lockName);
+ if (prevGuard != 0)
+ queryObject.addValue(prevGuard);
+
+ queryObject.addValue(lockName);
+ queryObject.addValue(lockRef);
+ queryObject.addValue(String.valueOf(lockEpochMillis));
+ queryObject.addValue("0");
+ queryObject.appendQueryString(insQuery);
+ boolean pResult = dsHandle.executePut(queryObject, "critical");
+ return String.valueOf(lockRef);
+ }
+
+ /**
+ * Returns a result set containing the list of clients waiting for a particular lock
+ * @param keyspace
+ * @param table
+ * @param key
+ * @return list of lockrefs in the queue
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public List<String> getLockQueue(String keyspace, String table, String key)
+ throws MusicServiceException, MusicQueryException {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Getting the queue for " + keyspace+"."+table+"."+key);
+ table = table_prepend_name+table;
+ String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(selectQuery);
+ ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+ ArrayList<String> lockQueue = new ArrayList<>();
+ for (Row row: rs) {
+ lockQueue.add(Long.toString(row.getLong("lockReference")));
+ }
+ return lockQueue;
+ }
+
+
+ /**
+ * Returns a result set containing the list of clients waiting for a particular lock
+ * @param keyspace
+ * @param table
+ * @param key
+ * @return size of lockrefs queue
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public long getLockQueueSize(String keyspace, String table, String key)
+ throws MusicServiceException, MusicQueryException {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Getting the queue size for " + keyspace+"."+table+"."+key);
+ table = table_prepend_name+table;
+ String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(selectQuery);
+ ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+ System.err.println(rs);
+
+ return rs.one().getLong("count");
+ }
+
+
+ /**
+ * This method returns the top of lock table/queue for the key.
+ * @param keyspace of the application.
+ * @param table of the application.
+ * @param key is the primary key of the application table
+ * @return the UUID lock reference.
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public LockObject peekLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException{
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Peek in lock table for " + keyspace+"."+table+"."+key);
+ table = table_prepend_name+table;
+ String selectQuery = "select * from "+keyspace+"."+table+" where key='"+key+"' LIMIT 1;";
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(selectQuery);
+ ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
+ Row row = results.one();
+ String lockReference = "" + row.getLong("lockReference");
+ String createTime = row.getString("createTime");
+ String acquireTime = row.getString("acquireTime");
+
+ return new LockObject(lockReference, createTime,acquireTime);
+ }
+
+
+ /**
+ * This method removes the lock ref from the lock table/queue for the key.
+ * @param keyspace of the application.
+ * @param table of the application.
+ * @param key is the primary key of the application table
+ * @param lockReference the lock reference that needs to be dequeued.
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public void deQueueLockRef(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{
+ table = table_prepend_name+table;
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ Long lockReferenceL = Long.parseLong(lockReference);
+ String deleteQuery = "delete from "+keyspace+"."+table+" where key='"+key+"' AND lockReference ="+lockReferenceL+" IF EXISTS;";
+ queryObject.appendQueryString(deleteQuery);
+ dsHandle.executePut(queryObject, "critical");
+ }
+
+
+ public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{
+ table = table_prepend_name+table;
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ Long lockReferenceL = Long.parseLong(lockReference);
+ String updateQuery = "update "+keyspace+"."+table+" set acquireTime='"+ System.currentTimeMillis()+"' where key='"+key+"' AND lockReference = "+lockReferenceL+" IF EXISTS;";
+ queryObject.appendQueryString(updateQuery);
+ dsHandle.executePut(queryObject, "eventual");
+
+ }
+
+
+}