aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorTschaen, Brendan <ctschaen@att.com>2019-09-25 14:54:46 -0400
committerTschaen, Brendan <ctschaen@att.com>2019-09-26 13:52:01 -0400
commit90d35b7f55d1ea3eb6ccf8218d9ac42412fd0d90 (patch)
tree972d5110063f0f56405ba1c29f0c8534f17f01e8 /src
parentc8adfc5ea25d6ffd45edd5213195ce0c4568b57f (diff)
Read lock promotion
Change-Id: Ib2515c728503fb729e6ecc2e09973bbfa9e2e317 Issue-ID: MUSIC-508 Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
Diffstat (limited to 'src')
-rwxr-xr-xsrc/main/java/org/onap/music/datastore/MusicDataStore.java101
-rw-r--r--src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java118
-rw-r--r--src/main/java/org/onap/music/lockingservice/cassandra/LockType.java2
-rw-r--r--src/main/java/org/onap/music/main/MusicCore.java4
-rw-r--r--src/main/java/org/onap/music/rest/RestMusicLocksAPI.java61
-rw-r--r--src/main/java/org/onap/music/service/MusicCoreService.java9
-rw-r--r--src/main/java/org/onap/music/service/impl/MusicCassaCore.java20
-rw-r--r--src/test/java/org/onap/music/unittests/CassandraCQL.java6
-rw-r--r--src/test/java/org/onap/music/unittests/TstRestMusicLockAPI.java130
9 files changed, 351 insertions, 100 deletions
diff --git a/src/main/java/org/onap/music/datastore/MusicDataStore.java b/src/main/java/org/onap/music/datastore/MusicDataStore.java
index 6555ea21..5a658688 100755
--- a/src/main/java/org/onap/music/datastore/MusicDataStore.java
+++ b/src/main/java/org/onap/music/datastore/MusicDataStore.java
@@ -41,6 +41,7 @@ import org.onap.music.eelf.logging.format.ErrorSeverity;
import org.onap.music.eelf.logging.format.ErrorTypes;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.lockingservice.cassandra.LockType;
import org.onap.music.main.CipherUtil;
import org.onap.music.main.MusicUtil;
import com.datastax.driver.core.Cluster;
@@ -57,9 +58,12 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
+import com.datastax.driver.extras.codecs.enums.EnumOrdinalCodec;
/**
* @author nelson24
@@ -93,15 +97,23 @@ public class MusicDataStore {
public void setCluster(Cluster cluster) {
this.cluster = cluster;
}
+
+ public Cluster getCluster() {
+ return this.cluster;
+ }
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
/**
- *
+ * Connect to default Cassandra address
*/
public MusicDataStore() {
- connectToCassaCluster();
+ try {
+ connectToCassaCluster(MusicUtil.getMyCassaHost());
+ } catch (MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
+ }
}
@@ -129,82 +141,6 @@ public class MusicDataStore {
/**
*
- * @return
- */
- private ArrayList<String> getAllPossibleLocalIps() {
- ArrayList<String> allPossibleIps = new ArrayList<>();
- try {
- Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
- while (en.hasMoreElements()) {
- NetworkInterface ni = en.nextElement();
- Enumeration<InetAddress> ee = ni.getInetAddresses();
- while (ee.hasMoreElements()) {
- InetAddress ia = ee.nextElement();
- allPossibleIps.add(ia.getHostAddress());
- }
- }
- } catch (SocketException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR,
- ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
- }catch(Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes
- .GENERALSERVICEERROR, e);
- }
- return allPossibleIps;
- }
-
- /**
- * This method iterates through all available IP addresses and connects to multiple cassandra
- * clusters.
- */
- private void connectToCassaCluster() {
- Iterator<String> it = getAllPossibleLocalIps().iterator();
- String address = "localhost";
- String[] addresses = null;
- address = MusicUtil.getMyCassaHost();
- addresses = address.split(",");
-
- logger.info(EELFLoggerDelegate.applicationLogger,
- "Connecting to cassa cluster: Iterating through possible ips:"
- + getAllPossibleLocalIps());
- PoolingOptions poolingOptions = new PoolingOptions();
- poolingOptions
- .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
- .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
- while (it.hasNext()) {
- try {
- if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
- String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
- logger.info(EELFLoggerDelegate.applicationLogger,
- "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
- cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
- .withCredentials(MusicUtil.getCassName(), cassPwd)
- //.withLoadBalancingPolicy(new RoundRobinPolicy())
- .withoutJMXReporting()
- .withPoolingOptions(poolingOptions)
- .addContactPoints(addresses).build();
- }
- else
- cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
- //.withLoadBalancingPolicy(new RoundRobinPolicy())
- .addContactPoints(addresses).build();
-
- Metadata metadata = cluster.getMetadata();
- logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
- + metadata.getClusterName() + " at " + address);
- session = cluster.connect();
-
- break;
- } catch (NoHostAvailableException e) {
- address = it.next();
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE,
- ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
- }
- }
- }
-
- /**
- *
*/
public void close() {
session.close();
@@ -222,6 +158,7 @@ public class MusicDataStore {
poolingOptions
.setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
.setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
+
if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
logger.info(EELFLoggerDelegate.applicationLogger,
@@ -236,12 +173,18 @@ public class MusicDataStore {
cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
.withoutJMXReporting()
.withPoolingOptions(poolingOptions)
- .addContactPoints(addresses).build();
+ .addContactPoints(addresses)
+ .build();
}
+
Metadata metadata = cluster.getMetadata();
logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
+ metadata.getClusterName() + " at " + address);
+
+ EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
+ cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
+
try {
session = cluster.connect();
} catch (Exception ex) {
diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
index 0ec85077..10898476 100644
--- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
+++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
@@ -36,9 +36,12 @@ import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.main.DeadlockDetectionUtil;
import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
import org.onap.music.main.MusicUtil;
-
+import org.onap.music.main.ResultType;
+import org.onap.music.main.ReturnType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
/*
* This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state.
@@ -126,7 +129,7 @@ public class CassaLockStore {
table = table_prepend_name+table;
String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
+ " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
- + "writeLock boolean, owner text, PRIMARY KEY ((key), lockReference) ) "
+ + "lockType text, owner text, PRIMARY KEY ((key), lockReference) ) "
+ "WITH CLUSTERING ORDER BY (lockReference ASC);";
PreparedQueryObject queryObject = new PreparedQueryObject();
@@ -176,13 +179,14 @@ public class CassaLockStore {
logger.info(EELFLoggerDelegate.applicationLogger,
"Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
-
+
queryObject = new PreparedQueryObject();
+
String insQuery = "BEGIN BATCH" +
" UPDATE " + keyspace + "." + lockTable +
" SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
" INSERT INTO " + keyspace + "." + lockTable +
- "(key, lockReference, createTime, acquireTime, writeLock, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
+ "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
queryObject.addValue(lockRef);
queryObject.addValue(lockName);
@@ -193,7 +197,7 @@ public class CassaLockStore {
queryObject.addValue(lockRef);
queryObject.addValue(String.valueOf(lockEpochMillis));
queryObject.addValue("0");
- queryObject.addValue(locktype==LockType.WRITE ? true : false );
+ queryObject.addValue(locktype);
queryObject.addValue(owner);
queryObject.appendQueryString(insQuery);
boolean pResult = dsHandle.executePut(queryObject, "critical");
@@ -285,7 +289,7 @@ public class CassaLockStore {
String lockReference = "" + row.getLong("lockReference");
String createTime = row.getString("createTime");
String acquireTime = row.getString("acquireTime");
- LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+ LockType locktype = row.get("lockType", LockType.class);
String owner = row.getString("owner");
return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
@@ -313,7 +317,7 @@ public class CassaLockStore {
return lockHolders;
}
lockReference.append(lock).append(row.getLong("lockReference"));
- if (row.isNull("writeLock") || row.getBool("writeLock")) {
+ if (row.get("lockType", LockType.class)!=LockType.WRITE) {
if (topOfQueue) {
lockHolders.add(lockReference.toString());
break;
@@ -356,7 +360,7 @@ public class CassaLockStore {
boolean topOfQueue = true;
for (Row row : rs) {
String lockReference = "" + row.getLong("lockReference");
- if (row.isNull("writeLock") || row.getBool("writeLock")) {
+ if (row.get("lockType", LockType.class)==LockType.WRITE) {
if (topOfQueue && lockRef.equals(lockReference)) {
return true;
} else {
@@ -404,7 +408,7 @@ public class CassaLockStore {
String lockReference = "" + row.getLong("lockReference");
String createTime = row.getString("createTime");
String acquireTime = row.getString("acquireTime");
- LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+ LockType locktype = row.get("lockType", LockType.class);
boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
String owner = row.getString("owner");
@@ -456,11 +460,9 @@ public class CassaLockStore {
public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
table = table_prepend_name + table;
- PreparedQueryObject queryObject = new PreparedQueryObject();
Long lockReferenceL = Long.parseLong(lockReference);
String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
+ "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
- queryObject.appendQueryString(updateQuery);
//cannot use executePut because we need to ignore music timestamp adjustments for lock store
dsHandle.getSession().execute(updateQuery);
@@ -473,7 +475,8 @@ public class CassaLockStore {
String lockTable = table_prepend_name + table;
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
- queryObject.appendQueryString(" WHERE writelock = True ALLOW FILTERING");
+ queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING");
+ queryObject.addValue(LockType.WRITE);
DeadlockDetectionUtil ddu = new DeadlockDetectionUtil();
@@ -506,5 +509,96 @@ public class CassaLockStore {
return toRet;
}
+ public ReturnType promoteLock(String keyspace, String table, String key, String lockRef)
+ throws MusicLockingException, MusicServiceException, MusicQueryException {
+ String lockqtable = table_prepend_name + table;
+ String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;";
+
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(selectQuery);
+ queryObject.addValue(key);
+ ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+
+ long refToPromote = Long.parseLong(lockRef);
+
+ boolean topOfQueue = true;
+ boolean readBlock = false;
+ boolean seenLockToPromote = false;
+ boolean promotionOngoing = false;
+ long readBlockStart = 0;
+ long readBlockEnd = 0;
+
+
+ for (Row row : rs) {
+ long ref = row.getLong("lockreference");
+ LockType lockType = row.get("lockType", LockType.class);
+
+ if (refToPromote==ref) {
+ if (promotionOngoing) {
+ return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref.");
+ }
+ seenLockToPromote = true;
+ if (!topOfQueue) {
+ readBlockStart = ref;
+ readBlockEnd = ref;
+ break;
+ }
+ } else if (!seenLockToPromote && refToPromote<ref) {
+ return new ReturnType(ResultType.FAILURE, "Lockref does not exist.");
+ }
+
+ if (lockType==LockType.READ || lockType==LockType.PROMOTING) {
+ if (!readBlock) {
+ readBlockStart = ref;
+ readBlock = true;
+ }
+ if (readBlock) {
+ readBlockEnd = ref;
+ }
+ if (lockType==LockType.PROMOTING) {
+ promotionOngoing = true;
+ }
+ }
+
+ if (lockType==LockType.WRITE) {
+ if (refToPromote==ref) {
+ return new ReturnType(ResultType.FAILURE, "Lockref is already write.");
+ }
+ if (readBlock) {
+ readBlock = false;
+ promotionOngoing = false;
+ if (seenLockToPromote) {
+ break;
+ }
+ //can no longer be lock holder after this
+ topOfQueue = false;
+ }
+ }
+ }
+
+ if (readBlockStart<=refToPromote && refToPromote<=readBlockEnd) {
+ if (readBlockStart==refToPromote && refToPromote==readBlockEnd) {
+ promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.WRITE);
+ return new ReturnType(ResultType.SUCCESS, "Lock has successfully been upgraded.");
+ }
+ promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.PROMOTING);
+ return new ReturnType(ResultType.FAILURE, "Your lock upgrade is in progress. Check again to see if successful.");
+ }
+
+ //shouldn't reach here?
+ return new ReturnType(ResultType.FAILURE,"Promotion failed.");
+ }
+
+ private void promoteLockTo(String keyspace, String table, String key, String lockRef, LockType newLockType)
+ throws MusicServiceException, MusicQueryException {
+ PreparedQueryObject queryObject =
+ new PreparedQueryObject("UPDATE " + keyspace + "." + table + " SET lockType=? WHERE key='" + key
+ + "' AND lockReference = " + lockRef + " IF EXISTS;", newLockType);
+
+ //cannot use executePut because we need to ignore music timestamp adjustments for lock store
+ dsHandle.executePut(queryObject, MusicUtil.QUORUM);
+ }
+
+
}
diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/LockType.java b/src/main/java/org/onap/music/lockingservice/cassandra/LockType.java
index 139d5f3f..432b1c51 100644
--- a/src/main/java/org/onap/music/lockingservice/cassandra/LockType.java
+++ b/src/main/java/org/onap/music/lockingservice/cassandra/LockType.java
@@ -25,6 +25,6 @@ package org.onap.music.lockingservice.cassandra;
public enum LockType {
- READ, WRITE;
+ WRITE, READ, PROMOTING;
}
diff --git a/src/main/java/org/onap/music/main/MusicCore.java b/src/main/java/org/onap/music/main/MusicCore.java
index e889e180..658f2124 100644
--- a/src/main/java/org/onap/music/main/MusicCore.java
+++ b/src/main/java/org/onap/music/main/MusicCore.java
@@ -121,6 +121,10 @@ public class MusicCore {
public static List<String> getCurrentLockHolders(String fullyQualifiedKey) {
return musicCore.getCurrentLockHolders(fullyQualifiedKey);
}
+
+ public static ReturnType promoteLock(String lockIdToPromote) throws MusicLockingException {
+ return musicCore.promoteLock(lockIdToPromote);
+ }
public static void destroyLockRef(String lockId) throws MusicLockingException {
musicCore.destroyLockRef(lockId);
diff --git a/src/main/java/org/onap/music/rest/RestMusicLocksAPI.java b/src/main/java/org/onap/music/rest/RestMusicLocksAPI.java
index b839c7f2..321e2561 100644
--- a/src/main/java/org/onap/music/rest/RestMusicLocksAPI.java
+++ b/src/main/java/org/onap/music/rest/RestMusicLocksAPI.java
@@ -568,4 +568,65 @@ public class RestMusicLocksAPI {
}
}
+
+ /**
+ * Puts the requesting process in the q for this lock. The corresponding
+ * node will be created if it did not already exist
+ *
+ * @param lockName
+ * @return
+ * @throws Exception
+ */
+ @POST
+ @Path("/promote/{lockname}")
+ @ApiOperation(value = "Attempt to promote the lock for a single row.",
+ response = Map.class)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiResponses(value={
+ @ApiResponse(code=200, message = "Success",examples = @Example( value = {
+ @ExampleProperty(mediaType="application/json",value =
+ "\"status\" : \"SUCCESS\"}")
+ })),
+ @ApiResponse(code=400, message = "Failure",examples = @Example( value = {
+ @ExampleProperty(mediaType="application/json",value =
+ "{\"error\" : \"Unable to promote lock\","
+ + "\"status\" : \"FAILURE\"}")
+ }))
+ })
+ public Response promoteLock(
+ @ApiParam(value="Lock Id",required=true) @PathParam("lockId") String lockId,
+ @ApiParam(value = "Minor Version",required = false) @HeaderParam(XMINORVERSION) String minorVersion,
+ @ApiParam(value = "Patch Version",required = false) @HeaderParam(XPATCHVERSION) String patchVersion,
+ @ApiParam(value = "Authorization", required = true) @HeaderParam(MusicUtil.AUTHORIZATION) String authorization)
+ throws Exception {
+ try {
+ ResponseBuilder response = MusicUtil.buildVersionResponse(VERSION, minorVersion, patchVersion);
+ Map<String, Object> resultMap = MusicCore.validateLock(lockId);
+ if (resultMap.containsKey("Error")) {
+ logger.error(EELFLoggerDelegate.errorLogger,"", AppMessages.INCORRECTDATA ,ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR);
+ response.status(Status.BAD_REQUEST);
+ return response.entity(new JsonResponse(ResultType.FAILURE).setError(String.valueOf(resultMap.get("Error"))).toMap()).build();
+ }
+
+ String keyspaceName = (String) resultMap.get("keyspace");
+ EELFLoggerDelegate.mdcPut("keyspace", "( " + keyspaceName + " ) ");
+
+ try {
+ ReturnType lockStatus = MusicCore.promoteLock(lockId);
+ if ( lockStatus.getResult().equals(ResultType.SUCCESS)) {
+ response.status(Status.OK);
+ } else {
+ response.status(Status.BAD_REQUEST);
+ }
+ return response.entity(new JsonResponse(lockStatus.getResult()).setLock(lockId).setMessage(lockStatus.getMessage()).toMap()).build();
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger,AppMessages.INVALIDLOCK + lockId, ErrorSeverity.CRITICAL,
+ ErrorTypes.LOCKINGERROR, e);
+ return response.status(Status.BAD_REQUEST).entity(new JsonResponse(ResultType.FAILURE).setError("Unable to promote lock").toMap()).build();
+ }
+ } finally {
+ EELFLoggerDelegate.mdcRemove("keyspace");
+ }
+ }
}
diff --git a/src/main/java/org/onap/music/service/MusicCoreService.java b/src/main/java/org/onap/music/service/MusicCoreService.java
index 1ecb2ee1..b3226906 100644
--- a/src/main/java/org/onap/music/service/MusicCoreService.java
+++ b/src/main/java/org/onap/music/service/MusicCoreService.java
@@ -133,15 +133,10 @@ public interface MusicCoreService {
public void destroyLockRef(String lockId) throws MusicLockingException;
- //public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference); // lock name, lock id
-
- //public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
- // throws MusicLockingException;// lock name,lock id
-
public void deleteLock(String lockName) throws MusicLockingException;
- //public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicServiceException, MusicQueryException;
-
+ public ReturnType promoteLock(String lockIdToPromote) throws MusicLockingException;
+
public List<String> getLockQueue(String fullyQualifiedKey)
throws MusicServiceException, MusicQueryException, MusicLockingException;
diff --git a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
index 0c30cc74..0d2e3f0a 100644
--- a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
+++ b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
@@ -158,6 +158,26 @@ public class MusicCassaCore implements MusicCoreService {
logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
return lockReference;
}
+
+ public ReturnType promoteLock(String lockId) throws MusicLockingException {
+ String[] splitString = lockId.split("\\.");
+ String keyspace = splitString[0].substring(1);//remove '$'
+ String table = splitString[1];
+ String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
+ String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
+ String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
+
+ logger.info(EELFLoggerDelegate.applicationLogger,"Attempting to promote lock " + lockId);
+
+ try {
+ return getLockingServiceHandle().promoteLock(keyspace, table, primaryKeyValue, lockRef);
+ } catch (MusicServiceException e) {
+ throw new MusicLockingException("Unable to promote lock. ", e);
+ } catch (MusicQueryException e) {
+ throw new MusicLockingException("Unable to promote lock. ", e);
+ }
+
+ }
public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
diff --git a/src/test/java/org/onap/music/unittests/CassandraCQL.java b/src/test/java/org/onap/music/unittests/CassandraCQL.java
index 32072145..7b116bc8 100644
--- a/src/test/java/org/onap/music/unittests/CassandraCQL.java
+++ b/src/test/java/org/onap/music/unittests/CassandraCQL.java
@@ -42,9 +42,10 @@ import java.util.UUID;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.onap.music.datastore.MusicDataStore;
import org.onap.music.datastore.PreparedQueryObject;
-
+import org.onap.music.lockingservice.cassandra.LockType;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
+import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
public class CassandraCQL {
public static final String createAdminKeyspace = "CREATE KEYSPACE admin WITH REPLICATION = "
@@ -235,6 +236,9 @@ public class CassandraCQL {
EmbeddedCassandraServerHelper.startEmbeddedCassandra();
Cluster cluster = new Cluster.Builder().withoutJMXReporting().withoutMetrics().addContactPoint(address).withPort(9142).build();
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(5000);
+ EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
+ cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
+
Session session = cluster.connect();
return new MusicDataStore(cluster, session);
diff --git a/src/test/java/org/onap/music/unittests/TstRestMusicLockAPI.java b/src/test/java/org/onap/music/unittests/TstRestMusicLockAPI.java
index 5669580a..e9321d25 100644
--- a/src/test/java/org/onap/music/unittests/TstRestMusicLockAPI.java
+++ b/src/test/java/org/onap/music/unittests/TstRestMusicLockAPI.java
@@ -480,6 +480,136 @@ public class TstRestMusicLockAPI {
assertTrue( ((String)respMapCreate4.get("error")).toLowerCase().indexOf("deadlock") > -1 );
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void test_lockPromotion() throws Exception {
+ System.out.println("Testing lock promotion");
+ createAndInsertIntoTable();
+ insertAnotherIntoTable();
+
+ // creates a lock 1
+ JsonLock jsonLock = createJsonLock(LockType.READ);
+ Response responseCreate1 = lock.createLockReference(lockName, "1", "1", authorization,
+ "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLock, "process1", appName);
+ Map<String, Object> respMapCreate1 = (Map<String, Object>) responseCreate1.getEntity();
+ String lockRefCreate1 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+
+ Response respMapPromote = lock.promoteLock(lockRefCreate1, "1", "1", authorization);
+ System.out.println("Status: " + respMapPromote.getStatus() + ". Entity " + respMapPromote.getEntity());
+
+ assertEquals(200, respMapPromote.getStatus());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void test_lockPromotionReadWrite() throws Exception {
+ System.out.println("Testing lock promotion with read and writes");
+ createAndInsertIntoTable();
+ insertAnotherIntoTable();
+
+ // creates a lock 1
+ JsonLock jsonLockRead = createJsonLock(LockType.READ);
+ Response responseCreate1 = lock.createLockReference(lockName, "1", "1", authorization,
+ "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+ Map<String, Object> respMapCreate1 = (Map<String, Object>) responseCreate1.getEntity();
+ String lockRefCreate1 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+
+ JsonLock jsonLockWrite = createJsonLock(LockType.WRITE);
+ Response responseCreate2 = lock.createLockReference(lockName, "1", "1", authorization,
+ "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockWrite, "process1", appName);
+ Map<String, Object> respMapCreate2 = (Map<String, Object>) responseCreate2.getEntity();
+ String lockRefCreate2 = ((Map<String, String>) respMapCreate2.get("lock")).get("lock");
+
+ Response respMapPromote = lock.promoteLock(lockRefCreate1, "1", "1", authorization);
+ System.out.println("Status: " + respMapPromote.getStatus() + ". Entity " + respMapPromote.getEntity());
+
+ assertEquals(200, respMapPromote.getStatus());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void test_lockPromotionWriteRead() throws Exception {
+ System.out.println("Testing lock promotion with reads not at top of queue");
+ createAndInsertIntoTable();
+ insertAnotherIntoTable();
+
+ // creates a lock 1
+ JsonLock jsonLockWrite = createJsonLock(LockType.WRITE);
+ Response responseCreate2 = lock.createLockReference(lockName, "1", "1", authorization,
+ "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockWrite, "process1", appName);
+ Map<String, Object> respMapCreate2 = (Map<String, Object>) responseCreate2.getEntity();
+ String lockRefCreate2 = ((Map<String, String>) respMapCreate2.get("lock")).get("lock");
+
+ // creates a lock 2
+ JsonLock jsonLockRead = createJsonLock(LockType.READ);
+ Response responseCreate1 = lock.createLockReference(lockName, "1", "1", authorization,
+ "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+ Map<String, Object> respMapCreate1 = (Map<String, Object>) responseCreate1.getEntity();
+ String lockRefCreate1 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+
+ Response respMapPromote = lock.promoteLock(lockRefCreate1, "1", "1", authorization);
+ System.out.println("Status: " + respMapPromote.getStatus() + ". Entity " + respMapPromote.getEntity());
+
+ assertEquals(200, respMapPromote.getStatus());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void test_lockPromotion2Reads() throws Exception {
+ System.out.println("Testing lock promotion w/ 2 ReadLocks");
+ createAndInsertIntoTable();
+ insertAnotherIntoTable();
+
+ // creates a lock 1
+ JsonLock jsonLockRead = createJsonLock(LockType.READ);
+ Response responseCreate1 = lock.createLockReference(lockName, "1", "1", authorization,
+ "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+ Map<String, Object> respMapCreate1 = (Map<String, Object>) responseCreate1.getEntity();
+ String lockRefCreate1 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+
+ Response responseCreate2 = lock.createLockReference(lockName, "1", "1", authorization,
+ "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+ Map<String, Object> respMapCreate2 = (Map<String, Object>) responseCreate1.getEntity();
+ String lockRefCreate2 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+
+ Response respMapPromote = lock.promoteLock(lockRefCreate1, "1", "1", authorization);
+ System.out.println("Status: " + respMapPromote.getStatus() + ". Entity " + respMapPromote.getEntity());
+
+ assertEquals(400, respMapPromote.getStatus());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void test_2lockPromotions() throws Exception {
+ System.out.println("Testing 2 lock promotions");
+ createAndInsertIntoTable();
+ insertAnotherIntoTable();
+
+ // creates a lock 1
+ JsonLock jsonLockRead = createJsonLock(LockType.READ);
+ Response responseCreate1 = lock.createLockReference(lockName, "1", "1", authorization,
+ "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+ Map<String, Object> respMapCreate1 = (Map<String, Object>) responseCreate1.getEntity();
+ String lockRefCreate1 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+
+ Response responseCreate2 = lock.createLockReference(lockName, "1", "1", authorization,
+ "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+ Map<String, Object> respMapCreate2 = (Map<String, Object>) responseCreate2.getEntity();
+ String lockRefCreate2 = ((Map<String, String>) respMapCreate2.get("lock")).get("lock");
+
+ Response respMapPromote = lock.promoteLock(lockRefCreate1, "1", "1", authorization);
+ System.out.println("Status: " + respMapPromote.getStatus() + ". Entity " + respMapPromote.getEntity());
+
+ assertEquals(400, respMapPromote.getStatus());
+
+ Response respMap2Promote = lock.promoteLock(lockRefCreate2, "1", "1", authorization);
+ System.out.println("Status: " + respMap2Promote.getStatus() + ". Entity " + respMap2Promote.getEntity());
+
+ assertEquals(400, respMapPromote.getStatus());
+ }
+
+
// Ignoring since this is now a duplicate of delete lock ref.
@Test