aboutsummaryrefslogtreecommitdiffstats
path: root/music-core
diff options
context:
space:
mode:
Diffstat (limited to 'music-core')
-rwxr-xr-xmusic-core/src/main/java/org/onap/music/datastore/MusicDataStore.java25
-rw-r--r--music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java2
-rw-r--r--music-core/src/main/java/org/onap/music/main/MusicCore.java8
-rw-r--r--music-core/src/main/java/org/onap/music/service/MusicCoreService.java4
-rw-r--r--music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java112
5 files changed, 115 insertions, 36 deletions
diff --git a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java
index 97fc1d33..9ccff828 100755
--- a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java
+++ b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java
@@ -25,14 +25,8 @@
package org.onap.music.datastore;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Enumeration;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import org.onap.music.eelf.logging.EELFLoggerDelegate;
@@ -58,12 +52,9 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.TableMetadata;
-import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
-import com.datastax.driver.extras.codecs.enums.EnumOrdinalCodec;
/**
* @author nelson24
@@ -73,6 +64,7 @@ public class MusicDataStore {
public static final String CONSISTENCY_LEVEL_ONE = "ONE";
public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
+ public static final String CONSISTENCY_LEVEL_LOCAL_QUORUM = "LOCAL_QUORUM";
private Session session;
private Cluster cluster;
@@ -511,7 +503,18 @@ public class MusicDataStore {
throws MusicServiceException, MusicQueryException {
return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
}
-
+
+ /**
+ *
+ * This method performs DDL operation on Cassandra using consistency level LOCAL_QUORUM.
+ *
+ * @param queryObject Object containing cassandra prepared query and values.
+ */
+ public ResultSet executeLocalQuorumConsistencyGet(PreparedQueryObject queryObject)
+ throws MusicServiceException, MusicQueryException {
+ return executeGet(queryObject, CONSISTENCY_LEVEL_LOCAL_QUORUM);
+ }
+
/**
*
* This method performs DDL operation on Cassandra using consistency level QUORUM.
@@ -522,5 +525,5 @@ public class MusicDataStore {
throws MusicServiceException, MusicQueryException {
return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);
}
-
+
}
diff --git a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
index 10898476..edce3fff 100644
--- a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
+++ b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
@@ -480,7 +480,7 @@ public class CassaLockStore {
DeadlockDetectionUtil ddu = new DeadlockDetectionUtil();
- ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
+ ResultSet rs = dsHandle.executeLocalQuorumConsistencyGet(queryObject);
logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") );
Iterator<Row> it = rs.iterator();
while (it.hasNext()) {
diff --git a/music-core/src/main/java/org/onap/music/main/MusicCore.java b/music-core/src/main/java/org/onap/music/main/MusicCore.java
index 658f2124..226dfb07 100644
--- a/music-core/src/main/java/org/onap/music/main/MusicCore.java
+++ b/music-core/src/main/java/org/onap/music/main/MusicCore.java
@@ -81,12 +81,12 @@ public class MusicCore {
return musicCore.acquireLockWithLease(key, lockId, leasePeriod);
}
- public static String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
- return musicCore.createLockReference(fullyQualifiedKey);
+ public static String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException {
+ return musicCore.createLockReferenceAtomic(fullyQualifiedKey);
}
- public static String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
- return musicCore.createLockReference(fullyQualifiedKey, locktype);
+ public static String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
+ return musicCore.createLockReferenceAtomic(fullyQualifiedKey, locktype);
}
public static String createLockReference(String fullyQualifiedKey, LockType locktype, String owner) throws MusicLockingException {
diff --git a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java
index b3226906..7629eae2 100644
--- a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java
+++ b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java
@@ -89,14 +89,14 @@ public interface MusicCoreService {
* @param fullyQualifiedKey the key to create a lock on
* @see {@link #creatLockReference(String, LockType)}
*/
- public String createLockReference(String fullyQualifiedKey) throws MusicLockingException; // lock name
+ public String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException; // lock name
/**
* Create a lock ref in the music lock store
* @param fullyQualifiedKey the key to create a lock on
* @param locktype the type of lock create, see {@link LockType}
*/
- public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException;
+ public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException;
/**
* Create a lock ref in the music lock store
diff --git a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
index 0d2e3f0a..c7c7cddc 100644
--- a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
+++ b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
@@ -26,8 +26,11 @@
package org.onap.music.service.impl;
import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.StringTokenizer;
import javax.ws.rs.core.MultivaluedMap;
@@ -71,6 +74,7 @@ public class MusicCassaCore implements MusicCoreService {
private static CassaLockStore mLockHandle = null;
private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
private static MusicCassaCore musicCassaCoreInstance = null;
+ private static Set<String> set = Collections.synchronizedSet(new HashSet<String>());
private MusicCassaCore() {
// not going to happen
@@ -92,9 +96,6 @@ public class MusicCassaCore implements MusicCoreService {
return musicCassaCoreInstance;
}
-
-
-
public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
long start = System.currentTimeMillis();
@@ -112,24 +113,98 @@ public class MusicCassaCore implements MusicCoreService {
return mLockHandle;
}
- public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
- return createLockReference(fullyQualifiedKey, LockType.WRITE);
+ public String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException {
+ return createLockReferenceAtomic(fullyQualifiedKey, LockType.WRITE);
}
- public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
- return createLockReference(fullyQualifiedKey, locktype, null);
+ /**
+ * This will be called for Atomic calls
+ *
+ */
+ public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
+ String[] splitString = fullyQualifiedKey.split("\\.");
+ if (splitString.length < 3) {
+ throw new MusicLockingException("Missing or incorrect lock details. Check table or key name.");
+ }
+ String keyspace = splitString[0];
+ String table = splitString[1];
+ String lockName = splitString[2];
+
+ logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
+ long start = 0L;
+ long end = 0L;
+ String lockReference = null;
+ LockObject peek = null;
+
+ /** Lets check for an existing lock.
+ * This will allow us to limit the amount of requests going forward.
+ */
+ start = System.currentTimeMillis();
+ try {
+ peek = getLockingServiceHandle().peekLockQueue(keyspace, table, lockName);
+ } catch (MusicServiceException | MusicQueryException e) {
+ //logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(),e);
+ throw new MusicLockingException("Error getting lockholder info for key [" + lockName +"]:" + e.getMessage());
+ }
+
+ if(peek!=null && (peek.getLocktype()!=null && peek.getLocktype().equals(LockType.WRITE)) && peek.getAcquireTime()!=null && peek.getLockRef()!=null) {
+ long currentTime = System.currentTimeMillis();
+ if((currentTime-Long.parseLong(peek.getAcquireTime()))<MusicUtil.getDefaultLockLeasePeriod()){
+ //logger.info(EELFLoggerDelegate.applicationLogger,"Lock holder exists and lease not expired. Please try again for key="+lockName);
+ throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again.");
+ }
+ }
+ end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to check for lock reference for key [" + lockName + "]:" + (end - start) + " ms");
+
+ start = System.currentTimeMillis();
+ /* We are Creating a Thread safe set and adding the key to the set.
+ * if a key exists then it wil be passed over and not go to the lock creation.
+ * If a key doesn't exist then it will set the value in the set and continue to create a lock.
+ *
+ * This will ensure that no 2 threads using the same key will be able to try to create a lock
+ * This wil in turn squash the amout of LWT Chatter in Cassandra an reduce the amount of
+ * WriteTimeoutExceptions being experiences on single keys.
+ */
+ if ( set.add(fullyQualifiedKey)) {
+ try {
+ lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype,null);
+ set.remove(fullyQualifiedKey);
+ } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
+ set.remove(fullyQualifiedKey);
+ throw new MusicLockingException(e.getMessage());
+ } catch (Exception e) {
+ set.remove(fullyQualifiedKey);
+ e.printStackTrace();
+ logger.error(EELFLoggerDelegate.applicationLogger,"Exception in creatLockEnforced:"+ e.getMessage(),e);
+ throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. " + e.getMessage());
+ }
+ } else {
+ throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again.");
+ }
+ end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.debugLogger,"### Set = " + set);
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference for key [" + lockName + "]:" + (end - start) + " ms");
+ return lockReference;
+
+ //return createLockReference(fullyQualifiedKey, locktype, null);
}
public String createLockReference(String fullyQualifiedKey, LockType locktype, String owner) throws MusicLockingException {
String[] splitString = fullyQualifiedKey.split("\\.");
+ if (splitString.length < 3) {
+ throw new MusicLockingException("Missing or incorrect lock details. Check table or key name.");
+ }
String keyspace = splitString[0];
String table = splitString[1];
String lockName = splitString[2];
logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
- long start = System.currentTimeMillis();
+ long start = 0L;
+ long end = 0L;
String lockReference = null;
+ /* Check for a Deadlock */
try {
boolean deadlock = getLockingServiceHandle().checkForDeadlock(keyspace, table, lockName, locktype, owner, false);
if (deadlock) {
@@ -144,18 +219,21 @@ public class MusicCassaCore implements MusicCoreService {
logger.error(EELFLoggerDelegate.applicationLogger, e);
throw new MusicLockingException("Unable to check for deadlock. " + e.getMessage(), e);
}
-
+ end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to check for deadlock for key [" + lockName + "]:" + (end - start) + " ms");
+
+ start = System.currentTimeMillis();
try {
lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype, owner);
} catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
- logger.error(EELFLoggerDelegate.applicationLogger, e);
- throw new MusicLockingException("Unable to create lock reference. " + e.getMessage(), e);
+ logger.info(EELFLoggerDelegate.applicationLogger,e.getMessage(),e);
+ throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again: " + e.getMessage());
} catch (Exception e) {
- logger.error(EELFLoggerDelegate.applicationLogger, e);
+ logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(),e);
throw new MusicLockingException("Unable to create lock reference. " + e.getMessage(), e);
}
- long end = System.currentTimeMillis();
- logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
+ end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference for key [" + lockName + "]:" + (end - start) + " ms");
return lockReference;
}
@@ -164,7 +242,6 @@ public class MusicCassaCore implements MusicCoreService {
String keyspace = splitString[0].substring(1);//remove '$'
String table = splitString[1];
String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
- String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
logger.info(EELFLoggerDelegate.applicationLogger,"Attempting to promote lock " + lockId);
@@ -733,7 +810,7 @@ public class MusicCassaCore implements MusicCoreService {
throws MusicLockingException, MusicQueryException, MusicServiceException {
long start = System.currentTimeMillis();
String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
- String lockId = createLockReference(fullyQualifiedKey, LockType.WRITE);
+ String lockId = createLockReferenceAtomic(fullyQualifiedKey, LockType.WRITE);
long lockCreationTime = System.currentTimeMillis();
ReturnType lockAcqResult = null;
logger.info(EELFLoggerDelegate.applicationLogger,
@@ -803,8 +880,7 @@ public class MusicCassaCore implements MusicCoreService {
public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
- String lockId = createLockReference(fullyQualifiedKey, LockType.READ);
- long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+ String lockId = createLockReferenceAtomic(fullyQualifiedKey, LockType.READ);
ReturnType lockAcqResult = null;
ResultSet result = null;
logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());