aboutsummaryrefslogtreecommitdiffstats
path: root/music-core
diff options
context:
space:
mode:
authorThomas Nelson (arthurdent3) <nelson24@att.com>2019-10-02 18:09:23 -0400
committerThomas Nelson (arthurdent3) <nelson24@att.com>2019-10-03 17:28:55 -0400
commit48b02dfdc78a6412d568a14b015ccb9439a1dbb5 (patch)
treeae9dfd7d47d643fcd7e1b4206b525ee0f98f1550 /music-core
parenta681a9e295dc2b8f35dd42251f795d0079471ac0 (diff)
Update locking to use Threadsafe set
Had to create whole new method for Atomic Lock Creation. Added thread safe set to contol lock creation per key. Updated the deadlock to use local_quorum Removed some uneeded import Updated some logging. Issue-ID: MUSIC-512 Signed-off-by: Thomas Nelson (arthurdent3) <nelson24@att.com> Change-Id: I7e1a4c34de5dc9a0e90adf30d1f4d1bac698ceae Signed-off-by: Thomas Nelson (arthurdent3) <nelson24@att.com>
Diffstat (limited to 'music-core')
-rwxr-xr-xmusic-core/src/main/java/org/onap/music/datastore/MusicDataStore.java25
-rw-r--r--music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java2
-rw-r--r--music-core/src/main/java/org/onap/music/main/MusicCore.java8
-rw-r--r--music-core/src/main/java/org/onap/music/service/MusicCoreService.java4
-rw-r--r--music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java112
5 files changed, 115 insertions, 36 deletions
diff --git a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java
index 97fc1d33..9ccff828 100755
--- a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java
+++ b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java
@@ -25,14 +25,8 @@
package org.onap.music.datastore;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Enumeration;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import org.onap.music.eelf.logging.EELFLoggerDelegate;
@@ -58,12 +52,9 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.TableMetadata;
-import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
-import com.datastax.driver.extras.codecs.enums.EnumOrdinalCodec;
/**
* @author nelson24
@@ -73,6 +64,7 @@ public class MusicDataStore {
public static final String CONSISTENCY_LEVEL_ONE = "ONE";
public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
+ public static final String CONSISTENCY_LEVEL_LOCAL_QUORUM = "LOCAL_QUORUM";
private Session session;
private Cluster cluster;
@@ -511,7 +503,18 @@ public class MusicDataStore {
throws MusicServiceException, MusicQueryException {
return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
}
-
+
+ /**
+ *
+ * This method performs DDL operation on Cassandra using consistency level LOCAL_QUORUM.
+ *
+ * @param queryObject Object containing cassandra prepared query and values.
+ */
+ public ResultSet executeLocalQuorumConsistencyGet(PreparedQueryObject queryObject)
+ throws MusicServiceException, MusicQueryException {
+ return executeGet(queryObject, CONSISTENCY_LEVEL_LOCAL_QUORUM);
+ }
+
/**
*
* This method performs DDL operation on Cassandra using consistency level QUORUM.
@@ -522,5 +525,5 @@ public class MusicDataStore {
throws MusicServiceException, MusicQueryException {
return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);
}
-
+
}
diff --git a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
index 10898476..edce3fff 100644
--- a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
+++ b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
@@ -480,7 +480,7 @@ public class CassaLockStore {
DeadlockDetectionUtil ddu = new DeadlockDetectionUtil();
- ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
+ ResultSet rs = dsHandle.executeLocalQuorumConsistencyGet(queryObject);
logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") );
Iterator<Row> it = rs.iterator();
while (it.hasNext()) {
diff --git a/music-core/src/main/java/org/onap/music/main/MusicCore.java b/music-core/src/main/java/org/onap/music/main/MusicCore.java
index 658f2124..226dfb07 100644
--- a/music-core/src/main/java/org/onap/music/main/MusicCore.java
+++ b/music-core/src/main/java/org/onap/music/main/MusicCore.java
@@ -81,12 +81,12 @@ public class MusicCore {
return musicCore.acquireLockWithLease(key, lockId, leasePeriod);
}
- public static String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
- return musicCore.createLockReference(fullyQualifiedKey);
+ public static String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException {
+ return musicCore.createLockReferenceAtomic(fullyQualifiedKey);
}
- public static String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
- return musicCore.createLockReference(fullyQualifiedKey, locktype);
+ public static String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
+ return musicCore.createLockReferenceAtomic(fullyQualifiedKey, locktype);
}
public static String createLockReference(String fullyQualifiedKey, LockType locktype, String owner) throws MusicLockingException {
diff --git a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java
index b3226906..7629eae2 100644
--- a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java
+++ b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java
@@ -89,14 +89,14 @@ public interface MusicCoreService {
* @param fullyQualifiedKey the key to create a lock on
* @see {@link #creatLockReference(String, LockType)}
*/
- public String createLockReference(String fullyQualifiedKey) throws MusicLockingException; // lock name
+ public String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException; // lock name
/**
* Create a lock ref in the music lock store
* @param fullyQualifiedKey the key to create a lock on
* @param locktype the type of lock create, see {@link LockType}
*/
- public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException;
+ public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException;
/**
* Create a lock ref in the music lock store
diff --git a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
index 0d2e3f0a..c7c7cddc 100644
--- a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
+++ b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
@@ -26,8 +26,11 @@
package org.onap.music.service.impl;
import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.StringTokenizer;
import javax.ws.rs.core.MultivaluedMap;
@@ -71,6 +74,7 @@ public class MusicCassaCore implements MusicCoreService {
private static CassaLockStore mLockHandle = null;
private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
private static MusicCassaCore musicCassaCoreInstance = null;
+ private static Set<String> set = Collections.synchronizedSet(new HashSet<String>());
private MusicCassaCore() {
// not going to happen
@@ -92,9 +96,6 @@ public class MusicCassaCore implements MusicCoreService {
return musicCassaCoreInstance;
}
-
-
-
public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
long start = System.currentTimeMillis();
@@ -112,24 +113,98 @@ public class MusicCassaCore implements MusicCoreService {
return mLockHandle;
}
- public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
- return createLockReference(fullyQualifiedKey, LockType.WRITE);
+ public String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException {
+ return createLockReferenceAtomic(fullyQualifiedKey, LockType.WRITE);
}
- public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
- return createLockReference(fullyQualifiedKey, locktype, null);
+ /**
+ * This will be called for Atomic calls
+ *
+ */
+ public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
+ String[] splitString = fullyQualifiedKey.split("\\.");
+ if (splitString.length < 3) {
+ throw new MusicLockingException("Missing or incorrect lock details. Check table or key name.");
+ }
+ String keyspace = splitString[0];
+ String table = splitString[1];
+ String lockName = splitString[2];
+
+ logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
+ long start = 0L;
+ long end = 0L;
+ String lockReference = null;
+ LockObject peek = null;
+
+ /** Lets check for an existing lock.
+ * This will allow us to limit the amount of requests going forward.
+ */
+ start = System.currentTimeMillis();
+ try {
+ peek = getLockingServiceHandle().peekLockQueue(keyspace, table, lockName);
+ } catch (MusicServiceException | MusicQueryException e) {
+ //logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(),e);
+ throw new MusicLockingException("Error getting lockholder info for key [" + lockName +"]:" + e.getMessage());
+ }
+
+ if(peek!=null && (peek.getLocktype()!=null && peek.getLocktype().equals(LockType.WRITE)) && peek.getAcquireTime()!=null && peek.getLockRef()!=null) {
+ long currentTime = System.currentTimeMillis();
+ if((currentTime-Long.parseLong(peek.getAcquireTime()))<MusicUtil.getDefaultLockLeasePeriod()){
+ //logger.info(EELFLoggerDelegate.applicationLogger,"Lock holder exists and lease not expired. Please try again for key="+lockName);
+ throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again.");
+ }
+ }
+ end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to check for lock reference for key [" + lockName + "]:" + (end - start) + " ms");
+
+ start = System.currentTimeMillis();
+ /* We are Creating a Thread safe set and adding the key to the set.
+ * if a key exists then it wil be passed over and not go to the lock creation.
+ * If a key doesn't exist then it will set the value in the set and continue to create a lock.
+ *
+ * This will ensure that no 2 threads using the same key will be able to try to create a lock
+ * This wil in turn squash the amout of LWT Chatter in Cassandra an reduce the amount of
+ * WriteTimeoutExceptions being experiences on single keys.
+ */
+ if ( set.add(fullyQualifiedKey)) {
+ try {
+ lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype,null);
+ set.remove(fullyQualifiedKey);
+ } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
+ set.remove(fullyQualifiedKey);
+ throw new MusicLockingException(e.getMessage());
+ } catch (Exception e) {
+ set.remove(fullyQualifiedKey);
+ e.printStackTrace();
+ logger.error(EELFLoggerDelegate.applicationLogger,"Exception in creatLockEnforced:"+ e.getMessage(),e);
+ throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. " + e.getMessage());
+ }
+ } else {
+ throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again.");
+ }
+ end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.debugLogger,"### Set = " + set);
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference for key [" + lockName + "]:" + (end - start) + " ms");
+ return lockReference;
+
+ //return createLockReference(fullyQualifiedKey, locktype, null);
}
public String createLockReference(String fullyQualifiedKey, LockType locktype, String owner) throws MusicLockingException {
String[] splitString = fullyQualifiedKey.split("\\.");
+ if (splitString.length < 3) {
+ throw new MusicLockingException("Missing or incorrect lock details. Check table or key name.");
+ }
String keyspace = splitString[0];
String table = splitString[1];
String lockName = splitString[2];
logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
- long start = System.currentTimeMillis();
+ long start = 0L;
+ long end = 0L;
String lockReference = null;
+ /* Check for a Deadlock */
try {
boolean deadlock = getLockingServiceHandle().checkForDeadlock(keyspace, table, lockName, locktype, owner, false);
if (deadlock) {
@@ -144,18 +219,21 @@ public class MusicCassaCore implements MusicCoreService {
logger.error(EELFLoggerDelegate.applicationLogger, e);
throw new MusicLockingException("Unable to check for deadlock. " + e.getMessage(), e);
}
-
+ end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to check for deadlock for key [" + lockName + "]:" + (end - start) + " ms");
+
+ start = System.currentTimeMillis();
try {
lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype, owner);
} catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
- logger.error(EELFLoggerDelegate.applicationLogger, e);
- throw new MusicLockingException("Unable to create lock reference. " + e.getMessage(), e);
+ logger.info(EELFLoggerDelegate.applicationLogger,e.getMessage(),e);
+ throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again: " + e.getMessage());
} catch (Exception e) {
- logger.error(EELFLoggerDelegate.applicationLogger, e);
+ logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(),e);
throw new MusicLockingException("Unable to create lock reference. " + e.getMessage(), e);
}
- long end = System.currentTimeMillis();
- logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
+ end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference for key [" + lockName + "]:" + (end - start) + " ms");
return lockReference;
}
@@ -164,7 +242,6 @@ public class MusicCassaCore implements MusicCoreService {
String keyspace = splitString[0].substring(1);//remove '$'
String table = splitString[1];
String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
- String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
logger.info(EELFLoggerDelegate.applicationLogger,"Attempting to promote lock " + lockId);
@@ -733,7 +810,7 @@ public class MusicCassaCore implements MusicCoreService {
throws MusicLockingException, MusicQueryException, MusicServiceException {
long start = System.currentTimeMillis();
String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
- String lockId = createLockReference(fullyQualifiedKey, LockType.WRITE);
+ String lockId = createLockReferenceAtomic(fullyQualifiedKey, LockType.WRITE);
long lockCreationTime = System.currentTimeMillis();
ReturnType lockAcqResult = null;
logger.info(EELFLoggerDelegate.applicationLogger,
@@ -803,8 +880,7 @@ public class MusicCassaCore implements MusicCoreService {
public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
- String lockId = createLockReference(fullyQualifiedKey, LockType.READ);
- long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+ String lockId = createLockReferenceAtomic(fullyQualifiedKey, LockType.READ);
ReturnType lockAcqResult = null;
ResultSet result = null;
logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());