diff options
Diffstat (limited to 'src/main/java/org/onap/music/service/impl')
-rw-r--r-- | src/main/java/org/onap/music/service/impl/MusicCassaCore.java | 308 |
1 files changed, 175 insertions, 133 deletions
diff --git a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java index f897c181..79662822 100644 --- a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java +++ b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java @@ -54,6 +54,7 @@ import com.datastax.driver.core.DataType; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.TableMetadata; +import org.onap.music.util.TimeMeasureInstance; /** @@ -102,22 +103,28 @@ public class MusicCassaCore implements MusicCoreService { } public String createLockReference(String fullyQualifiedKey, boolean isWriteLock) { - String[] splitString = fullyQualifiedKey.split("\\."); - String keyspace = splitString[0]; - String table = splitString[1]; - String lockName = splitString[2]; - - logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName); - long start = System.currentTimeMillis(); - String lockReference = null; + TimeMeasureInstance.instance().enter("createLockReference"); try { - lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, isWriteLock); - } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { - e.printStackTrace(); - } - long end = System.currentTimeMillis(); - logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms"); - return lockReference; + String[] splitString = fullyQualifiedKey.split("\\."); + String keyspace = splitString[0]; + String table = splitString[1]; + String lockName = splitString[2]; + + logger.info(EELFLoggerDelegate.applicationLogger, "Creating lock reference for lock name:" + lockName); + long start = System.currentTimeMillis(); + String lockReference = null; + try { + lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, isWriteLock); + } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { + e.printStackTrace(); + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger, "Time taken to create lock reference:" + (end - start) + " ms"); + return lockReference; + } + finally { + TimeMeasureInstance.instance().exit(); + } } @@ -146,16 +153,17 @@ public class MusicCassaCore implements MusicCoreService { } private static ReturnType isTopOfLockStore(String keyspace, String table, - String primaryKeyValue, String lockReference) - throws MusicLockingException, MusicQueryException, MusicServiceException { - - // return failure to lock holders too early or already evicted from the lock store - String topOfLockStoreS = - getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef; - long topOfLockStoreL = Long.parseLong(topOfLockStoreS); - long lockReferenceL = Long.parseLong(lockReference); + String primaryKeyValue, String lockReference) + throws MusicLockingException, MusicQueryException, MusicServiceException { + TimeMeasureInstance.instance().enter("isTopOfLockStore"); + try { + // return failure to lock holders too early or already evicted from the lock store + String topOfLockStoreS = + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef; + long topOfLockStoreL = Long.parseLong(topOfLockStoreS); + long lockReferenceL = Long.parseLong(lockReference); - if (lockReferenceL > topOfLockStoreL) { + if (lockReferenceL > topOfLockStoreL) { // only need to check if this is a read lock.... if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue, lockReference)) { @@ -165,55 +173,65 @@ public class MusicCassaCore implements MusicCoreService { lockReference + " is not the lock holder yet"); return new ReturnType(ResultType.FAILURE, lockReference + " is not the lock holder yet"); - } + } - if (lockReferenceL < topOfLockStoreL) { + if (lockReferenceL < topOfLockStoreL) { logger.info(EELFLoggerDelegate.applicationLogger, lockReference + " is no longer/or was never in the lock store queue"); return new ReturnType(ResultType.FAILURE, lockReference + " is no longer/or was never in the lock store queue"); - } + } - return new ReturnType(ResultType.SUCCESS, lockReference + " is top of lock store"); + return new ReturnType(ResultType.SUCCESS, lockReference + " is top of lock store"); + } + finally { + TimeMeasureInstance.instance().exit(); + } } - + public ReturnType acquireLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException { - String[] splitString = fullyQualifiedKey.split("\\."); - String keyspace = splitString[0]; - String table = splitString[1]; - String primaryKeyValue = splitString[2]; + TimeMeasureInstance.instance().enter("acquireLock"); + try { + String[] splitString = fullyQualifiedKey.split("\\."); + String keyspace = splitString[0]; + String table = splitString[1]; + String primaryKeyValue = splitString[2]; - ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference); - - if(result.getResult().equals(ResultType.FAILURE)) - return result;//not top of the lock store q - - //check to see if the value of the key has to be synced in case there was a forceful release - String syncTable = keyspace+".unsyncedKeys_"+table; - String query = "select * from "+syncTable+" where key='"+fullyQualifiedKey+"';"; - PreparedQueryObject readQueryObject = new PreparedQueryObject(); - readQueryObject.appendQueryString(query); - ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject); - if (results.all().size() != 0) { - logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!"); - try { - syncQuorum(keyspace, table, primaryKeyValue); - } catch (Exception e) { - StringWriter sw = new StringWriter(); - logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); - String exceptionAsString = sw.toString(); - return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString); - } - String cleanQuery = "delete from music_internal.unsynced_keys where key='"+fullyQualifiedKey+"';"; - PreparedQueryObject deleteQueryObject = new PreparedQueryObject(); - deleteQueryObject.appendQueryString(cleanQuery); - MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical"); - } - - getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockReference); - - return new ReturnType(ResultType.SUCCESS, lockReference+" is the lock holder for the key"); + ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference); + + if (result.getResult().equals(ResultType.FAILURE)) + return result;//not top of the lock store q + + //check to see if the value of the key has to be synced in case there was a forceful release + String syncTable = keyspace + ".unsyncedKeys_" + table; + String query = "select * from " + syncTable + " where key='" + fullyQualifiedKey + "';"; + PreparedQueryObject readQueryObject = new PreparedQueryObject(); + readQueryObject.appendQueryString(query); + ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject); + if (results.all().size() != 0) { + logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!"); + try { + syncQuorum(keyspace, table, primaryKeyValue); + } catch (Exception e) { + StringWriter sw = new StringWriter(); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), "[ERR506E] Failed to aquire lock ", ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + String exceptionAsString = sw.toString(); + return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString); + } + String cleanQuery = "delete from music_internal.unsynced_keys where key='" + fullyQualifiedKey + "';"; + PreparedQueryObject deleteQueryObject = new PreparedQueryObject(); + deleteQueryObject.appendQueryString(cleanQuery); + MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical"); + } + + getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockReference); + + return new ReturnType(ResultType.SUCCESS, lockReference + " is the lock holder for the key"); + } + finally { + TimeMeasureInstance.instance().exit(); + } } @@ -359,23 +377,35 @@ public class MusicCassaCore implements MusicCoreService { } public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) { - long start = System.currentTimeMillis(); - String[] splitString = fullyQualifiedKey.split("\\."); - String keyspace = splitString[0]; - String table = splitString[1]; - String primaryKeyValue = splitString[2]; + TimeMeasureInstance.instance().enter("destroyLockRef"); try { - getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference); - } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { - logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockReference ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); - } - long end = System.currentTimeMillis(); - logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms"); - return getMusicLockState(fullyQualifiedKey); + long start = System.currentTimeMillis(); + String[] splitString = fullyQualifiedKey.split("\\."); + String keyspace = splitString[0]; + String table = splitString[1]; + String primaryKeyValue = splitString[2]; + try { + getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference); + } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.DESTROYLOCK + lockReference, ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger, "Time taken to destroy lock reference:" + (end - start) + " ms"); + return getMusicLockState(fullyQualifiedKey); + } + finally { + TimeMeasureInstance.instance().exit(); + } } public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException{ - return destroyLockRef(fullyQualifiedKey, lockReference); + TimeMeasureInstance.instance().enter("voluntaryReleaseLock"); + try { + return destroyLockRef(fullyQualifiedKey, lockReference); + } + finally { + TimeMeasureInstance.instance().exit(); + } } public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicServiceException, MusicQueryException{ @@ -384,14 +414,14 @@ public class MusicCassaCore implements MusicCoreService { String table = splitString[1]; //leave a signal that this key could potentially be unsynchronized - String syncTable = keyspace+".unsyncedKeys_"+table; + String syncTable = keyspace+".unsyncedKeys_"+table; PreparedQueryObject queryObject = new PreparedQueryObject(); String values = "(?)"; queryObject.addValue(fullyQualifiedKey); String insQuery = "insert into "+syncTable+" (key) values "+values+";"; queryObject.appendQueryString(insQuery); - MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical"); - + MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical"); + //now release the lock return destroyLockRef(fullyQualifiedKey, lockReference); } @@ -440,41 +470,47 @@ public class MusicCassaCore implements MusicCoreService { */ public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue, PreparedQueryObject queryObject, String lockReference, Condition conditionInfo) { - long start = System.currentTimeMillis(); + TimeMeasureInstance.instance().enter("executePut"); try { - ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference); - if(result.getResult().equals(ResultType.FAILURE)) - return result;//not top of the lock store q - - if (conditionInfo != null) + long start = System.currentTimeMillis(); try { - if (conditionInfo.testCondition() == false) - return new ReturnType(ResultType.FAILURE, - "Lock acquired but the condition is not true"); - } catch (Exception e) { - return new ReturnType(ResultType.FAILURE, - "Exception thrown while checking the condition, check its sanctity:\n" - + e.getMessage()); + ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference); + if (result.getResult().equals(ResultType.FAILURE)) + return result;//not top of the lock store q + + if (conditionInfo != null) + try { + if (conditionInfo.testCondition() == false) + return new ReturnType(ResultType.FAILURE, + "Lock acquired but the condition is not true"); + } catch (Exception e) { + return new ReturnType(ResultType.FAILURE, + "Exception thrown while checking the condition, check its sanctity:\n" + + e.getMessage()); + } + + String query = queryObject.getQuery(); + long timeOfWrite = System.currentTimeMillis(); + long lockOrdinal = Long.parseLong(lockReference); + long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite); + // TODO: use Statement instead of modifying query + query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET"); + queryObject.replaceQueryString(query); + MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle(); + dsHandle.executePut(queryObject, MusicUtil.CRITICAL); + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger, "Time taken for the critical put:" + (end - start) + " ms"); + } catch (MusicQueryException | MusicServiceException | MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); + return new ReturnType(ResultType.FAILURE, + "Exception thrown while doing the critical put\n" + + e.getMessage()); } - - String query = queryObject.getQuery(); - long timeOfWrite = System.currentTimeMillis(); - long lockOrdinal = Long.parseLong(lockReference); - long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite); - // TODO: use Statement instead of modifying query - query = query.replaceFirst("SET", "USING TIMESTAMP "+ ts + " SET"); - queryObject.replaceQueryString(query); - MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle(); - dsHandle.executePut(queryObject, MusicUtil.CRITICAL); - long end = System.currentTimeMillis(); - logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms"); - }catch (MusicQueryException | MusicServiceException | MusicLockingException e) { - logger.error(EELFLoggerDelegate.errorLogger,e.getMessage()); - return new ReturnType(ResultType.FAILURE, - "Exception thrown while doing the critical put\n" - + e.getMessage()); + return new ReturnType(ResultType.SUCCESS, "Update performed"); + } + finally { + TimeMeasureInstance.instance().exit(); } - return new ReturnType(ResultType.SUCCESS, "Update performed"); } @@ -559,31 +595,37 @@ public class MusicCassaCore implements MusicCoreService { */ public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey, PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException { - long start = System.currentTimeMillis(); - String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey; - String lockReference = createLockReference(fullyQualifiedKey); - long lockCreationTime = System.currentTimeMillis(); - ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockReference); - long lockAcqTime = System.currentTimeMillis(); + TimeMeasureInstance.instance().enter("atomicPut"); + try { + long start = System.currentTimeMillis(); + String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey; + String lockReference = createLockReference(fullyQualifiedKey); + long lockCreationTime = System.currentTimeMillis(); + ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockReference); + long lockAcqTime = System.currentTimeMillis(); + + if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { + logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockReference); + voluntaryReleaseLock(fullyQualifiedKey, lockReference); + return lockAcqResult; + } - if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { - logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockReference); - voluntaryReleaseLock(fullyQualifiedKey,lockReference); - return lockAcqResult; + logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockReference); + ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, + queryObject, lockReference, conditionInfo); + long criticalPutTime = System.currentTimeMillis(); + voluntaryReleaseLock(fullyQualifiedKey, lockReference); + long lockDeleteTime = System.currentTimeMillis(); + String timingInfo = "|lock creation time:" + (lockCreationTime - start) + + "|lock accquire time:" + (lockAcqTime - lockCreationTime) + + "|critical put time:" + (criticalPutTime - lockAcqTime) + + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|"; + criticalPutResult.setTimingInfo(timingInfo); + return criticalPutResult; + } + finally { + TimeMeasureInstance.instance().exit(); } - - logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockReference); - ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, - queryObject, lockReference, conditionInfo); - long criticalPutTime = System.currentTimeMillis(); - voluntaryReleaseLock(fullyQualifiedKey,lockReference); - long lockDeleteTime = System.currentTimeMillis(); - String timingInfo = "|lock creation time:" + (lockCreationTime - start) - + "|lock accquire time:" + (lockAcqTime - lockCreationTime) - + "|critical put time:" + (criticalPutTime - lockAcqTime) - + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|"; - criticalPutResult.setTimingInfo(timingInfo); - return criticalPutResult; } |