From 0922e1c8a4e095668707a3973b6e8a5ee2fe3329 Mon Sep 17 00:00:00 2001 From: Mohammad Salehe Date: Wed, 7 Nov 2018 15:56:21 -0500 Subject: Improve timestamp and query handling Add timeSlot parameter to CassaDataStore.executePut to prevent inconsistent timestamps Rename CassaDataStore.executeEventualGet and CassaDataStore.executeCriticalPut to reflect their real functionality Use simple bound statement instead of prepared queries to improve performance Change-Id: I439c5279f1c8e645740a9650ab8807c5ffa1725a Issue-ID: MUSIC-148 Signed-off-by: Mohammad Salehe --- .../conductor/conditionals/MusicConditional.java | 4 +- .../org/onap/music/datastore/CassaDataStore.java | 124 +++++++++++---------- .../org/onap/music/datastore/CassaLockStore.java | 10 +- src/main/java/org/onap/music/main/MusicCore.java | 18 +-- .../onap/music/unittests/MusicDataStoreTest.java | 5 +- .../onap/music/unittests/MusicLockStoreTest.java | 5 +- .../org/onap/music/unittests/TestMusicCore.java | 6 +- 7 files changed, 83 insertions(+), 89 deletions(-) (limited to 'src') diff --git a/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java b/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java index 8aadcba3..2bed5fea 100644 --- a/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java +++ b/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java @@ -122,7 +122,7 @@ public class MusicConditional { ReturnType lockAcqResult = MusicCore.acquireLock(fullyQualifiedKey, lockId); if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) { try { - results = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT)); + results = MusicCore.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT)); } catch (Exception e) { return new ReturnType(ResultType.FAILURE, e.getMessage()); } @@ -178,7 +178,7 @@ public class MusicConditional { try { ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod); if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) { - Row row = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT)).one(); + Row row = MusicCore.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT)).one(); if(row != null) { Map updatedValues = cascadeColumnUpdateSpecific(row, cascadeColumnValues, casscadeColumnName, planId); diff --git a/src/main/java/org/onap/music/datastore/CassaDataStore.java b/src/main/java/org/onap/music/datastore/CassaDataStore.java index ec0b2581..a56cf63a 100644 --- a/src/main/java/org/onap/music/datastore/CassaDataStore.java +++ b/src/main/java/org/onap/music/datastore/CassaDataStore.java @@ -30,7 +30,6 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.UUID; import com.datastax.driver.core.*; import org.onap.music.eelf.logging.EELFLoggerDelegate; @@ -44,8 +43,6 @@ import com.datastax.driver.core.ColumnDefinitions.Definition; import com.datastax.driver.core.exceptions.AlreadyExistsException; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.NoHostAvailableException; -import com.datastax.driver.core.utils.UUIDs; -import com.sun.jersey.core.util.Base64; /** * @author nelson24 @@ -81,11 +78,12 @@ import com.sun.jersey.core.util.Base64; */ public class CassaDataStore { + public static final String CONSISTENCY_LEVEL_ONE = "ONE"; + public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM"; + private Session session; private Cluster cluster; - - /** * @param session */ @@ -94,7 +92,7 @@ public class CassaDataStore { } /** - * @param session + * @param */ public Session getSession() { return session; @@ -333,22 +331,38 @@ public class CassaDataStore { return resultMap; } + /** + * This Method performs DDL and DML operations on Cassandra using specified consistency level outside any time-slot + * + * @param queryObject Object containing cassandra prepared query and values. + * @param consistency Specify consistency level for data synchronization across cassandra + * replicas + * @return Boolean Indicates operation success or failure + * @throws MusicServiceException + * @throws MusicQueryException + */ + public boolean executePut(PreparedQueryObject queryObject, String consistency) + throws MusicServiceException, MusicQueryException { + return executePut(queryObject, consistency, 0); + } // Prepared Statements 1802 additions /** * This Method performs DDL and DML operations on Cassandra using specified consistency level * * @param queryObject Object containing cassandra prepared query and values. - * @param consistency Specify consistency level for data synchronization across cassandra + * @param consistencyLevel Specify consistency level for data synchronization across cassandra * replicas + * @param timeSlot Specify timestamp time-slot * @return Boolean Indicates operation success or failure * @throws MusicServiceException * @throws MusicQueryException */ - public boolean executePut(PreparedQueryObject queryObject, String consistency) - throws MusicServiceException, MusicQueryException { + public boolean executePut(PreparedQueryObject queryObject, String consistencyLevel, long timeSlot) + throws MusicServiceException, MusicQueryException { boolean result = false; + long timeOfWrite = System.currentTimeMillis(); if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); @@ -359,10 +373,10 @@ public class CassaDataStore { "In preprared Execute Put: the actual insert query:" + queryObject.getQuery() + "; the values" + queryObject.getValues()); - PreparedStatement preparedInsert = null; + SimpleStatement statement; try { - - preparedInsert = session.prepare(queryObject.getQuery()); + + statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray()); } catch(InvalidQueryException iqe) { logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); throw new MusicQueryException(iqe.getMessage()); @@ -372,18 +386,18 @@ public class CassaDataStore { } try { - if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) { + if (consistencyLevel.equalsIgnoreCase(MusicUtil.CRITICAL)) { logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query"); - preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM); - } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) { + statement.setConsistencyLevel(ConsistencyLevel.QUORUM); + } else if (consistencyLevel.equalsIgnoreCase(MusicUtil.EVENTUAL)) { logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query"); - preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE); + statement.setConsistencyLevel(ConsistencyLevel.ONE); } - BoundStatement boundStatement = preparedInsert.bind(queryObject.getValues().toArray()); - boundStatement.setDefaultTimestamp(MusicUtil.v2sTimeStampInMicroseconds(0, System.currentTimeMillis())); + long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite); + statement.setDefaultTimestamp(timestamp); - ResultSet rs = session.execute(boundStatement); + ResultSet rs = session.execute(statement); result = rs.wasApplied(); } catch (AlreadyExistsException ae) { @@ -401,66 +415,60 @@ public class CassaDataStore { } /** - * This method performs DDL operations on Cassandra using consistency level ONE. - * + * This method performs DDL operations on Cassandra using consistency specified consistency. + * * @param queryObject Object containing cassandra prepared query and values. - * @return ResultSet - * @throws MusicServiceException - * @throws MusicQueryException */ - public ResultSet executeEventualGet(PreparedQueryObject queryObject) - throws MusicServiceException, MusicQueryException { + public ResultSet executeGet(PreparedQueryObject queryObject, String consistencyLevel) + throws MusicServiceException, MusicQueryException { if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { - logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicQueryException("Ill formed queryObject for the request = " + "[" - + queryObject.getQuery() + "]"); + logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + throw new MusicQueryException("Ill formed queryObject for the request = " + "[" + + queryObject.getQuery() + "]"); } logger.info(EELFLoggerDelegate.applicationLogger, - "Executing Eventual get query:" + queryObject.getQuery()); - + "Executing Eventual get query:" + queryObject.getQuery()); + ResultSet results = null; try { - PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery()); - preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE); - results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray())); + SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray()); + + if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) { + statement.setConsistencyLevel(ConsistencyLevel.ONE); + } + else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) { + statement.setConsistencyLevel(ConsistencyLevel.QUORUM); + } + + results = session.execute(statement); } catch (Exception ex) { - logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicServiceException(ex.getMessage()); + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + throw new MusicServiceException(ex.getMessage()); } return results; } + /** + * This method performs DDL operations on Cassandra using consistency level ONE. + * + * @param queryObject Object containing cassandra prepared query and values. + */ + public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject) + throws MusicServiceException, MusicQueryException { + return executeGet(queryObject, CONSISTENCY_LEVEL_ONE); + } + /** * * This method performs DDL operation on Cassandra using consistency level QUORUM. * * @param queryObject Object containing cassandra prepared query and values. - * @return ResultSet - * @throws MusicServiceException - * @throws MusicQueryException */ - public ResultSet executeCriticalGet(PreparedQueryObject queryObject) + public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject) throws MusicServiceException, MusicQueryException { - if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { - logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "[" - + queryObject.getQuery() + "]"); - } - logger.info(EELFLoggerDelegate.applicationLogger, - "Executing Critical get query:" + queryObject.getQuery()); - PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery()); - preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM); - ResultSet results = null; - try { - results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray())); - } catch (Exception ex) { - logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicServiceException(ex.getMessage()); - } - return results; - + return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM); } } diff --git a/src/main/java/org/onap/music/datastore/CassaLockStore.java b/src/main/java/org/onap/music/datastore/CassaLockStore.java index 67e96533..c1bf4784 100644 --- a/src/main/java/org/onap/music/datastore/CassaLockStore.java +++ b/src/main/java/org/onap/music/datastore/CassaLockStore.java @@ -1,16 +1,13 @@ package org.onap.music.datastore; import java.util.List; -import java.util.UUID; import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; -import org.onap.music.main.MusicUtil; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; -import com.datastax.driver.core.utils.UUIDs; /* * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. @@ -57,8 +54,7 @@ public class CassaLockStore { String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) " + "WITH CLUSTERING ORDER BY (lockReference ASC);"; - System.out.println(tabQuery); - PreparedQueryObject queryObject = new PreparedQueryObject(); + PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString(tabQuery); boolean result; @@ -86,7 +82,7 @@ public class CassaLockStore { queryObject.addValue(lockName); queryObject.appendQueryString(selectQuery); - ResultSet gqResult = dsHandle.executeEventualGet(queryObject); + ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject); List latestGuardRow = gqResult.all(); long prevGuard = 0; @@ -140,7 +136,7 @@ public class CassaLockStore { String selectQuery = "select * from "+keyspace+"."+table+" where key='"+key+"' LIMIT 1;"; PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString(selectQuery); - ResultSet results = dsHandle.executeEventualGet(queryObject); + ResultSet results = dsHandle.executeOneConsistencyGet(queryObject); Row row = results.one(); String lockReference = "" + row.getLong("lockReference"); String createTime = row.getString("createTime"); diff --git a/src/main/java/org/onap/music/main/MusicCore.java b/src/main/java/org/onap/music/main/MusicCore.java index cf2a47ee..f085be05 100644 --- a/src/main/java/org/onap/music/main/MusicCore.java +++ b/src/main/java/org/onap/music/main/MusicCore.java @@ -230,7 +230,7 @@ public class MusicCore { String query = "select * from "+syncTable+" where key='"+fullyQualifiedKey+"';"; PreparedQueryObject readQueryObject = new PreparedQueryObject(); readQueryObject.appendQueryString(query); - ResultSet results = getDSHandle().executeCriticalGet(readQueryObject); + ResultSet results = getDSHandle().executeQuorumConsistencyGet(readQueryObject); if (results.all().size() != 0) { logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!"); try { @@ -315,7 +315,7 @@ public class MusicCore { selectQuery.addValue(cqlFormattedPrimaryKeyValue); ResultSet results = null; try { - results = getDSHandle().executeCriticalGet(selectQuery); + results = getDSHandle().executeQuorumConsistencyGet(selectQuery); // write it back to a quorum Row row = results.one(); ColumnDefinitions colInfo = row.getColumnDefinitions(); @@ -356,7 +356,7 @@ public class MusicCore { public static ResultSet quorumGet(PreparedQueryObject query) { ResultSet results = null; try { - results = getDSHandle().executeCriticalGet(query); + results = getDSHandle().executeQuorumConsistencyGet(query); } catch (MusicServiceException | MusicQueryException e) { logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR); @@ -518,15 +518,9 @@ public class MusicCore { + e.getMessage()); } - String query = queryObject.getQuery(); - long timeOfWrite = System.currentTimeMillis(); long lockOrdinal = Long.parseLong(lockReference); - long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite); - // TODO: use Statement instead of modifying query - query = query.replaceFirst("SET", "USING TIMESTAMP "+ ts + " SET"); - queryObject.replaceQueryString(query); CassaDataStore dsHandle = getDSHandle(); - dsHandle.executePut(queryObject, MusicUtil.CRITICAL); + dsHandle.executePut(queryObject, MusicUtil.CRITICAL, lockOrdinal); long end = System.currentTimeMillis(); logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms"); }catch (MusicQueryException | MusicServiceException | MusicLockingException e) { @@ -572,7 +566,7 @@ public class MusicCore { public static ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException { ResultSet results = null; try { - results = getDSHandle().executeEventualGet(queryObject); + results = getDSHandle().executeOneConsistencyGet(queryObject); } catch (MusicQueryException | MusicServiceException e) { logger.error(EELFLoggerDelegate.errorLogger,e.getMessage()); throw new MusicServiceException(e.getMessage()); @@ -599,7 +593,7 @@ public class MusicCore { ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference); if(result.getResult().equals(ResultType.FAILURE)) return null;//not top of the lock store q - results = getDSHandle().executeCriticalGet(queryObject); + results = getDSHandle().executeQuorumConsistencyGet(queryObject); } catch (MusicQueryException | MusicServiceException | MusicLockingException e) { logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR); } diff --git a/src/test/java/org/onap/music/unittests/MusicDataStoreTest.java b/src/test/java/org/onap/music/unittests/MusicDataStoreTest.java index 3f7fd3b7..b2c6df8a 100644 --- a/src/test/java/org/onap/music/unittests/MusicDataStoreTest.java +++ b/src/test/java/org/onap/music/unittests/MusicDataStoreTest.java @@ -29,7 +29,6 @@ import org.junit.BeforeClass; import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; -import org.mockito.Mock; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; @@ -105,7 +104,7 @@ public class MusicDataStoreTest { boolean result = false; int count = 0; ResultSet output = null; - output = dataStore.executeEventualGet(testObject); + output = dataStore.executeOneConsistencyGet(testObject); System.out.println(output); ; for (Row row : output) { @@ -124,7 +123,7 @@ public class MusicDataStoreTest { boolean result = false; int count = 0; ResultSet output = null; - output = dataStore.executeCriticalGet(testObject); + output = dataStore.executeQuorumConsistencyGet(testObject); System.out.println(output); ; for (Row row : output) { diff --git a/src/test/java/org/onap/music/unittests/MusicLockStoreTest.java b/src/test/java/org/onap/music/unittests/MusicLockStoreTest.java index 86774538..a027fd93 100644 --- a/src/test/java/org/onap/music/unittests/MusicLockStoreTest.java +++ b/src/test/java/org/onap/music/unittests/MusicLockStoreTest.java @@ -29,7 +29,6 @@ import org.junit.BeforeClass; import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; -import org.mockito.Mock; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; @@ -113,7 +112,7 @@ public class MusicLockStoreTest { boolean result = false; int count = 0; ResultSet output = null; - output = dataStore.executeEventualGet(testObject); + output = dataStore.executeOneConsistencyGet(testObject); System.out.println(output); ; for (Row row : output) { @@ -132,7 +131,7 @@ public class MusicLockStoreTest { boolean result = false; int count = 0; ResultSet output = null; - output = dataStore.executeCriticalGet(testObject); + output = dataStore.executeQuorumConsistencyGet(testObject); System.out.println(output); ; for (Row row : output) { diff --git a/src/test/java/org/onap/music/unittests/TestMusicCore.java b/src/test/java/org/onap/music/unittests/TestMusicCore.java index 01d2ffb6..f7b9c0d0 100644 --- a/src/test/java/org/onap/music/unittests/TestMusicCore.java +++ b/src/test/java/org/onap/music/unittests/TestMusicCore.java @@ -3,8 +3,6 @@ package org.onap.music.unittests; import static org.junit.Assert.assertEquals; import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; import org.junit.AfterClass; @@ -68,7 +66,7 @@ public class TestMusicCore { queryObject = new PreparedQueryObject(); String systemQuery = "SELECT keyspace_name FROM system_schema.keyspaces where keyspace_name='"+keyspace.toLowerCase()+"';"; queryObject.appendQueryString(systemQuery); - ResultSet rs = dataStore.executeEventualGet(queryObject); + ResultSet rs = dataStore.executeOneConsistencyGet(queryObject); assert rs.all().size()> 0; } @@ -84,7 +82,7 @@ public class TestMusicCore { queryObject = new PreparedQueryObject(); String systemQuery = "SELECT table_name FROM system_schema.tables where keyspace_name='"+keyspace.toLowerCase()+"' and table_name='"+table.toLowerCase()+"';"; queryObject.appendQueryString(systemQuery); - ResultSet rs = dataStore.executeEventualGet(queryObject); + ResultSet rs = dataStore.executeOneConsistencyGet(queryObject); assert rs.all().size()> 0; } -- cgit 1.2.3-korg