diff options
Diffstat (limited to 'src/main/java/org/onap')
4 files changed, 77 insertions, 79 deletions
diff --git a/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java b/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java index 8aadcba3..2bed5fea 100644 --- a/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java +++ b/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java @@ -122,7 +122,7 @@ public class MusicConditional { ReturnType lockAcqResult = MusicCore.acquireLock(fullyQualifiedKey, lockId); if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) { try { - results = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT)); + results = MusicCore.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT)); } catch (Exception e) { return new ReturnType(ResultType.FAILURE, e.getMessage()); } @@ -178,7 +178,7 @@ public class MusicConditional { try { ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod); if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) { - Row row = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT)).one(); + Row row = MusicCore.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT)).one(); if(row != null) { Map<String, String> updatedValues = cascadeColumnUpdateSpecific(row, cascadeColumnValues, casscadeColumnName, planId); diff --git a/src/main/java/org/onap/music/datastore/CassaDataStore.java b/src/main/java/org/onap/music/datastore/CassaDataStore.java index ec0b2581..a56cf63a 100644 --- a/src/main/java/org/onap/music/datastore/CassaDataStore.java +++ b/src/main/java/org/onap/music/datastore/CassaDataStore.java @@ -30,7 +30,6 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.UUID; import com.datastax.driver.core.*; import org.onap.music.eelf.logging.EELFLoggerDelegate; @@ -44,8 +43,6 @@ import com.datastax.driver.core.ColumnDefinitions.Definition; import com.datastax.driver.core.exceptions.AlreadyExistsException; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.NoHostAvailableException; -import com.datastax.driver.core.utils.UUIDs; -import com.sun.jersey.core.util.Base64; /** * @author nelson24 @@ -81,11 +78,12 @@ import com.sun.jersey.core.util.Base64; */ public class CassaDataStore { + public static final String CONSISTENCY_LEVEL_ONE = "ONE"; + public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM"; + private Session session; private Cluster cluster; - - /** * @param session */ @@ -94,7 +92,7 @@ public class CassaDataStore { } /** - * @param session + * @param */ public Session getSession() { return session; @@ -333,22 +331,38 @@ public class CassaDataStore { return resultMap; } + /** + * This Method performs DDL and DML operations on Cassandra using specified consistency level outside any time-slot + * + * @param queryObject Object containing cassandra prepared query and values. + * @param consistency Specify consistency level for data synchronization across cassandra + * replicas + * @return Boolean Indicates operation success or failure + * @throws MusicServiceException + * @throws MusicQueryException + */ + public boolean executePut(PreparedQueryObject queryObject, String consistency) + throws MusicServiceException, MusicQueryException { + return executePut(queryObject, consistency, 0); + } // Prepared Statements 1802 additions /** * This Method performs DDL and DML operations on Cassandra using specified consistency level * * @param queryObject Object containing cassandra prepared query and values. - * @param consistency Specify consistency level for data synchronization across cassandra + * @param consistencyLevel Specify consistency level for data synchronization across cassandra * replicas + * @param timeSlot Specify timestamp time-slot * @return Boolean Indicates operation success or failure * @throws MusicServiceException * @throws MusicQueryException */ - public boolean executePut(PreparedQueryObject queryObject, String consistency) - throws MusicServiceException, MusicQueryException { + public boolean executePut(PreparedQueryObject queryObject, String consistencyLevel, long timeSlot) + throws MusicServiceException, MusicQueryException { boolean result = false; + long timeOfWrite = System.currentTimeMillis(); if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); @@ -359,10 +373,10 @@ public class CassaDataStore { "In preprared Execute Put: the actual insert query:" + queryObject.getQuery() + "; the values" + queryObject.getValues()); - PreparedStatement preparedInsert = null; + SimpleStatement statement; try { - - preparedInsert = session.prepare(queryObject.getQuery()); + + statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray()); } catch(InvalidQueryException iqe) { logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); throw new MusicQueryException(iqe.getMessage()); @@ -372,18 +386,18 @@ public class CassaDataStore { } try { - if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) { + if (consistencyLevel.equalsIgnoreCase(MusicUtil.CRITICAL)) { logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query"); - preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM); - } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) { + statement.setConsistencyLevel(ConsistencyLevel.QUORUM); + } else if (consistencyLevel.equalsIgnoreCase(MusicUtil.EVENTUAL)) { logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query"); - preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE); + statement.setConsistencyLevel(ConsistencyLevel.ONE); } - BoundStatement boundStatement = preparedInsert.bind(queryObject.getValues().toArray()); - boundStatement.setDefaultTimestamp(MusicUtil.v2sTimeStampInMicroseconds(0, System.currentTimeMillis())); + long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite); + statement.setDefaultTimestamp(timestamp); - ResultSet rs = session.execute(boundStatement); + ResultSet rs = session.execute(statement); result = rs.wasApplied(); } catch (AlreadyExistsException ae) { @@ -401,66 +415,60 @@ public class CassaDataStore { } /** - * This method performs DDL operations on Cassandra using consistency level ONE. - * + * This method performs DDL operations on Cassandra using consistency specified consistency. + * * @param queryObject Object containing cassandra prepared query and values. - * @return ResultSet - * @throws MusicServiceException - * @throws MusicQueryException */ - public ResultSet executeEventualGet(PreparedQueryObject queryObject) - throws MusicServiceException, MusicQueryException { + public ResultSet executeGet(PreparedQueryObject queryObject, String consistencyLevel) + throws MusicServiceException, MusicQueryException { if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { - logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicQueryException("Ill formed queryObject for the request = " + "[" - + queryObject.getQuery() + "]"); + logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + throw new MusicQueryException("Ill formed queryObject for the request = " + "[" + + queryObject.getQuery() + "]"); } logger.info(EELFLoggerDelegate.applicationLogger, - "Executing Eventual get query:" + queryObject.getQuery()); - + "Executing Eventual get query:" + queryObject.getQuery()); + ResultSet results = null; try { - PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery()); - preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE); - results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray())); + SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray()); + + if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) { + statement.setConsistencyLevel(ConsistencyLevel.ONE); + } + else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) { + statement.setConsistencyLevel(ConsistencyLevel.QUORUM); + } + + results = session.execute(statement); } catch (Exception ex) { - logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicServiceException(ex.getMessage()); + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + throw new MusicServiceException(ex.getMessage()); } return results; } /** + * This method performs DDL operations on Cassandra using consistency level ONE. + * + * @param queryObject Object containing cassandra prepared query and values. + */ + public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject) + throws MusicServiceException, MusicQueryException { + return executeGet(queryObject, CONSISTENCY_LEVEL_ONE); + } + + /** * * This method performs DDL operation on Cassandra using consistency level QUORUM. * * @param queryObject Object containing cassandra prepared query and values. - * @return ResultSet - * @throws MusicServiceException - * @throws MusicQueryException */ - public ResultSet executeCriticalGet(PreparedQueryObject queryObject) + public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject) throws MusicServiceException, MusicQueryException { - if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { - logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "[" - + queryObject.getQuery() + "]"); - } - logger.info(EELFLoggerDelegate.applicationLogger, - "Executing Critical get query:" + queryObject.getQuery()); - PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery()); - preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM); - ResultSet results = null; - try { - results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray())); - } catch (Exception ex) { - logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicServiceException(ex.getMessage()); - } - return results; - + return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM); } } diff --git a/src/main/java/org/onap/music/datastore/CassaLockStore.java b/src/main/java/org/onap/music/datastore/CassaLockStore.java index 67e96533..c1bf4784 100644 --- a/src/main/java/org/onap/music/datastore/CassaLockStore.java +++ b/src/main/java/org/onap/music/datastore/CassaLockStore.java @@ -1,16 +1,13 @@ package org.onap.music.datastore; import java.util.List; -import java.util.UUID; import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; -import org.onap.music.main.MusicUtil; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; -import com.datastax.driver.core.utils.UUIDs; /* * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. @@ -57,8 +54,7 @@ public class CassaLockStore { String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) " + "WITH CLUSTERING ORDER BY (lockReference ASC);"; - System.out.println(tabQuery); - PreparedQueryObject queryObject = new PreparedQueryObject(); + PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString(tabQuery); boolean result; @@ -86,7 +82,7 @@ public class CassaLockStore { queryObject.addValue(lockName); queryObject.appendQueryString(selectQuery); - ResultSet gqResult = dsHandle.executeEventualGet(queryObject); + ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject); List<Row> latestGuardRow = gqResult.all(); long prevGuard = 0; @@ -140,7 +136,7 @@ public class CassaLockStore { String selectQuery = "select * from "+keyspace+"."+table+" where key='"+key+"' LIMIT 1;"; PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString(selectQuery); - ResultSet results = dsHandle.executeEventualGet(queryObject); + ResultSet results = dsHandle.executeOneConsistencyGet(queryObject); Row row = results.one(); String lockReference = "" + row.getLong("lockReference"); String createTime = row.getString("createTime"); diff --git a/src/main/java/org/onap/music/main/MusicCore.java b/src/main/java/org/onap/music/main/MusicCore.java index cf2a47ee..f085be05 100644 --- a/src/main/java/org/onap/music/main/MusicCore.java +++ b/src/main/java/org/onap/music/main/MusicCore.java @@ -230,7 +230,7 @@ public class MusicCore { String query = "select * from "+syncTable+" where key='"+fullyQualifiedKey+"';"; PreparedQueryObject readQueryObject = new PreparedQueryObject(); readQueryObject.appendQueryString(query); - ResultSet results = getDSHandle().executeCriticalGet(readQueryObject); + ResultSet results = getDSHandle().executeQuorumConsistencyGet(readQueryObject); if (results.all().size() != 0) { logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!"); try { @@ -315,7 +315,7 @@ public class MusicCore { selectQuery.addValue(cqlFormattedPrimaryKeyValue); ResultSet results = null; try { - results = getDSHandle().executeCriticalGet(selectQuery); + results = getDSHandle().executeQuorumConsistencyGet(selectQuery); // write it back to a quorum Row row = results.one(); ColumnDefinitions colInfo = row.getColumnDefinitions(); @@ -356,7 +356,7 @@ public class MusicCore { public static ResultSet quorumGet(PreparedQueryObject query) { ResultSet results = null; try { - results = getDSHandle().executeCriticalGet(query); + results = getDSHandle().executeQuorumConsistencyGet(query); } catch (MusicServiceException | MusicQueryException e) { logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR); @@ -518,15 +518,9 @@ public class MusicCore { + e.getMessage()); } - String query = queryObject.getQuery(); - long timeOfWrite = System.currentTimeMillis(); long lockOrdinal = Long.parseLong(lockReference); - long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite); - // TODO: use Statement instead of modifying query - query = query.replaceFirst("SET", "USING TIMESTAMP "+ ts + " SET"); - queryObject.replaceQueryString(query); CassaDataStore dsHandle = getDSHandle(); - dsHandle.executePut(queryObject, MusicUtil.CRITICAL); + dsHandle.executePut(queryObject, MusicUtil.CRITICAL, lockOrdinal); long end = System.currentTimeMillis(); logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms"); }catch (MusicQueryException | MusicServiceException | MusicLockingException e) { @@ -572,7 +566,7 @@ public class MusicCore { public static ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException { ResultSet results = null; try { - results = getDSHandle().executeEventualGet(queryObject); + results = getDSHandle().executeOneConsistencyGet(queryObject); } catch (MusicQueryException | MusicServiceException e) { logger.error(EELFLoggerDelegate.errorLogger,e.getMessage()); throw new MusicServiceException(e.getMessage()); @@ -599,7 +593,7 @@ public class MusicCore { ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference); if(result.getResult().equals(ResultType.FAILURE)) return null;//not top of the lock store q - results = getDSHandle().executeCriticalGet(queryObject); + results = getDSHandle().executeQuorumConsistencyGet(queryObject); } catch (MusicQueryException | MusicServiceException | MusicLockingException e) { logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR); } |