diff options
author | Thomas Nelson <nelson24@att.com> | 2020-01-29 19:47:22 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2020-01-29 19:47:22 +0000 |
commit | a40b03fe9039d247dcc5a366f9778155a92c947b (patch) | |
tree | 045768c0d8508773014b6b00f31f573b36e469d1 /music-core | |
parent | a0a52fadf838954bf4e133d8dd94f5114ddc4ac4 (diff) | |
parent | 25d19ed4c6285b419e6ea7b9f0003c60668d4218 (diff) |
Merge "Bug fixes, syncronization, and clean up daemon"
Diffstat (limited to 'music-core')
6 files changed, 203 insertions, 20 deletions
diff --git a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java index 7f6c42ca..cb22c0f4 100755 --- a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java +++ b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java @@ -374,22 +374,25 @@ public class MusicDataStore { ResultSet rs = session.execute(preparedInsert); result = rs.wasApplied(); - - } - catch (AlreadyExistsException ae) { - // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR, - // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae); - } catch ( InvalidQueryException e ) { - // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " [" - // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e); + } catch (AlreadyExistsException ae) { + throw new MusicServiceException("Already Exists Exception: " + ae.getMessage()); + } catch (InvalidQueryException e) { + if (e.getMessage().contains("unconfigured table")) { + throw new MusicServiceException("Invalid Query Exception: " + e.getMessage()); + } else { + logger.info(EELFLoggerDelegate.applicationLogger, "Query Exception: " + e.getMessage(), + AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.INFO, + ErrorTypes.QUERYERROR, e); + throw new MusicServiceException("Query Exception: " + e.getMessage()); + } } catch (Exception e) { - // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " [" - // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e); - throw new MusicServiceException("Executing Session Failure for Request = " + "[" - + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), + AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, + ErrorTypes.QUERYERROR, e); + throw new MusicServiceException("Executing Session Failure for Request = " + "[" + queryObject.getQuery() + + "]" + " Reason = " + e.getMessage()); } + return result; } diff --git a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java index e9533344..a727357f 100644 --- a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java +++ b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java @@ -50,7 +50,7 @@ import com.datastax.driver.extras.codecs.enums.EnumNameCodec; public class CassaLockStore { private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class); - private static String table_prepend_name = "lockQ_"; + public static final String table_prepend_name = "lockQ_"; private MusicDataStore dsHandle; public CassaLockStore() { diff --git a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java new file mode 100644 index 00000000..492a48f0 --- /dev/null +++ b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java @@ -0,0 +1,132 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2019 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ + +package org.onap.music.lockingservice.cassandra; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.onap.music.datastore.MusicDataStoreHandle; +import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.eelf.logging.EELFLoggerDelegate; +import org.onap.music.exceptions.MusicQueryException; +import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.main.MusicCore; +import org.onap.music.main.MusicUtil; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; + +public class LockCleanUpDaemon extends Thread { + + boolean terminated = false; + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(LockCleanUpDaemon.class); + + + public LockCleanUpDaemon() { + } + + @Override + public void run() { + if (MusicUtil.getLockDaemonSleepTimeMs()<0) { + terminate(); + } + while (!terminated) { + try { + cleanupStaleLocks(); + } catch (MusicServiceException e) { + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to clean up locks", e); + } + try { + Thread.sleep(MusicUtil.getLockDaemonSleepTimeMs()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private void cleanupStaleLocks() throws MusicServiceException { + Set<String> lockQTables = getLockQTables(); + logger.info(EELFLoggerDelegate.applicationLogger, "Lock q tables found: " + lockQTables); + for(String lockTable: lockQTables) { + try { + cleanUpLocksFromTable(lockTable); + } catch (MusicServiceException e) { + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to clear locks on table " + lockTable, e); + } + } + } + + + private Set<String> getLockQTables() throws MusicServiceException { + Set<String> keyspacesToCleanUp = MusicUtil.getKeyspacesToCleanLocks(); + Set<String> lockQTables = new HashSet<>(); + + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString("SELECT keyspace_name, table_name FROM system_schema.tables;"); + ResultSet results = MusicCore.get(query); + + for (Row row: results) { + if (keyspacesToCleanUp.contains(row.getString("keyspace_name")) + && row.getString("table_name").toLowerCase().startsWith(CassaLockStore.table_prepend_name.toLowerCase()) ) { + lockQTables.add(row.getString("keyspace_name") + "." + row.getString("table_name")); + } + } + return lockQTables; + } + + private void cleanUpLocksFromTable(String lockTable) throws MusicServiceException { + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString("SELECT * from " + lockTable); + ResultSet results = MusicCore.get(query); + for (Row lock: results) { + if (!lock.isNull("lockreference")) { + try { + deleteLockIfStale(lockTable, lock); + } catch (MusicServiceException e) { + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to delete a potentially stale lock " + lock, e); + } + } + } + } + + + private void deleteLockIfStale(String lockTable, Row lock) throws MusicServiceException { + if (lock.isNull("createtime") && lock.isNull("acquiretime")) { + return; + } + + long createTime = lock.isNull("createtime") ? 0 : Long.parseLong(lock.getString("createtime")); + long acquireTime = lock.isNull("acquiretime") ? 0 : Long.parseLong(lock.getString("acquiretime")); + long row_access_time = Math.max(createTime, acquireTime); + if (System.currentTimeMillis() > row_access_time + MusicUtil.getDefaultLockLeasePeriod()) { + logger.info(EELFLoggerDelegate.applicationLogger, "Stale lock detected and being removed: " + lock); + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString("DELETE FROM " + lockTable + " WHERE key='" + lock.getString("key") + "' AND " + + "lockreference=" + lock.getLong("lockreference") + " IF EXISTS;"); + MusicDataStoreHandle.getDSHandle().getSession().execute(query.getQuery()); + } + } + + public void terminate() { + terminated = true; + } +} diff --git a/music-core/src/main/java/org/onap/music/main/MusicUtil.java b/music-core/src/main/java/org/onap/music/main/MusicUtil.java index 78d17c60..865ca01f 100644 --- a/music-core/src/main/java/org/onap/music/main/MusicUtil.java +++ b/music-core/src/main/java/org/onap/music/main/MusicUtil.java @@ -34,9 +34,11 @@ import java.io.FileNotFoundException; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Scanner; +import java.util.Set; import java.util.UUID; import javax.ws.rs.core.Response; @@ -107,6 +109,8 @@ public class MusicUtil { private static boolean debug = true; private static String version = "0.0.0"; private static String build = ""; + private static long lockDaemonSleepms = 1000; + private static Set<String> keyspacesToCleanLocks = new HashSet<>(); private static String musicPropertiesFilePath = PROPERTIES_FILE; // private static final String[] propKeys = new String[] { MusicUtil.class.getDeclaredMethod(arg0, )"build","cassandra.host", "debug", @@ -160,7 +164,26 @@ public class MusicUtil { private static Boolean clientIdRequired = false; private static Boolean messageIdRequired = false; private static String cipherEncKey = ""; + + private static long createLockWaitPeriod = 300; + private static int createLockWaitIncrement = 50; + + public static long getCreateLockWaitPeriod() { + return createLockWaitPeriod; + } + + public static void setCreateLockWaitPeriod(long createLockWaitPeriod) { + MusicUtil.createLockWaitPeriod = createLockWaitPeriod; + } + public static int getCreateLockWaitIncrement() { + return createLockWaitIncrement; + } + + public static void setCreateLockWaitIncrement(int createLockWaitIncrement) { + MusicUtil.createLockWaitIncrement = createLockWaitIncrement; + } + public MusicUtil() { throw new IllegalStateException("Utility Class"); } @@ -810,6 +833,27 @@ public class MusicUtil { MusicUtil.messageIdRequired = messageIdRequired; } + /** + * @return the sleep time, in milliseconds, for the lock cleanup daemon + */ + public static long getLockDaemonSleepTimeMs() { + return lockDaemonSleepms; + } + + /** + * set the sleep time, in milliseconds, for the lock cleanup daemon + */ + public static void setLockDaemonSleepTimeMs(long timeoutms) { + MusicUtil.lockDaemonSleepms = timeoutms; + } + + public static Set<String> getKeyspacesToCleanLocks() { + return keyspacesToCleanLocks; + } + + public static void setKeyspacesToCleanLocks(Set<String> keyspaces) { + MusicUtil.keyspacesToCleanLocks = keyspaces; + } public static String getCipherEncKey() { return MusicUtil.cipherEncKey; diff --git a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java index 2fc88145..753d9b28 100644 --- a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java +++ b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java @@ -97,7 +97,7 @@ public interface MusicCoreService { * @param owner the owner of the lock, for deadlock prevention */ public String createLockReference(String fullyQualifiedKey, String owner) throws MusicLockingException; - + /** * Create a lock ref in the music lock store * @param fullyQualifiedKey the key to create a lock on diff --git a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java index 63f2d14c..e1416f86 100644 --- a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java +++ b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java @@ -27,12 +27,13 @@ package org.onap.music.service.impl; import java.io.StringWriter; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.StringTokenizer; - +import java.util.concurrent.atomic.AtomicInteger; import javax.ws.rs.core.MultivaluedMap; import org.onap.music.datastore.Condition; @@ -75,7 +76,9 @@ public class MusicCassaCore implements MusicCoreService { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class); private static MusicCassaCore musicCassaCoreInstance = null; private static Set<String> set = Collections.synchronizedSet(new HashSet<String>()); - + HashMap<String, Integer> map = new HashMap<>(); + AtomicInteger wait = new AtomicInteger(0); + private MusicCassaCore() { // not going to happen } @@ -119,10 +122,11 @@ public class MusicCassaCore implements MusicCoreService { public String createLockReference(String fullyQualifiedKey, String owner) throws MusicLockingException { return createLockReference(fullyQualifiedKey, LockType.WRITE, owner); } - + + /** * This will be called for Atomic calls - * + * it ensures that only one thread tries to create a lock on each key at a time */ public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException { String[] splitString = fullyQualifiedKey.split("\\."); |