diff options
Diffstat (limited to 'music-core/src/main/java/org/onap')
5 files changed, 115 insertions, 36 deletions
diff --git a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java index 97fc1d33..9ccff828 100755 --- a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java +++ b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java @@ -25,14 +25,8 @@ package org.onap.music.datastore; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Enumeration; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import org.onap.music.eelf.logging.EELFLoggerDelegate; @@ -58,12 +52,9 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.TableMetadata; -import com.datastax.driver.core.TypeCodec; import com.datastax.driver.core.exceptions.AlreadyExistsException; import com.datastax.driver.core.exceptions.InvalidQueryException; -import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.extras.codecs.enums.EnumNameCodec; -import com.datastax.driver.extras.codecs.enums.EnumOrdinalCodec; /** * @author nelson24 @@ -73,6 +64,7 @@ public class MusicDataStore { public static final String CONSISTENCY_LEVEL_ONE = "ONE"; public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM"; + public static final String CONSISTENCY_LEVEL_LOCAL_QUORUM = "LOCAL_QUORUM"; private Session session; private Cluster cluster; @@ -511,7 +503,18 @@ public class MusicDataStore { throws MusicServiceException, MusicQueryException { return executeGet(queryObject, CONSISTENCY_LEVEL_ONE); } - + + /** + * + * This method performs DDL operation on Cassandra using consistency level LOCAL_QUORUM. + * + * @param queryObject Object containing cassandra prepared query and values. + */ + public ResultSet executeLocalQuorumConsistencyGet(PreparedQueryObject queryObject) + throws MusicServiceException, MusicQueryException { + return executeGet(queryObject, CONSISTENCY_LEVEL_LOCAL_QUORUM); + } + /** * * This method performs DDL operation on Cassandra using consistency level QUORUM. @@ -522,5 +525,5 @@ public class MusicDataStore { throws MusicServiceException, MusicQueryException { return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM); } - + } diff --git a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java index 10898476..edce3fff 100644 --- a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java +++ b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java @@ -480,7 +480,7 @@ public class CassaLockStore { DeadlockDetectionUtil ddu = new DeadlockDetectionUtil(); - ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject); + ResultSet rs = dsHandle.executeLocalQuorumConsistencyGet(queryObject); logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") ); Iterator<Row> it = rs.iterator(); while (it.hasNext()) { diff --git a/music-core/src/main/java/org/onap/music/main/MusicCore.java b/music-core/src/main/java/org/onap/music/main/MusicCore.java index 658f2124..226dfb07 100644 --- a/music-core/src/main/java/org/onap/music/main/MusicCore.java +++ b/music-core/src/main/java/org/onap/music/main/MusicCore.java @@ -81,12 +81,12 @@ public class MusicCore { return musicCore.acquireLockWithLease(key, lockId, leasePeriod); } - public static String createLockReference(String fullyQualifiedKey) throws MusicLockingException { - return musicCore.createLockReference(fullyQualifiedKey); + public static String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException { + return musicCore.createLockReferenceAtomic(fullyQualifiedKey); } - public static String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException { - return musicCore.createLockReference(fullyQualifiedKey, locktype); + public static String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException { + return musicCore.createLockReferenceAtomic(fullyQualifiedKey, locktype); } public static String createLockReference(String fullyQualifiedKey, LockType locktype, String owner) throws MusicLockingException { diff --git a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java index b3226906..7629eae2 100644 --- a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java +++ b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java @@ -89,14 +89,14 @@ public interface MusicCoreService { * @param fullyQualifiedKey the key to create a lock on * @see {@link #creatLockReference(String, LockType)} */ - public String createLockReference(String fullyQualifiedKey) throws MusicLockingException; // lock name + public String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException; // lock name /** * Create a lock ref in the music lock store * @param fullyQualifiedKey the key to create a lock on * @param locktype the type of lock create, see {@link LockType} */ - public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException; + public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException; /** * Create a lock ref in the music lock store diff --git a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java index 0d2e3f0a..c7c7cddc 100644 --- a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java +++ b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java @@ -26,8 +26,11 @@ package org.onap.music.service.impl; import java.io.StringWriter; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.StringTokenizer; import javax.ws.rs.core.MultivaluedMap; @@ -71,6 +74,7 @@ public class MusicCassaCore implements MusicCoreService { private static CassaLockStore mLockHandle = null; private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class); private static MusicCassaCore musicCassaCoreInstance = null; + private static Set<String> set = Collections.synchronizedSet(new HashSet<String>()); private MusicCassaCore() { // not going to happen @@ -92,9 +96,6 @@ public class MusicCassaCore implements MusicCoreService { return musicCassaCoreInstance; } - - - public static CassaLockStore getLockingServiceHandle() throws MusicLockingException { logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle"); long start = System.currentTimeMillis(); @@ -112,24 +113,98 @@ public class MusicCassaCore implements MusicCoreService { return mLockHandle; } - public String createLockReference(String fullyQualifiedKey) throws MusicLockingException { - return createLockReference(fullyQualifiedKey, LockType.WRITE); + public String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException { + return createLockReferenceAtomic(fullyQualifiedKey, LockType.WRITE); } - public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException { - return createLockReference(fullyQualifiedKey, locktype, null); + /** + * This will be called for Atomic calls + * + */ + public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException { + String[] splitString = fullyQualifiedKey.split("\\."); + if (splitString.length < 3) { + throw new MusicLockingException("Missing or incorrect lock details. Check table or key name."); + } + String keyspace = splitString[0]; + String table = splitString[1]; + String lockName = splitString[2]; + + logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName); + long start = 0L; + long end = 0L; + String lockReference = null; + LockObject peek = null; + + /** Lets check for an existing lock. + * This will allow us to limit the amount of requests going forward. + */ + start = System.currentTimeMillis(); + try { + peek = getLockingServiceHandle().peekLockQueue(keyspace, table, lockName); + } catch (MusicServiceException | MusicQueryException e) { + //logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(),e); + throw new MusicLockingException("Error getting lockholder info for key [" + lockName +"]:" + e.getMessage()); + } + + if(peek!=null && (peek.getLocktype()!=null && peek.getLocktype().equals(LockType.WRITE)) && peek.getAcquireTime()!=null && peek.getLockRef()!=null) { + long currentTime = System.currentTimeMillis(); + if((currentTime-Long.parseLong(peek.getAcquireTime()))<MusicUtil.getDefaultLockLeasePeriod()){ + //logger.info(EELFLoggerDelegate.applicationLogger,"Lock holder exists and lease not expired. Please try again for key="+lockName); + throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again."); + } + } + end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to check for lock reference for key [" + lockName + "]:" + (end - start) + " ms"); + + start = System.currentTimeMillis(); + /* We are Creating a Thread safe set and adding the key to the set. + * if a key exists then it wil be passed over and not go to the lock creation. + * If a key doesn't exist then it will set the value in the set and continue to create a lock. + * + * This will ensure that no 2 threads using the same key will be able to try to create a lock + * This wil in turn squash the amout of LWT Chatter in Cassandra an reduce the amount of + * WriteTimeoutExceptions being experiences on single keys. + */ + if ( set.add(fullyQualifiedKey)) { + try { + lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype,null); + set.remove(fullyQualifiedKey); + } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { + set.remove(fullyQualifiedKey); + throw new MusicLockingException(e.getMessage()); + } catch (Exception e) { + set.remove(fullyQualifiedKey); + e.printStackTrace(); + logger.error(EELFLoggerDelegate.applicationLogger,"Exception in creatLockEnforced:"+ e.getMessage(),e); + throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. " + e.getMessage()); + } + } else { + throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again."); + } + end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.debugLogger,"### Set = " + set); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference for key [" + lockName + "]:" + (end - start) + " ms"); + return lockReference; + + //return createLockReference(fullyQualifiedKey, locktype, null); } public String createLockReference(String fullyQualifiedKey, LockType locktype, String owner) throws MusicLockingException { String[] splitString = fullyQualifiedKey.split("\\."); + if (splitString.length < 3) { + throw new MusicLockingException("Missing or incorrect lock details. Check table or key name."); + } String keyspace = splitString[0]; String table = splitString[1]; String lockName = splitString[2]; logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName); - long start = System.currentTimeMillis(); + long start = 0L; + long end = 0L; String lockReference = null; + /* Check for a Deadlock */ try { boolean deadlock = getLockingServiceHandle().checkForDeadlock(keyspace, table, lockName, locktype, owner, false); if (deadlock) { @@ -144,18 +219,21 @@ public class MusicCassaCore implements MusicCoreService { logger.error(EELFLoggerDelegate.applicationLogger, e); throw new MusicLockingException("Unable to check for deadlock. " + e.getMessage(), e); } - + end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to check for deadlock for key [" + lockName + "]:" + (end - start) + " ms"); + + start = System.currentTimeMillis(); try { lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype, owner); } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { - logger.error(EELFLoggerDelegate.applicationLogger, e); - throw new MusicLockingException("Unable to create lock reference. " + e.getMessage(), e); + logger.info(EELFLoggerDelegate.applicationLogger,e.getMessage(),e); + throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again: " + e.getMessage()); } catch (Exception e) { - logger.error(EELFLoggerDelegate.applicationLogger, e); + logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(),e); throw new MusicLockingException("Unable to create lock reference. " + e.getMessage(), e); } - long end = System.currentTimeMillis(); - logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms"); + end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference for key [" + lockName + "]:" + (end - start) + " ms"); return lockReference; } @@ -164,7 +242,6 @@ public class MusicCassaCore implements MusicCoreService { String keyspace = splitString[0].substring(1);//remove '$' String table = splitString[1]; String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$")); - String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$")); String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end logger.info(EELFLoggerDelegate.applicationLogger,"Attempting to promote lock " + lockId); @@ -733,7 +810,7 @@ public class MusicCassaCore implements MusicCoreService { throws MusicLockingException, MusicQueryException, MusicServiceException { long start = System.currentTimeMillis(); String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey; - String lockId = createLockReference(fullyQualifiedKey, LockType.WRITE); + String lockId = createLockReferenceAtomic(fullyQualifiedKey, LockType.WRITE); long lockCreationTime = System.currentTimeMillis(); ReturnType lockAcqResult = null; logger.info(EELFLoggerDelegate.applicationLogger, @@ -803,8 +880,7 @@ public class MusicCassaCore implements MusicCoreService { public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey, PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException { String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey; - String lockId = createLockReference(fullyQualifiedKey, LockType.READ); - long leasePeriod = MusicUtil.getDefaultLockLeasePeriod(); + String lockId = createLockReferenceAtomic(fullyQualifiedKey, LockType.READ); ReturnType lockAcqResult = null; ResultSet result = null; logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery()); |