diff options
10 files changed, 261 insertions, 30 deletions
diff --git a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java index 7f6c42ca..cb22c0f4 100755 --- a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java +++ b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java @@ -374,22 +374,25 @@ public class MusicDataStore { ResultSet rs = session.execute(preparedInsert); result = rs.wasApplied(); - - } - catch (AlreadyExistsException ae) { - // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR, - // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae); - } catch ( InvalidQueryException e ) { - // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " [" - // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e); + } catch (AlreadyExistsException ae) { + throw new MusicServiceException("Already Exists Exception: " + ae.getMessage()); + } catch (InvalidQueryException e) { + if (e.getMessage().contains("unconfigured table")) { + throw new MusicServiceException("Invalid Query Exception: " + e.getMessage()); + } else { + logger.info(EELFLoggerDelegate.applicationLogger, "Query Exception: " + e.getMessage(), + AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.INFO, + ErrorTypes.QUERYERROR, e); + throw new MusicServiceException("Query Exception: " + e.getMessage()); + } } catch (Exception e) { - // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " [" - // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e); - throw new MusicServiceException("Executing Session Failure for Request = " + "[" - + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), + AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, + ErrorTypes.QUERYERROR, e); + throw new MusicServiceException("Executing Session Failure for Request = " + "[" + queryObject.getQuery() + + "]" + " Reason = " + e.getMessage()); } + return result; } diff --git a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java index e9533344..a727357f 100644 --- a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java +++ b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java @@ -50,7 +50,7 @@ import com.datastax.driver.extras.codecs.enums.EnumNameCodec; public class CassaLockStore { private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class); - private static String table_prepend_name = "lockQ_"; + public static final String table_prepend_name = "lockQ_"; private MusicDataStore dsHandle; public CassaLockStore() { diff --git a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java new file mode 100644 index 00000000..492a48f0 --- /dev/null +++ b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java @@ -0,0 +1,132 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2019 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ + +package org.onap.music.lockingservice.cassandra; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.onap.music.datastore.MusicDataStoreHandle; +import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.eelf.logging.EELFLoggerDelegate; +import org.onap.music.exceptions.MusicQueryException; +import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.main.MusicCore; +import org.onap.music.main.MusicUtil; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; + +public class LockCleanUpDaemon extends Thread { + + boolean terminated = false; + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(LockCleanUpDaemon.class); + + + public LockCleanUpDaemon() { + } + + @Override + public void run() { + if (MusicUtil.getLockDaemonSleepTimeMs()<0) { + terminate(); + } + while (!terminated) { + try { + cleanupStaleLocks(); + } catch (MusicServiceException e) { + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to clean up locks", e); + } + try { + Thread.sleep(MusicUtil.getLockDaemonSleepTimeMs()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private void cleanupStaleLocks() throws MusicServiceException { + Set<String> lockQTables = getLockQTables(); + logger.info(EELFLoggerDelegate.applicationLogger, "Lock q tables found: " + lockQTables); + for(String lockTable: lockQTables) { + try { + cleanUpLocksFromTable(lockTable); + } catch (MusicServiceException e) { + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to clear locks on table " + lockTable, e); + } + } + } + + + private Set<String> getLockQTables() throws MusicServiceException { + Set<String> keyspacesToCleanUp = MusicUtil.getKeyspacesToCleanLocks(); + Set<String> lockQTables = new HashSet<>(); + + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString("SELECT keyspace_name, table_name FROM system_schema.tables;"); + ResultSet results = MusicCore.get(query); + + for (Row row: results) { + if (keyspacesToCleanUp.contains(row.getString("keyspace_name")) + && row.getString("table_name").toLowerCase().startsWith(CassaLockStore.table_prepend_name.toLowerCase()) ) { + lockQTables.add(row.getString("keyspace_name") + "." + row.getString("table_name")); + } + } + return lockQTables; + } + + private void cleanUpLocksFromTable(String lockTable) throws MusicServiceException { + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString("SELECT * from " + lockTable); + ResultSet results = MusicCore.get(query); + for (Row lock: results) { + if (!lock.isNull("lockreference")) { + try { + deleteLockIfStale(lockTable, lock); + } catch (MusicServiceException e) { + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to delete a potentially stale lock " + lock, e); + } + } + } + } + + + private void deleteLockIfStale(String lockTable, Row lock) throws MusicServiceException { + if (lock.isNull("createtime") && lock.isNull("acquiretime")) { + return; + } + + long createTime = lock.isNull("createtime") ? 0 : Long.parseLong(lock.getString("createtime")); + long acquireTime = lock.isNull("acquiretime") ? 0 : Long.parseLong(lock.getString("acquiretime")); + long row_access_time = Math.max(createTime, acquireTime); + if (System.currentTimeMillis() > row_access_time + MusicUtil.getDefaultLockLeasePeriod()) { + logger.info(EELFLoggerDelegate.applicationLogger, "Stale lock detected and being removed: " + lock); + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString("DELETE FROM " + lockTable + " WHERE key='" + lock.getString("key") + "' AND " + + "lockreference=" + lock.getLong("lockreference") + " IF EXISTS;"); + MusicDataStoreHandle.getDSHandle().getSession().execute(query.getQuery()); + } + } + + public void terminate() { + terminated = true; + } +} diff --git a/music-core/src/main/java/org/onap/music/main/MusicUtil.java b/music-core/src/main/java/org/onap/music/main/MusicUtil.java index 78d17c60..865ca01f 100644 --- a/music-core/src/main/java/org/onap/music/main/MusicUtil.java +++ b/music-core/src/main/java/org/onap/music/main/MusicUtil.java @@ -34,9 +34,11 @@ import java.io.FileNotFoundException; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Scanner; +import java.util.Set; import java.util.UUID; import javax.ws.rs.core.Response; @@ -107,6 +109,8 @@ public class MusicUtil { private static boolean debug = true; private static String version = "0.0.0"; private static String build = ""; + private static long lockDaemonSleepms = 1000; + private static Set<String> keyspacesToCleanLocks = new HashSet<>(); private static String musicPropertiesFilePath = PROPERTIES_FILE; // private static final String[] propKeys = new String[] { MusicUtil.class.getDeclaredMethod(arg0, )"build","cassandra.host", "debug", @@ -160,7 +164,26 @@ public class MusicUtil { private static Boolean clientIdRequired = false; private static Boolean messageIdRequired = false; private static String cipherEncKey = ""; + + private static long createLockWaitPeriod = 300; + private static int createLockWaitIncrement = 50; + + public static long getCreateLockWaitPeriod() { + return createLockWaitPeriod; + } + + public static void setCreateLockWaitPeriod(long createLockWaitPeriod) { + MusicUtil.createLockWaitPeriod = createLockWaitPeriod; + } + public static int getCreateLockWaitIncrement() { + return createLockWaitIncrement; + } + + public static void setCreateLockWaitIncrement(int createLockWaitIncrement) { + MusicUtil.createLockWaitIncrement = createLockWaitIncrement; + } + public MusicUtil() { throw new IllegalStateException("Utility Class"); } @@ -810,6 +833,27 @@ public class MusicUtil { MusicUtil.messageIdRequired = messageIdRequired; } + /** + * @return the sleep time, in milliseconds, for the lock cleanup daemon + */ + public static long getLockDaemonSleepTimeMs() { + return lockDaemonSleepms; + } + + /** + * set the sleep time, in milliseconds, for the lock cleanup daemon + */ + public static void setLockDaemonSleepTimeMs(long timeoutms) { + MusicUtil.lockDaemonSleepms = timeoutms; + } + + public static Set<String> getKeyspacesToCleanLocks() { + return keyspacesToCleanLocks; + } + + public static void setKeyspacesToCleanLocks(Set<String> keyspaces) { + MusicUtil.keyspacesToCleanLocks = keyspaces; + } public static String getCipherEncKey() { return MusicUtil.cipherEncKey; diff --git a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java index 2fc88145..753d9b28 100644 --- a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java +++ b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java @@ -97,7 +97,7 @@ public interface MusicCoreService { * @param owner the owner of the lock, for deadlock prevention */ public String createLockReference(String fullyQualifiedKey, String owner) throws MusicLockingException; - + /** * Create a lock ref in the music lock store * @param fullyQualifiedKey the key to create a lock on diff --git a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java index 1f3f4a32..d29ba32b 100644 --- a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java +++ b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java @@ -27,12 +27,13 @@ package org.onap.music.service.impl; import java.io.StringWriter; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.StringTokenizer; - +import java.util.concurrent.atomic.AtomicInteger; import javax.ws.rs.core.MultivaluedMap; import org.onap.music.datastore.Condition; @@ -75,7 +76,9 @@ public class MusicCassaCore implements MusicCoreService { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class); private static MusicCassaCore musicCassaCoreInstance = null; private static Set<String> set = Collections.synchronizedSet(new HashSet<String>()); - + HashMap<String, Integer> map = new HashMap<>(); + AtomicInteger wait = new AtomicInteger(0); + private MusicCassaCore() { // not going to happen } @@ -120,10 +123,11 @@ public class MusicCassaCore implements MusicCoreService { public String createLockReference(String fullyQualifiedKey, String owner) throws MusicLockingException { return createLockReference(fullyQualifiedKey, LockType.WRITE, owner); } - + + /** * This will be called for Atomic calls - * + * it ensures that only one thread tries to create a lock on each key at a time */ public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException { String[] splitString = fullyQualifiedKey.split("\\."); diff --git a/music-rest/src/main/java/org/onap/music/MusicApplication.java b/music-rest/src/main/java/org/onap/music/MusicApplication.java index 22c9e7bf..0fe354d9 100755 --- a/music-rest/src/main/java/org/onap/music/MusicApplication.java +++ b/music-rest/src/main/java/org/onap/music/MusicApplication.java @@ -33,6 +33,7 @@ import org.onap.music.authentication.CadiAuthFilter; import org.onap.music.authentication.MusicAuthorizationFilter; import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.eelf.logging.MusicLoggingServletFilter; +import org.onap.music.lockingservice.cassandra.LockCleanUpDaemon; import org.onap.music.main.MusicUtil; import org.onap.music.main.PropertiesLoader; import org.springframework.beans.factory.annotation.Autowired; @@ -66,6 +67,10 @@ public class MusicApplication extends SpringBootServletInitializer { public static void main(String[] args) { new MusicApplication().configure(new SpringApplicationBuilder(MusicApplication.class)).run(args); + + LockCleanUpDaemon daemon = new LockCleanUpDaemon(); + daemon.setDaemon(true); + daemon.start(); } @Override diff --git a/music-rest/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java b/music-rest/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java index ebaa3a1f..2c69c435 100644 --- a/music-rest/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java +++ b/music-rest/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java @@ -142,11 +142,20 @@ public class MusicConditional { return new ReturnType(ResultType.FAILURE, e.getMessage()); } if (results.all().isEmpty()) { - MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.INSERT), "critical"); - return new ReturnType(ResultType.SUCCESS, "insert"); + PreparedQueryObject qObject = queryBank.get(MusicUtil.INSERT); + qObject.setOperation(MusicUtil.INSERT); + logger.info(EELFLoggerDelegate.debugLogger,"### Conditional Insert"); + MusicCore.criticalPut(keyspace, tableName, primaryKey, qObject, lockId, null); + //MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.INSERT), "critical"); + return new ReturnType(ResultType.SUCCESS, MusicUtil.INSERT); + } else { - MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.UPDATE), "critical"); - return new ReturnType(ResultType.SUCCESS, "update"); + PreparedQueryObject qObject = queryBank.get(MusicUtil.UPDATE); + qObject.setOperation(MusicUtil.UPDATE); + logger.info(EELFLoggerDelegate.debugLogger,"### Condition Update"); + MusicCore.criticalPut(keyspace, tableName, primaryKey, qObject, lockId, null); + //MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.UPDATE), "critical"); + return new ReturnType(ResultType.SUCCESS, MusicUtil.UPDATE); } } else { return new ReturnType(ResultType.FAILURE, @@ -214,13 +223,15 @@ public class MusicConditional { JSONObject json = new JSONObject(updatedValues); PreparedQueryObject update = new PreparedQueryObject(); String vector_ts = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis()); - update.appendQueryString("UPDATE " + dataObj.getKeyspace() + "." + dataObj.getTableName() + " SET " + dataObj.getCascadeColumnName() + "['" + dataObj.getPlanId() + update.appendQueryString("UPDATE " + dataObj.getKeyspace() + "." + dataObj.getTableName() + " SET " + + dataObj.getCascadeColumnName() + "['" + dataObj.getPlanId() + "'] = ?, vector_ts = ? WHERE " + dataObj.getPrimaryKey() + " = ?"); update.addValue(MusicUtil.convertToActualDataType(DataType.text(), json.toString())); update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector_ts)); update.addValue(MusicUtil.convertToActualDataType(DataType.text(), dataObj.getPrimaryKeyValue())); try { - MusicDataStoreHandle.getDSHandle().executePut(update, "critical"); + update.setOperation(MusicUtil.UPDATE); + MusicCore.criticalPut(dataObj.keyspace, dataObj.tableName, dataObj.primaryKeyValue, update, dataObj.lockId, null); } catch (Exception ex) { logger.error(EELFLoggerDelegate.applicationLogger, ex); return new ReturnType(ResultType.FAILURE, ex.getMessage()); @@ -228,9 +239,10 @@ public class MusicConditional { }else { return new ReturnType(ResultType.FAILURE,"Cannot find data related to key: "+dataObj.getPrimaryKey()); } - MusicDataStoreHandle.getDSHandle().executePut(dataObj.getQueryBank().get(MusicUtil.UPSERT), "critical"); + PreparedQueryObject qObject = dataObj.getQueryBank().get(MusicUtil.UPSERT); + qObject.setOperation(MusicUtil.INSERT); + MusicCore.criticalPut(dataObj.keyspace, dataObj.tableName, dataObj.primaryKeyValue, qObject, dataObj.lockId, null); return new ReturnType(ResultType.SUCCESS, "update success"); - } else { return new ReturnType(ResultType.FAILURE, "Cannot perform operation since you are the not the lock holder"); @@ -322,7 +334,7 @@ public class MusicConditional { counter = counter + 1; } queryObject.appendQueryString("INSERT INTO " + keySpaceName + "." + tableName + " " - + fieldsString + " VALUES " + valueString); + + fieldsString + " VALUES " + valueString + ";"); return queryObject; } diff --git a/music-rest/src/main/java/org/onap/music/main/PropertiesLoader.java b/music-rest/src/main/java/org/onap/music/main/PropertiesLoader.java index 6fbc76a7..9c69b9e2 100644 --- a/music-rest/src/main/java/org/onap/music/main/PropertiesLoader.java +++ b/music-rest/src/main/java/org/onap/music/main/PropertiesLoader.java @@ -22,6 +22,7 @@ package org.onap.music.main; +import java.util.HashSet; import java.util.Properties; import org.onap.music.eelf.logging.EELFLoggerDelegate; @@ -77,7 +78,19 @@ public class PropertiesLoader implements InitializingBean { @Value("${retry.count}") public String rertryCount; + + @Value("${lock.daemon.sleeptime.ms}") + public String lockDaemonSleeptimems; + @Value("${keyspaces.for.lock.cleanup}") + public String keyspacesForLockCleanup; + + @Value("${create.lock.wait.period.ms}") + private long createLockWaitPeriod; + + @Value("${create.lock.wait.increment.ms}") + private int createLockWaitIncrement; + @Value("${transId.header.prefix}") private String transIdPrefix; @@ -172,6 +185,16 @@ public class PropertiesLoader implements InitializingBean { if (isKeyspaceActive != null && !isKeyspaceActive.equals("${keyspace.active}")) { MusicUtil.setKeyspaceActive(Boolean.parseBoolean(isKeyspaceActive)); } + if (lockDaemonSleeptimems != null && !lockDaemonSleeptimems.equals("${lock.daemon.sleeptime.ms}")) { + MusicUtil.setLockDaemonSleepTimeMs(Long.parseLong(lockDaemonSleeptimems)); + } + if (keyspacesForLockCleanup !=null && !keyspacesForLockCleanup.equals("${keyspaces.for.lock.cleanup}")) { + HashSet<String> keyspaces = new HashSet<>(); + for (String keyspace: keyspacesForLockCleanup.split(",")) { + keyspaces.add(keyspace); + } + MusicUtil.setKeyspacesToCleanLocks(keyspaces); + } if(transIdPrefix!=null) { MusicUtil.setTransIdPrefix(transIdPrefix); } @@ -203,6 +226,14 @@ public class PropertiesLoader implements InitializingBean { if(messageIdRequired!=null) { MusicUtil.setMessageIdRequired(messageIdRequired); } + + if(createLockWaitPeriod!=0) { + MusicUtil.setCreateLockWaitPeriod(createLockWaitPeriod); + } + + if(createLockWaitIncrement!=0) { + MusicUtil.setCreateLockWaitIncrement(createLockWaitIncrement); + } } public static void loadProperties(Properties properties) { diff --git a/music-rest/src/test/java/org/onap/music/unittests/TstRestMusicDataAPI.java b/music-rest/src/test/java/org/onap/music/unittests/TstRestMusicDataAPI.java index 407d0323..ea3fb54e 100644 --- a/music-rest/src/test/java/org/onap/music/unittests/TstRestMusicDataAPI.java +++ b/music-rest/src/test/java/org/onap/music/unittests/TstRestMusicDataAPI.java @@ -322,7 +322,7 @@ public class TstRestMusicDataAPI { assertEquals(400, response2.getStatus()); Map<String, String> respMap = (Map<String, String>) response2.getEntity(); assertEquals(ResultType.FAILURE, respMap.get("status")); - assertEquals("AlreadyExistsException: Table " + keyspaceName + "." + tableNameDup + " already exists", respMap.get("error")); + assertEquals("Already Exists Exception: Table " + keyspaceName + "." + tableNameDup + " already exists", respMap.get("error")); } |