aboutsummaryrefslogtreecommitdiffstats
path: root/music-core
diff options
context:
space:
mode:
authorThomas Nelson <nelson24@att.com>2020-01-29 19:47:22 +0000
committerGerrit Code Review <gerrit@onap.org>2020-01-29 19:47:22 +0000
commita40b03fe9039d247dcc5a366f9778155a92c947b (patch)
tree045768c0d8508773014b6b00f31f573b36e469d1 /music-core
parenta0a52fadf838954bf4e133d8dd94f5114ddc4ac4 (diff)
parent25d19ed4c6285b419e6ea7b9f0003c60668d4218 (diff)
Merge "Bug fixes, syncronization, and clean up daemon"
Diffstat (limited to 'music-core')
-rwxr-xr-xmusic-core/src/main/java/org/onap/music/datastore/MusicDataStore.java31
-rw-r--r--music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java2
-rw-r--r--music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java132
-rw-r--r--music-core/src/main/java/org/onap/music/main/MusicUtil.java44
-rw-r--r--music-core/src/main/java/org/onap/music/service/MusicCoreService.java2
-rw-r--r--music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java12
6 files changed, 203 insertions, 20 deletions
diff --git a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java
index 7f6c42ca..cb22c0f4 100755
--- a/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java
+++ b/music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java
@@ -374,22 +374,25 @@ public class MusicDataStore {
ResultSet rs = session.execute(preparedInsert);
result = rs.wasApplied();
-
- }
- catch (AlreadyExistsException ae) {
- // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
- // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
- throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
- } catch ( InvalidQueryException e ) {
- // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " ["
- // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
- throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
+ } catch (AlreadyExistsException ae) {
+ throw new MusicServiceException("Already Exists Exception: " + ae.getMessage());
+ } catch (InvalidQueryException e) {
+ if (e.getMessage().contains("unconfigured table")) {
+ throw new MusicServiceException("Invalid Query Exception: " + e.getMessage());
+ } else {
+ logger.info(EELFLoggerDelegate.applicationLogger, "Query Exception: " + e.getMessage(),
+ AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.INFO,
+ ErrorTypes.QUERYERROR, e);
+ throw new MusicServiceException("Query Exception: " + e.getMessage());
+ }
} catch (Exception e) {
- // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " ["
- // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
- throw new MusicServiceException("Executing Session Failure for Request = " + "["
- + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),
+ AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR,
+ ErrorTypes.QUERYERROR, e);
+ throw new MusicServiceException("Executing Session Failure for Request = " + "[" + queryObject.getQuery()
+ + "]" + " Reason = " + e.getMessage());
}
+
return result;
}
diff --git a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
index e9533344..a727357f 100644
--- a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
+++ b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
@@ -50,7 +50,7 @@ import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
public class CassaLockStore {
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
- private static String table_prepend_name = "lockQ_";
+ public static final String table_prepend_name = "lockQ_";
private MusicDataStore dsHandle;
public CassaLockStore() {
diff --git a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java
new file mode 100644
index 00000000..492a48f0
--- /dev/null
+++ b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java
@@ -0,0 +1,132 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2019 AT&T Intellectual Property
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+
+package org.onap.music.lockingservice.cassandra;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.onap.music.datastore.MusicDataStoreHandle;
+import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.exceptions.MusicQueryException;
+import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.main.MusicCore;
+import org.onap.music.main.MusicUtil;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+
+public class LockCleanUpDaemon extends Thread {
+
+ boolean terminated = false;
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(LockCleanUpDaemon.class);
+
+
+ public LockCleanUpDaemon() {
+ }
+
+ @Override
+ public void run() {
+ if (MusicUtil.getLockDaemonSleepTimeMs()<0) {
+ terminate();
+ }
+ while (!terminated) {
+ try {
+ cleanupStaleLocks();
+ } catch (MusicServiceException e) {
+ logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to clean up locks", e);
+ }
+ try {
+ Thread.sleep(MusicUtil.getLockDaemonSleepTimeMs());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void cleanupStaleLocks() throws MusicServiceException {
+ Set<String> lockQTables = getLockQTables();
+ logger.info(EELFLoggerDelegate.applicationLogger, "Lock q tables found: " + lockQTables);
+ for(String lockTable: lockQTables) {
+ try {
+ cleanUpLocksFromTable(lockTable);
+ } catch (MusicServiceException e) {
+ logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to clear locks on table " + lockTable, e);
+ }
+ }
+ }
+
+
+ private Set<String> getLockQTables() throws MusicServiceException {
+ Set<String> keyspacesToCleanUp = MusicUtil.getKeyspacesToCleanLocks();
+ Set<String> lockQTables = new HashSet<>();
+
+ PreparedQueryObject query = new PreparedQueryObject();
+ query.appendQueryString("SELECT keyspace_name, table_name FROM system_schema.tables;");
+ ResultSet results = MusicCore.get(query);
+
+ for (Row row: results) {
+ if (keyspacesToCleanUp.contains(row.getString("keyspace_name"))
+ && row.getString("table_name").toLowerCase().startsWith(CassaLockStore.table_prepend_name.toLowerCase()) ) {
+ lockQTables.add(row.getString("keyspace_name") + "." + row.getString("table_name"));
+ }
+ }
+ return lockQTables;
+ }
+
+ private void cleanUpLocksFromTable(String lockTable) throws MusicServiceException {
+ PreparedQueryObject query = new PreparedQueryObject();
+ query.appendQueryString("SELECT * from " + lockTable);
+ ResultSet results = MusicCore.get(query);
+ for (Row lock: results) {
+ if (!lock.isNull("lockreference")) {
+ try {
+ deleteLockIfStale(lockTable, lock);
+ } catch (MusicServiceException e) {
+ logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to delete a potentially stale lock " + lock, e);
+ }
+ }
+ }
+ }
+
+
+ private void deleteLockIfStale(String lockTable, Row lock) throws MusicServiceException {
+ if (lock.isNull("createtime") && lock.isNull("acquiretime")) {
+ return;
+ }
+
+ long createTime = lock.isNull("createtime") ? 0 : Long.parseLong(lock.getString("createtime"));
+ long acquireTime = lock.isNull("acquiretime") ? 0 : Long.parseLong(lock.getString("acquiretime"));
+ long row_access_time = Math.max(createTime, acquireTime);
+ if (System.currentTimeMillis() > row_access_time + MusicUtil.getDefaultLockLeasePeriod()) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "Stale lock detected and being removed: " + lock);
+ PreparedQueryObject query = new PreparedQueryObject();
+ query.appendQueryString("DELETE FROM " + lockTable + " WHERE key='" + lock.getString("key") + "' AND " +
+ "lockreference=" + lock.getLong("lockreference") + " IF EXISTS;");
+ MusicDataStoreHandle.getDSHandle().getSession().execute(query.getQuery());
+ }
+ }
+
+ public void terminate() {
+ terminated = true;
+ }
+}
diff --git a/music-core/src/main/java/org/onap/music/main/MusicUtil.java b/music-core/src/main/java/org/onap/music/main/MusicUtil.java
index 78d17c60..865ca01f 100644
--- a/music-core/src/main/java/org/onap/music/main/MusicUtil.java
+++ b/music-core/src/main/java/org/onap/music/main/MusicUtil.java
@@ -34,9 +34,11 @@ import java.io.FileNotFoundException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
+import java.util.Set;
import java.util.UUID;
import javax.ws.rs.core.Response;
@@ -107,6 +109,8 @@ public class MusicUtil {
private static boolean debug = true;
private static String version = "0.0.0";
private static String build = "";
+ private static long lockDaemonSleepms = 1000;
+ private static Set<String> keyspacesToCleanLocks = new HashSet<>();
private static String musicPropertiesFilePath = PROPERTIES_FILE;
// private static final String[] propKeys = new String[] { MusicUtil.class.getDeclaredMethod(arg0, )"build","cassandra.host", "debug",
@@ -160,7 +164,26 @@ public class MusicUtil {
private static Boolean clientIdRequired = false;
private static Boolean messageIdRequired = false;
private static String cipherEncKey = "";
+
+ private static long createLockWaitPeriod = 300;
+ private static int createLockWaitIncrement = 50;
+
+ public static long getCreateLockWaitPeriod() {
+ return createLockWaitPeriod;
+ }
+
+ public static void setCreateLockWaitPeriod(long createLockWaitPeriod) {
+ MusicUtil.createLockWaitPeriod = createLockWaitPeriod;
+ }
+ public static int getCreateLockWaitIncrement() {
+ return createLockWaitIncrement;
+ }
+
+ public static void setCreateLockWaitIncrement(int createLockWaitIncrement) {
+ MusicUtil.createLockWaitIncrement = createLockWaitIncrement;
+ }
+
public MusicUtil() {
throw new IllegalStateException("Utility Class");
}
@@ -810,6 +833,27 @@ public class MusicUtil {
MusicUtil.messageIdRequired = messageIdRequired;
}
+ /**
+ * @return the sleep time, in milliseconds, for the lock cleanup daemon
+ */
+ public static long getLockDaemonSleepTimeMs() {
+ return lockDaemonSleepms;
+ }
+
+ /**
+ * set the sleep time, in milliseconds, for the lock cleanup daemon
+ */
+ public static void setLockDaemonSleepTimeMs(long timeoutms) {
+ MusicUtil.lockDaemonSleepms = timeoutms;
+ }
+
+ public static Set<String> getKeyspacesToCleanLocks() {
+ return keyspacesToCleanLocks;
+ }
+
+ public static void setKeyspacesToCleanLocks(Set<String> keyspaces) {
+ MusicUtil.keyspacesToCleanLocks = keyspaces;
+ }
public static String getCipherEncKey() {
return MusicUtil.cipherEncKey;
diff --git a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java
index 2fc88145..753d9b28 100644
--- a/music-core/src/main/java/org/onap/music/service/MusicCoreService.java
+++ b/music-core/src/main/java/org/onap/music/service/MusicCoreService.java
@@ -97,7 +97,7 @@ public interface MusicCoreService {
* @param owner the owner of the lock, for deadlock prevention
*/
public String createLockReference(String fullyQualifiedKey, String owner) throws MusicLockingException;
-
+
/**
* Create a lock ref in the music lock store
* @param fullyQualifiedKey the key to create a lock on
diff --git a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
index 63f2d14c..e1416f86 100644
--- a/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
+++ b/music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
@@ -27,12 +27,13 @@ package org.onap.music.service.impl;
import java.io.StringWriter;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
-
+import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.core.MultivaluedMap;
import org.onap.music.datastore.Condition;
@@ -75,7 +76,9 @@ public class MusicCassaCore implements MusicCoreService {
private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
private static MusicCassaCore musicCassaCoreInstance = null;
private static Set<String> set = Collections.synchronizedSet(new HashSet<String>());
-
+ HashMap<String, Integer> map = new HashMap<>();
+ AtomicInteger wait = new AtomicInteger(0);
+
private MusicCassaCore() {
// not going to happen
}
@@ -119,10 +122,11 @@ public class MusicCassaCore implements MusicCoreService {
public String createLockReference(String fullyQualifiedKey, String owner) throws MusicLockingException {
return createLockReference(fullyQualifiedKey, LockType.WRITE, owner);
}
-
+
+
/**
* This will be called for Atomic calls
- *
+ * it ensures that only one thread tries to create a lock on each key at a time
*/
public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
String[] splitString = fullyQualifiedKey.split("\\.");