From d8574a1d02a90ed25aa1651f310261bb90098171 Mon Sep 17 00:00:00 2001 From: Mohammad Salehe Date: Wed, 7 Nov 2018 11:09:50 -0500 Subject: Implement sequential lock references Use guard column for sequential lock references Move v2sTimeStampInMicroseconds as a static method to MusicUtil Use v2sTimeStamp in CassaDataStore.executePut Change-Id: I48b817c4bfe04ec50f5ad6e7cdc91b34fd607feb Issue-ID: MUSIC-148 Signed-off-by: Mohammad Salehe --- .../org/onap/music/datastore/CassaDataStore.java | 19 ++--- .../org/onap/music/datastore/CassaLockStore.java | 38 ++++++---- src/main/java/org/onap/music/main/MusicCore.java | 80 ++++++++-------------- src/main/java/org/onap/music/main/MusicUtil.java | 19 +++++ 4 files changed, 78 insertions(+), 78 deletions(-) diff --git a/src/main/java/org/onap/music/datastore/CassaDataStore.java b/src/main/java/org/onap/music/datastore/CassaDataStore.java index 14934f6e..ec0b2581 100644 --- a/src/main/java/org/onap/music/datastore/CassaDataStore.java +++ b/src/main/java/org/onap/music/datastore/CassaDataStore.java @@ -32,6 +32,7 @@ import java.util.Iterator; import java.util.Map; import java.util.UUID; +import com.datastax.driver.core.*; import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.eelf.logging.format.AppMessages; import org.onap.music.eelf.logging.format.ErrorSeverity; @@ -39,18 +40,7 @@ import org.onap.music.eelf.logging.format.ErrorTypes; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.MusicUtil; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.ColumnDefinitions.Definition; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.Metadata; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.TableMetadata; import com.datastax.driver.core.exceptions.AlreadyExistsException; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.NoHostAvailableException; @@ -373,7 +363,6 @@ public class CassaDataStore { try { preparedInsert = session.prepare(queryObject.getQuery()); - } catch(InvalidQueryException iqe) { logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); throw new MusicQueryException(iqe.getMessage()); @@ -391,9 +380,11 @@ public class CassaDataStore { preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE); } - ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray())); - result = rs.wasApplied(); + BoundStatement boundStatement = preparedInsert.bind(queryObject.getValues().toArray()); + boundStatement.setDefaultTimestamp(MusicUtil.v2sTimeStampInMicroseconds(0, System.currentTimeMillis())); + ResultSet rs = session.execute(boundStatement); + result = rs.wasApplied(); } catch (AlreadyExistsException ae) { logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); diff --git a/src/main/java/org/onap/music/datastore/CassaLockStore.java b/src/main/java/org/onap/music/datastore/CassaLockStore.java index e03a1c07..67e96533 100644 --- a/src/main/java/org/onap/music/datastore/CassaLockStore.java +++ b/src/main/java/org/onap/music/datastore/CassaLockStore.java @@ -1,5 +1,6 @@ package org.onap.music.datastore; +import java.util.List; import java.util.UUID; import org.onap.music.eelf.logging.EELFLoggerDelegate; @@ -77,31 +78,41 @@ public class CassaLockStore { public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException { logger.info(EELFLoggerDelegate.applicationLogger, "Create lock reference for " + keyspace + "." + table + "." + lockName); - table = "lockQ_" + table; - long lockEpochMillis = System.currentTimeMillis(); - long lockRef = lockEpochMillis; + table = "lockQ_" + table; - logger.info(EELFLoggerDelegate.applicationLogger, - "Created lock reference for " + keyspace + "." + table + "." + lockName + ":" + lockRef); - PreparedQueryObject queryObject = new PreparedQueryObject(); - String defaultQuery = " UPDATE " + keyspace + "." + table + " SET guard=-1 WHERE key=? IF guard = NULL;"; + PreparedQueryObject queryObject = new PreparedQueryObject(); + String selectQuery = "SELECT guard FROM " + keyspace + "." + table + " WHERE key=?;"; - queryObject.addValue(lockName); - queryObject.appendQueryString(defaultQuery); - boolean dqResult = dsHandle.executePut(queryObject, "critical"); -// System.out.println("dqResult: " + dqResult); + queryObject.addValue(lockName); + queryObject.appendQueryString(selectQuery); + ResultSet gqResult = dsHandle.executeEventualGet(queryObject); + List latestGuardRow = gqResult.all(); + + long prevGuard = 0; + long lockRef = 1; + if (latestGuardRow.size() > 0) { + prevGuard = latestGuardRow.get(0).getLong(0); + lockRef = prevGuard + 1; + } + long lockEpochMillis = System.currentTimeMillis(); + +// System.out.println("guard(" + lockName + "): " + prevGuard + "->" + lockRef); + logger.info(EELFLoggerDelegate.applicationLogger, + "Created lock reference for " + keyspace + "." + table + "." + lockName + ":" + lockRef); queryObject = new PreparedQueryObject(); String insQuery = "BEGIN BATCH" + - " UPDATE " + keyspace + "." + table + " SET guard=? WHERE key=? IF guard < ?;" + + " UPDATE " + keyspace + "." + table + + " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" + " INSERT INTO " + keyspace + "." + table + "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; queryObject.addValue(lockRef); queryObject.addValue(lockName); - queryObject.addValue(lockRef); + if (prevGuard != 0) + queryObject.addValue(prevGuard); queryObject.addValue(lockName); queryObject.addValue(lockRef); @@ -109,7 +120,6 @@ public class CassaLockStore { queryObject.addValue("0"); queryObject.appendQueryString(insQuery); boolean pResult = dsHandle.executePut(queryObject, "critical"); -// System.out.println("pResult: " + pResult); return String.valueOf(lockRef); } diff --git a/src/main/java/org/onap/music/main/MusicCore.java b/src/main/java/org/onap/music/main/MusicCore.java index d7c5bcec..cf2a47ee 100644 --- a/src/main/java/org/onap/music/main/MusicCore.java +++ b/src/main/java/org/onap/music/main/MusicCore.java @@ -26,7 +26,6 @@ import java.io.StringWriter; import java.util.HashMap; import java.util.Map; import java.util.StringTokenizer; -import java.util.UUID; import org.onap.music.datastore.CassaDataStore; import org.onap.music.datastore.CassaLockStore; @@ -520,9 +519,14 @@ public class MusicCore { } String query = queryObject.getQuery(); - query = query.replaceFirst("SET", "using TIMESTAMP "+ v2sTimeStampInMicroseconds(lockReference, System.currentTimeMillis())+ " SET"); + long timeOfWrite = System.currentTimeMillis(); + long lockOrdinal = Long.parseLong(lockReference); + long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite); + // TODO: use Statement instead of modifying query + query = query.replaceFirst("SET", "USING TIMESTAMP "+ ts + " SET"); queryObject.replaceQueryString(query); - getDSHandle().executePut(queryObject, MusicUtil.CRITICAL); + CassaDataStore dsHandle = getDSHandle(); + dsHandle.executePut(queryObject, MusicUtil.CRITICAL); long end = System.currentTimeMillis(); logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms"); }catch (MusicQueryException | MusicServiceException | MusicLockingException e) { @@ -617,29 +621,32 @@ public class MusicCore { public static ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey, PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException { long start = System.currentTimeMillis(); + String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey; String lockReference = createLockReference(fullyQualifiedKey); long lockCreationTime = System.currentTimeMillis(); + ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockReference); long lockAcqTime = System.currentTimeMillis(); - if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) { - logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockReference); - ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, - queryObject, lockReference, conditionInfo); - long criticalPutTime = System.currentTimeMillis(); - voluntaryReleaseLock(fullyQualifiedKey,lockReference); - long lockDeleteTime = System.currentTimeMillis(); - String timingInfo = "|lock creation time:" + (lockCreationTime - start) - + "|lock accquire time:" + (lockAcqTime - lockCreationTime) - + "|critical put time:" + (criticalPutTime - lockAcqTime) - + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|"; - criticalPutResult.setTimingInfo(timingInfo); - return criticalPutResult; - } else { + + if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockReference); voluntaryReleaseLock(fullyQualifiedKey,lockReference); return lockAcqResult; } + + logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockReference); + ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, + queryObject, lockReference, conditionInfo); + long criticalPutTime = System.currentTimeMillis(); + voluntaryReleaseLock(fullyQualifiedKey,lockReference); + long lockDeleteTime = System.currentTimeMillis(); + String timingInfo = "|lock creation time:" + (lockCreationTime - start) + + "|lock accquire time:" + (lockAcqTime - lockCreationTime) + + "|critical put time:" + (criticalPutTime - lockAcqTime) + + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|"; + criticalPutResult.setTimingInfo(timingInfo); + return criticalPutResult; } @@ -789,38 +796,11 @@ public class MusicCore { resultMap.put("keyspace",keyspace); return resultMap; } - - - /** - * Given the time of write for an update in a critical section, this method provides a transformed timestamp - * that ensures that a previous lock holder who is still alive can never corrupt a later critical section. - * The main idea is to us the lock reference to clearly demarcate the timestamps across critical sections. - * @param the UUID lock reference associated with the write. - * @param the long timeOfWrite which is the actual time at which the write took place - * @throws MusicServiceException - * @throws MusicQueryException - */ - private static long v2sTimeStampInMicroseconds(String lockReference, long timeOfWrite) throws MusicServiceException, MusicQueryException{ - long lockEpochMillis = Long.parseLong(lockReference); - - long lockEternityMillis = lockEpochMillis - MusicUtil.MusicEternityEpochMillis; - - long ts = lockEternityMillis * MusicUtil.MaxCriticalSectionDurationMillis - + (timeOfWrite - lockEpochMillis); - - return ts; - -// long test = (lockReferenceUUID.timestamp()-MusicUtil.MusicEternityEpochMillis); -// long timeStamp = (lockReferenceUUID.timestamp()-MusicUtil.MusicEternityEpochMillis)*MusicUtil.MaxCriticalSectionDurationMillis -// +timeOfWrite; -// return timeStamp; - -// return timeOfWrite*1000; - } - public static void main(String[] args) { - String x = "axe top"; - x = x.replaceFirst("top", "sword"); - System.out.print(x); //returns sword pickaxe - } + +// public static void main(String[] args) { +// String x = "axe top"; +// x = x.replaceFirst("top", "sword"); +// System.out.print(x); //returns sword pickaxe +// } } diff --git a/src/main/java/org/onap/music/main/MusicUtil.java b/src/main/java/org/onap/music/main/MusicUtil.java index 1cfd5fbf..a12a090e 100755 --- a/src/main/java/org/onap/music/main/MusicUtil.java +++ b/src/main/java/org/onap/music/main/MusicUtil.java @@ -46,6 +46,8 @@ import org.onap.music.eelf.logging.EELFLoggerDelegate; import com.datastax.driver.core.DataType; import com.sun.jersey.core.util.Base64; +import org.onap.music.exceptions.MusicQueryException; +import org.onap.music.exceptions.MusicServiceException; /** * @author nelson24 @@ -99,6 +101,8 @@ public class MusicUtil { public static ConcurrentMap zkNodeMap = new ConcurrentHashMap<>(); public static final long MusicEternityEpochMillis = 1533081600000L; // Wednesday, August 1, 2018 12:00:00 AM + + public static final long MaxLockReferenceTimePart = 1000000000000L; // millis after eternity (eq sometime in 2050) public static final long MaxCriticalSectionDurationMillis = 1L * 24 * 60 * 60 * 1000; // 1 day @@ -596,4 +600,19 @@ public class MusicUtil { MusicUtil.setCassPwd(prop.getProperty("cassandra.password")); } + /** + * Given the time of write for an update in a critical section, this method provides a transformed timestamp + * that ensures that a previous lock holder who is still alive can never corrupt a later critical section. + * The main idea is to us the lock reference to clearly demarcate the timestamps across critical sections. + * @param the UUID lock reference associated with the write. + * @param the long timeOfWrite which is the actual time at which the write took place + * @throws MusicServiceException + * @throws MusicQueryException + */ + public static long v2sTimeStampInMicroseconds(long ordinal, long timeOfWrite) throws MusicServiceException, MusicQueryException { + // TODO: use acquire time instead of music eternity epoch + long ts = ordinal * MaxLockReferenceTimePart + (timeOfWrite - MusicEternityEpochMillis); + + return ts; + } } -- cgit 1.2.3-korg