aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohammad Salehe <salehe@cs.toronto.edu>2018-11-07 11:09:50 -0500
committerMohammad Salehe <salehe@cs.toronto.edu>2018-11-20 22:42:37 -0500
commitd8574a1d02a90ed25aa1651f310261bb90098171 (patch)
tree76533fe809cf2659fb59c4ec9ab2f14c56acd765
parenta1f03a639a2ba38ec5c19e96c3c9d703fa048a8f (diff)
Implement sequential lock references
Use guard column for sequential lock references Move v2sTimeStampInMicroseconds as a static method to MusicUtil Use v2sTimeStamp in CassaDataStore.executePut Change-Id: I48b817c4bfe04ec50f5ad6e7cdc91b34fd607feb Issue-ID: MUSIC-148 Signed-off-by: Mohammad Salehe <salehe@cs.toronto.edu>
-rw-r--r--src/main/java/org/onap/music/datastore/CassaDataStore.java19
-rw-r--r--src/main/java/org/onap/music/datastore/CassaLockStore.java38
-rw-r--r--src/main/java/org/onap/music/main/MusicCore.java80
-rwxr-xr-xsrc/main/java/org/onap/music/main/MusicUtil.java19
4 files changed, 78 insertions, 78 deletions
diff --git a/src/main/java/org/onap/music/datastore/CassaDataStore.java b/src/main/java/org/onap/music/datastore/CassaDataStore.java
index 14934f6e..ec0b2581 100644
--- a/src/main/java/org/onap/music/datastore/CassaDataStore.java
+++ b/src/main/java/org/onap/music/datastore/CassaDataStore.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
+import com.datastax.driver.core.*;
import org.onap.music.eelf.logging.EELFLoggerDelegate;
import org.onap.music.eelf.logging.format.AppMessages;
import org.onap.music.eelf.logging.format.ErrorSeverity;
@@ -39,18 +40,7 @@ import org.onap.music.eelf.logging.format.ErrorTypes;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.main.MusicUtil;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ColumnDefinitions.Definition;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.KeyspaceMetadata;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
@@ -373,7 +363,6 @@ public class CassaDataStore {
try {
preparedInsert = session.prepare(queryObject.getQuery());
-
} catch(InvalidQueryException iqe) {
logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
throw new MusicQueryException(iqe.getMessage());
@@ -391,9 +380,11 @@ public class CassaDataStore {
preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
}
- ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
- result = rs.wasApplied();
+ BoundStatement boundStatement = preparedInsert.bind(queryObject.getValues().toArray());
+ boundStatement.setDefaultTimestamp(MusicUtil.v2sTimeStampInMicroseconds(0, System.currentTimeMillis()));
+ ResultSet rs = session.execute(boundStatement);
+ result = rs.wasApplied();
}
catch (AlreadyExistsException ae) {
logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
diff --git a/src/main/java/org/onap/music/datastore/CassaLockStore.java b/src/main/java/org/onap/music/datastore/CassaLockStore.java
index e03a1c07..67e96533 100644
--- a/src/main/java/org/onap/music/datastore/CassaLockStore.java
+++ b/src/main/java/org/onap/music/datastore/CassaLockStore.java
@@ -1,5 +1,6 @@
package org.onap.music.datastore;
+import java.util.List;
import java.util.UUID;
import org.onap.music.eelf.logging.EELFLoggerDelegate;
@@ -77,31 +78,41 @@ public class CassaLockStore {
public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException {
logger.info(EELFLoggerDelegate.applicationLogger,
"Create lock reference for " + keyspace + "." + table + "." + lockName);
- table = "lockQ_" + table;
- long lockEpochMillis = System.currentTimeMillis();
- long lockRef = lockEpochMillis;
+ table = "lockQ_" + table;
- logger.info(EELFLoggerDelegate.applicationLogger,
- "Created lock reference for " + keyspace + "." + table + "." + lockName + ":" + lockRef);
- PreparedQueryObject queryObject = new PreparedQueryObject();
- String defaultQuery = " UPDATE " + keyspace + "." + table + " SET guard=-1 WHERE key=? IF guard = NULL;";
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ String selectQuery = "SELECT guard FROM " + keyspace + "." + table + " WHERE key=?;";
- queryObject.addValue(lockName);
- queryObject.appendQueryString(defaultQuery);
- boolean dqResult = dsHandle.executePut(queryObject, "critical");
-// System.out.println("dqResult: " + dqResult);
+ queryObject.addValue(lockName);
+ queryObject.appendQueryString(selectQuery);
+ ResultSet gqResult = dsHandle.executeEventualGet(queryObject);
+ List<Row> latestGuardRow = gqResult.all();
+
+ long prevGuard = 0;
+ long lockRef = 1;
+ if (latestGuardRow.size() > 0) {
+ prevGuard = latestGuardRow.get(0).getLong(0);
+ lockRef = prevGuard + 1;
+ }
+ long lockEpochMillis = System.currentTimeMillis();
+
+// System.out.println("guard(" + lockName + "): " + prevGuard + "->" + lockRef);
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Created lock reference for " + keyspace + "." + table + "." + lockName + ":" + lockRef);
queryObject = new PreparedQueryObject();
String insQuery = "BEGIN BATCH" +
- " UPDATE " + keyspace + "." + table + " SET guard=? WHERE key=? IF guard < ?;" +
+ " UPDATE " + keyspace + "." + table +
+ " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
" INSERT INTO " + keyspace + "." + table +
"(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
queryObject.addValue(lockRef);
queryObject.addValue(lockName);
- queryObject.addValue(lockRef);
+ if (prevGuard != 0)
+ queryObject.addValue(prevGuard);
queryObject.addValue(lockName);
queryObject.addValue(lockRef);
@@ -109,7 +120,6 @@ public class CassaLockStore {
queryObject.addValue("0");
queryObject.appendQueryString(insQuery);
boolean pResult = dsHandle.executePut(queryObject, "critical");
-// System.out.println("pResult: " + pResult);
return String.valueOf(lockRef);
}
diff --git a/src/main/java/org/onap/music/main/MusicCore.java b/src/main/java/org/onap/music/main/MusicCore.java
index d7c5bcec..cf2a47ee 100644
--- a/src/main/java/org/onap/music/main/MusicCore.java
+++ b/src/main/java/org/onap/music/main/MusicCore.java
@@ -26,7 +26,6 @@ import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
-import java.util.UUID;
import org.onap.music.datastore.CassaDataStore;
import org.onap.music.datastore.CassaLockStore;
@@ -520,9 +519,14 @@ public class MusicCore {
}
String query = queryObject.getQuery();
- query = query.replaceFirst("SET", "using TIMESTAMP "+ v2sTimeStampInMicroseconds(lockReference, System.currentTimeMillis())+ " SET");
+ long timeOfWrite = System.currentTimeMillis();
+ long lockOrdinal = Long.parseLong(lockReference);
+ long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
+ // TODO: use Statement instead of modifying query
+ query = query.replaceFirst("SET", "USING TIMESTAMP "+ ts + " SET");
queryObject.replaceQueryString(query);
- getDSHandle().executePut(queryObject, MusicUtil.CRITICAL);
+ CassaDataStore dsHandle = getDSHandle();
+ dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
long end = System.currentTimeMillis();
logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
}catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
@@ -617,29 +621,32 @@ public class MusicCore {
public static ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
long start = System.currentTimeMillis();
+
String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
String lockReference = createLockReference(fullyQualifiedKey);
long lockCreationTime = System.currentTimeMillis();
+
ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockReference);
long lockAcqTime = System.currentTimeMillis();
- if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
- logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockReference);
- ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
- queryObject, lockReference, conditionInfo);
- long criticalPutTime = System.currentTimeMillis();
- voluntaryReleaseLock(fullyQualifiedKey,lockReference);
- long lockDeleteTime = System.currentTimeMillis();
- String timingInfo = "|lock creation time:" + (lockCreationTime - start)
- + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
- + "|critical put time:" + (criticalPutTime - lockAcqTime)
- + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
- criticalPutResult.setTimingInfo(timingInfo);
- return criticalPutResult;
- } else {
+
+ if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockReference);
voluntaryReleaseLock(fullyQualifiedKey,lockReference);
return lockAcqResult;
}
+
+ logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockReference);
+ ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
+ queryObject, lockReference, conditionInfo);
+ long criticalPutTime = System.currentTimeMillis();
+ voluntaryReleaseLock(fullyQualifiedKey,lockReference);
+ long lockDeleteTime = System.currentTimeMillis();
+ String timingInfo = "|lock creation time:" + (lockCreationTime - start)
+ + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
+ + "|critical put time:" + (criticalPutTime - lockAcqTime)
+ + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
+ criticalPutResult.setTimingInfo(timingInfo);
+ return criticalPutResult;
}
@@ -789,38 +796,11 @@ public class MusicCore {
resultMap.put("keyspace",keyspace);
return resultMap;
}
-
-
- /**
- * Given the time of write for an update in a critical section, this method provides a transformed timestamp
- * that ensures that a previous lock holder who is still alive can never corrupt a later critical section.
- * The main idea is to us the lock reference to clearly demarcate the timestamps across critical sections.
- * @param the UUID lock reference associated with the write.
- * @param the long timeOfWrite which is the actual time at which the write took place
- * @throws MusicServiceException
- * @throws MusicQueryException
- */
- private static long v2sTimeStampInMicroseconds(String lockReference, long timeOfWrite) throws MusicServiceException, MusicQueryException{
- long lockEpochMillis = Long.parseLong(lockReference);
-
- long lockEternityMillis = lockEpochMillis - MusicUtil.MusicEternityEpochMillis;
-
- long ts = lockEternityMillis * MusicUtil.MaxCriticalSectionDurationMillis
- + (timeOfWrite - lockEpochMillis);
-
- return ts;
-
-// long test = (lockReferenceUUID.timestamp()-MusicUtil.MusicEternityEpochMillis);
-// long timeStamp = (lockReferenceUUID.timestamp()-MusicUtil.MusicEternityEpochMillis)*MusicUtil.MaxCriticalSectionDurationMillis
-// +timeOfWrite;
-// return timeStamp;
-
-// return timeOfWrite*1000;
- }
- public static void main(String[] args) {
- String x = "axe top";
- x = x.replaceFirst("top", "sword");
- System.out.print(x); //returns sword pickaxe
- }
+
+// public static void main(String[] args) {
+// String x = "axe top";
+// x = x.replaceFirst("top", "sword");
+// System.out.print(x); //returns sword pickaxe
+// }
}
diff --git a/src/main/java/org/onap/music/main/MusicUtil.java b/src/main/java/org/onap/music/main/MusicUtil.java
index 1cfd5fbf..a12a090e 100755
--- a/src/main/java/org/onap/music/main/MusicUtil.java
+++ b/src/main/java/org/onap/music/main/MusicUtil.java
@@ -46,6 +46,8 @@ import org.onap.music.eelf.logging.EELFLoggerDelegate;
import com.datastax.driver.core.DataType;
import com.sun.jersey.core.util.Base64;
+import org.onap.music.exceptions.MusicQueryException;
+import org.onap.music.exceptions.MusicServiceException;
/**
* @author nelson24
@@ -99,6 +101,8 @@ public class MusicUtil {
public static ConcurrentMap<String, Long> zkNodeMap = new ConcurrentHashMap<>();
public static final long MusicEternityEpochMillis = 1533081600000L; // Wednesday, August 1, 2018 12:00:00 AM
+
+ public static final long MaxLockReferenceTimePart = 1000000000000L; // millis after eternity (eq sometime in 2050)
public static final long MaxCriticalSectionDurationMillis = 1L * 24 * 60 * 60 * 1000; // 1 day
@@ -596,4 +600,19 @@ public class MusicUtil {
MusicUtil.setCassPwd(prop.getProperty("cassandra.password"));
}
+ /**
+ * Given the time of write for an update in a critical section, this method provides a transformed timestamp
+ * that ensures that a previous lock holder who is still alive can never corrupt a later critical section.
+ * The main idea is to us the lock reference to clearly demarcate the timestamps across critical sections.
+ * @param the UUID lock reference associated with the write.
+ * @param the long timeOfWrite which is the actual time at which the write took place
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public static long v2sTimeStampInMicroseconds(long ordinal, long timeOfWrite) throws MusicServiceException, MusicQueryException {
+ // TODO: use acquire time instead of music eternity epoch
+ long ts = ordinal * MaxLockReferenceTimePart + (timeOfWrite - MusicEternityEpochMillis);
+
+ return ts;
+ }
}