diff options
Diffstat (limited to 'src/main')
7 files changed, 216 insertions, 99 deletions
diff --git a/src/main/java/org/onap/music/datastore/MusicDataStore.java b/src/main/java/org/onap/music/datastore/MusicDataStore.java index 6555ea21..5a658688 100755 --- a/src/main/java/org/onap/music/datastore/MusicDataStore.java +++ b/src/main/java/org/onap/music/datastore/MusicDataStore.java @@ -41,6 +41,7 @@ import org.onap.music.eelf.logging.format.ErrorSeverity; import org.onap.music.eelf.logging.format.ErrorTypes; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.lockingservice.cassandra.LockType; import org.onap.music.main.CipherUtil; import org.onap.music.main.MusicUtil; import com.datastax.driver.core.Cluster; @@ -57,9 +58,12 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.TypeCodec; import com.datastax.driver.core.exceptions.AlreadyExistsException; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.extras.codecs.enums.EnumNameCodec; +import com.datastax.driver.extras.codecs.enums.EnumOrdinalCodec; /** * @author nelson24 @@ -93,15 +97,23 @@ public class MusicDataStore { public void setCluster(Cluster cluster) { this.cluster = cluster; } + + public Cluster getCluster() { + return this.cluster; + } private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class); /** - * + * Connect to default Cassandra address */ public MusicDataStore() { - connectToCassaCluster(); + try { + connectToCassaCluster(MusicUtil.getMyCassaHost()); + } catch (MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e); + } } @@ -129,82 +141,6 @@ public class MusicDataStore { /** * - * @return - */ - private ArrayList<String> getAllPossibleLocalIps() { - ArrayList<String> allPossibleIps = new ArrayList<>(); - try { - Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces(); - while (en.hasMoreElements()) { - NetworkInterface ni = en.nextElement(); - Enumeration<InetAddress> ee = ni.getInetAddresses(); - while (ee.hasMoreElements()) { - InetAddress ia = ee.nextElement(); - allPossibleIps.add(ia.getHostAddress()); - } - } - } catch (SocketException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, - ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e); - }catch(Exception e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes - .GENERALSERVICEERROR, e); - } - return allPossibleIps; - } - - /** - * This method iterates through all available IP addresses and connects to multiple cassandra - * clusters. - */ - private void connectToCassaCluster() { - Iterator<String> it = getAllPossibleLocalIps().iterator(); - String address = "localhost"; - String[] addresses = null; - address = MusicUtil.getMyCassaHost(); - addresses = address.split(","); - - logger.info(EELFLoggerDelegate.applicationLogger, - "Connecting to cassa cluster: Iterating through possible ips:" - + getAllPossibleLocalIps()); - PoolingOptions poolingOptions = new PoolingOptions(); - poolingOptions - .setConnectionsPerHost(HostDistance.LOCAL, 4, 10) - .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); - while (it.hasNext()) { - try { - if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) { - String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd()); - logger.info(EELFLoggerDelegate.applicationLogger, - "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd()); - cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) - .withCredentials(MusicUtil.getCassName(), cassPwd) - //.withLoadBalancingPolicy(new RoundRobinPolicy()) - .withoutJMXReporting() - .withPoolingOptions(poolingOptions) - .addContactPoints(addresses).build(); - } - else - cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) - //.withLoadBalancingPolicy(new RoundRobinPolicy()) - .addContactPoints(addresses).build(); - - Metadata metadata = cluster.getMetadata(); - logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster " - + metadata.getClusterName() + " at " + address); - session = cluster.connect(); - - break; - } catch (NoHostAvailableException e) { - address = it.next(); - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, - ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e); - } - } - } - - /** - * */ public void close() { session.close(); @@ -222,6 +158,7 @@ public class MusicDataStore { poolingOptions .setConnectionsPerHost(HostDistance.LOCAL, 4, 10) .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); + if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) { String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd()); logger.info(EELFLoggerDelegate.applicationLogger, @@ -236,12 +173,18 @@ public class MusicDataStore { cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) .withoutJMXReporting() .withPoolingOptions(poolingOptions) - .addContactPoints(addresses).build(); + .addContactPoints(addresses) + .build(); } + Metadata metadata = cluster.getMetadata(); logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster " + metadata.getClusterName() + " at " + address); + + EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class); + cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec); + try { session = cluster.connect(); } catch (Exception ex) { diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java index 0ec85077..10898476 100644 --- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java +++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java @@ -36,9 +36,12 @@ import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.DeadlockDetectionUtil; import org.onap.music.main.DeadlockDetectionUtil.OwnershipType; import org.onap.music.main.MusicUtil; - +import org.onap.music.main.ResultType; +import org.onap.music.main.ReturnType; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.extras.codecs.enums.EnumNameCodec; /* * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. @@ -126,7 +129,7 @@ public class CassaLockStore { table = table_prepend_name+table; String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, " - + "writeLock boolean, owner text, PRIMARY KEY ((key), lockReference) ) " + + "lockType text, owner text, PRIMARY KEY ((key), lockReference) ) " + "WITH CLUSTERING ORDER BY (lockReference ASC);"; PreparedQueryObject queryObject = new PreparedQueryObject(); @@ -176,13 +179,14 @@ public class CassaLockStore { logger.info(EELFLoggerDelegate.applicationLogger, "Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef); - + queryObject = new PreparedQueryObject(); + String insQuery = "BEGIN BATCH" + " UPDATE " + keyspace + "." + lockTable + " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" + " INSERT INTO " + keyspace + "." + lockTable + - "(key, lockReference, createTime, acquireTime, writeLock, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; + "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; queryObject.addValue(lockRef); queryObject.addValue(lockName); @@ -193,7 +197,7 @@ public class CassaLockStore { queryObject.addValue(lockRef); queryObject.addValue(String.valueOf(lockEpochMillis)); queryObject.addValue("0"); - queryObject.addValue(locktype==LockType.WRITE ? true : false ); + queryObject.addValue(locktype); queryObject.addValue(owner); queryObject.appendQueryString(insQuery); boolean pResult = dsHandle.executePut(queryObject, "critical"); @@ -285,7 +289,7 @@ public class CassaLockStore { String lockReference = "" + row.getLong("lockReference"); String createTime = row.getString("createTime"); String acquireTime = row.getString("acquireTime"); - LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ; + LockType locktype = row.get("lockType", LockType.class); String owner = row.getString("owner"); return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner); @@ -313,7 +317,7 @@ public class CassaLockStore { return lockHolders; } lockReference.append(lock).append(row.getLong("lockReference")); - if (row.isNull("writeLock") || row.getBool("writeLock")) { + if (row.get("lockType", LockType.class)!=LockType.WRITE) { if (topOfQueue) { lockHolders.add(lockReference.toString()); break; @@ -356,7 +360,7 @@ public class CassaLockStore { boolean topOfQueue = true; for (Row row : rs) { String lockReference = "" + row.getLong("lockReference"); - if (row.isNull("writeLock") || row.getBool("writeLock")) { + if (row.get("lockType", LockType.class)==LockType.WRITE) { if (topOfQueue && lockRef.equals(lockReference)) { return true; } else { @@ -404,7 +408,7 @@ public class CassaLockStore { String lockReference = "" + row.getLong("lockReference"); String createTime = row.getString("createTime"); String acquireTime = row.getString("acquireTime"); - LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ; + LockType locktype = row.get("lockType", LockType.class); boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef); String owner = row.getString("owner"); @@ -456,11 +460,9 @@ public class CassaLockStore { public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) { table = table_prepend_name + table; - PreparedQueryObject queryObject = new PreparedQueryObject(); Long lockReferenceL = Long.parseLong(lockReference); String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis() + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;"; - queryObject.appendQueryString(updateQuery); //cannot use executePut because we need to ignore music timestamp adjustments for lock store dsHandle.getSession().execute(updateQuery); @@ -473,7 +475,8 @@ public class CassaLockStore { String lockTable = table_prepend_name + table; PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable); - queryObject.appendQueryString(" WHERE writelock = True ALLOW FILTERING"); + queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING"); + queryObject.addValue(LockType.WRITE); DeadlockDetectionUtil ddu = new DeadlockDetectionUtil(); @@ -506,5 +509,96 @@ public class CassaLockStore { return toRet; } + public ReturnType promoteLock(String keyspace, String table, String key, String lockRef) + throws MusicLockingException, MusicServiceException, MusicQueryException { + String lockqtable = table_prepend_name + table; + String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;"; + + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString(selectQuery); + queryObject.addValue(key); + ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject); + + long refToPromote = Long.parseLong(lockRef); + + boolean topOfQueue = true; + boolean readBlock = false; + boolean seenLockToPromote = false; + boolean promotionOngoing = false; + long readBlockStart = 0; + long readBlockEnd = 0; + + + for (Row row : rs) { + long ref = row.getLong("lockreference"); + LockType lockType = row.get("lockType", LockType.class); + + if (refToPromote==ref) { + if (promotionOngoing) { + return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref."); + } + seenLockToPromote = true; + if (!topOfQueue) { + readBlockStart = ref; + readBlockEnd = ref; + break; + } + } else if (!seenLockToPromote && refToPromote<ref) { + return new ReturnType(ResultType.FAILURE, "Lockref does not exist."); + } + + if (lockType==LockType.READ || lockType==LockType.PROMOTING) { + if (!readBlock) { + readBlockStart = ref; + readBlock = true; + } + if (readBlock) { + readBlockEnd = ref; + } + if (lockType==LockType.PROMOTING) { + promotionOngoing = true; + } + } + + if (lockType==LockType.WRITE) { + if (refToPromote==ref) { + return new ReturnType(ResultType.FAILURE, "Lockref is already write."); + } + if (readBlock) { + readBlock = false; + promotionOngoing = false; + if (seenLockToPromote) { + break; + } + //can no longer be lock holder after this + topOfQueue = false; + } + } + } + + if (readBlockStart<=refToPromote && refToPromote<=readBlockEnd) { + if (readBlockStart==refToPromote && refToPromote==readBlockEnd) { + promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.WRITE); + return new ReturnType(ResultType.SUCCESS, "Lock has successfully been upgraded."); + } + promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.PROMOTING); + return new ReturnType(ResultType.FAILURE, "Your lock upgrade is in progress. Check again to see if successful."); + } + + //shouldn't reach here? + return new ReturnType(ResultType.FAILURE,"Promotion failed."); + } + + private void promoteLockTo(String keyspace, String table, String key, String lockRef, LockType newLockType) + throws MusicServiceException, MusicQueryException { + PreparedQueryObject queryObject = + new PreparedQueryObject("UPDATE " + keyspace + "." + table + " SET lockType=? WHERE key='" + key + + "' AND lockReference = " + lockRef + " IF EXISTS;", newLockType); + + //cannot use executePut because we need to ignore music timestamp adjustments for lock store + dsHandle.executePut(queryObject, MusicUtil.QUORUM); + } + + } diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/LockType.java b/src/main/java/org/onap/music/lockingservice/cassandra/LockType.java index 139d5f3f..432b1c51 100644 --- a/src/main/java/org/onap/music/lockingservice/cassandra/LockType.java +++ b/src/main/java/org/onap/music/lockingservice/cassandra/LockType.java @@ -25,6 +25,6 @@ package org.onap.music.lockingservice.cassandra; public enum LockType { - READ, WRITE; + WRITE, READ, PROMOTING; } diff --git a/src/main/java/org/onap/music/main/MusicCore.java b/src/main/java/org/onap/music/main/MusicCore.java index e889e180..658f2124 100644 --- a/src/main/java/org/onap/music/main/MusicCore.java +++ b/src/main/java/org/onap/music/main/MusicCore.java @@ -121,6 +121,10 @@ public class MusicCore { public static List<String> getCurrentLockHolders(String fullyQualifiedKey) { return musicCore.getCurrentLockHolders(fullyQualifiedKey); } + + public static ReturnType promoteLock(String lockIdToPromote) throws MusicLockingException { + return musicCore.promoteLock(lockIdToPromote); + } public static void destroyLockRef(String lockId) throws MusicLockingException { musicCore.destroyLockRef(lockId); diff --git a/src/main/java/org/onap/music/rest/RestMusicLocksAPI.java b/src/main/java/org/onap/music/rest/RestMusicLocksAPI.java index b839c7f2..321e2561 100644 --- a/src/main/java/org/onap/music/rest/RestMusicLocksAPI.java +++ b/src/main/java/org/onap/music/rest/RestMusicLocksAPI.java @@ -568,4 +568,65 @@ public class RestMusicLocksAPI { } } + + /** + * Puts the requesting process in the q for this lock. The corresponding + * node will be created if it did not already exist + * + * @param lockName + * @return + * @throws Exception + */ + @POST + @Path("/promote/{lockname}") + @ApiOperation(value = "Attempt to promote the lock for a single row.", + response = Map.class) + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @ApiResponses(value={ + @ApiResponse(code=200, message = "Success",examples = @Example( value = { + @ExampleProperty(mediaType="application/json",value = + "\"status\" : \"SUCCESS\"}") + })), + @ApiResponse(code=400, message = "Failure",examples = @Example( value = { + @ExampleProperty(mediaType="application/json",value = + "{\"error\" : \"Unable to promote lock\"," + + "\"status\" : \"FAILURE\"}") + })) + }) + public Response promoteLock( + @ApiParam(value="Lock Id",required=true) @PathParam("lockId") String lockId, + @ApiParam(value = "Minor Version",required = false) @HeaderParam(XMINORVERSION) String minorVersion, + @ApiParam(value = "Patch Version",required = false) @HeaderParam(XPATCHVERSION) String patchVersion, + @ApiParam(value = "Authorization", required = true) @HeaderParam(MusicUtil.AUTHORIZATION) String authorization) + throws Exception { + try { + ResponseBuilder response = MusicUtil.buildVersionResponse(VERSION, minorVersion, patchVersion); + Map<String, Object> resultMap = MusicCore.validateLock(lockId); + if (resultMap.containsKey("Error")) { + logger.error(EELFLoggerDelegate.errorLogger,"", AppMessages.INCORRECTDATA ,ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR); + response.status(Status.BAD_REQUEST); + return response.entity(new JsonResponse(ResultType.FAILURE).setError(String.valueOf(resultMap.get("Error"))).toMap()).build(); + } + + String keyspaceName = (String) resultMap.get("keyspace"); + EELFLoggerDelegate.mdcPut("keyspace", "( " + keyspaceName + " ) "); + + try { + ReturnType lockStatus = MusicCore.promoteLock(lockId); + if ( lockStatus.getResult().equals(ResultType.SUCCESS)) { + response.status(Status.OK); + } else { + response.status(Status.BAD_REQUEST); + } + return response.entity(new JsonResponse(lockStatus.getResult()).setLock(lockId).setMessage(lockStatus.getMessage()).toMap()).build(); + } catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger,AppMessages.INVALIDLOCK + lockId, ErrorSeverity.CRITICAL, + ErrorTypes.LOCKINGERROR, e); + return response.status(Status.BAD_REQUEST).entity(new JsonResponse(ResultType.FAILURE).setError("Unable to promote lock").toMap()).build(); + } + } finally { + EELFLoggerDelegate.mdcRemove("keyspace"); + } + } } diff --git a/src/main/java/org/onap/music/service/MusicCoreService.java b/src/main/java/org/onap/music/service/MusicCoreService.java index 1ecb2ee1..b3226906 100644 --- a/src/main/java/org/onap/music/service/MusicCoreService.java +++ b/src/main/java/org/onap/music/service/MusicCoreService.java @@ -133,15 +133,10 @@ public interface MusicCoreService { public void destroyLockRef(String lockId) throws MusicLockingException; - //public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference); // lock name, lock id - - //public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) - // throws MusicLockingException;// lock name,lock id - public void deleteLock(String lockName) throws MusicLockingException; - //public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicServiceException, MusicQueryException; - + public ReturnType promoteLock(String lockIdToPromote) throws MusicLockingException; + public List<String> getLockQueue(String fullyQualifiedKey) throws MusicServiceException, MusicQueryException, MusicLockingException; diff --git a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java index 0c30cc74..0d2e3f0a 100644 --- a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java +++ b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java @@ -158,6 +158,26 @@ public class MusicCassaCore implements MusicCoreService { logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms"); return lockReference; } + + public ReturnType promoteLock(String lockId) throws MusicLockingException { + String[] splitString = lockId.split("\\."); + String keyspace = splitString[0].substring(1);//remove '$' + String table = splitString[1]; + String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$")); + String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$")); + String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end + + logger.info(EELFLoggerDelegate.applicationLogger,"Attempting to promote lock " + lockId); + + try { + return getLockingServiceHandle().promoteLock(keyspace, table, primaryKeyValue, lockRef); + } catch (MusicServiceException e) { + throw new MusicLockingException("Unable to promote lock. ", e); + } catch (MusicQueryException e) { + throw new MusicLockingException("Unable to promote lock. ", e); + } + + } public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) |