aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/music/service/impl
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/music/service/impl')
-rw-r--r--src/main/java/org/onap/music/service/impl/MusicCassaCore.java762
-rw-r--r--src/main/java/org/onap/music/service/impl/MusicZKCore.java937
2 files changed, 1699 insertions, 0 deletions
diff --git a/src/main/java/org/onap/music/service/impl/MusicCassaCore.java b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
new file mode 100644
index 00000000..8737a060
--- /dev/null
+++ b/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
@@ -0,0 +1,762 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2017 AT&T Intellectual Property
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+
+package org.onap.music.service.impl;
+
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.onap.music.datastore.MusicDataStore;
+import org.onap.music.datastore.MusicDataStoreHandle;
+import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.eelf.logging.format.AppMessages;
+import org.onap.music.eelf.logging.format.ErrorSeverity;
+import org.onap.music.eelf.logging.format.ErrorTypes;
+import org.onap.music.exceptions.MusicLockingException;
+import org.onap.music.exceptions.MusicQueryException;
+import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.lockingservice.cassandra.CassaLockStore;
+import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
+import org.onap.music.lockingservice.cassandra.MusicLockState;
+import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
+import org.onap.music.main.MusicUtil;
+import org.onap.music.main.ResultType;
+import org.onap.music.main.ReturnType;
+import org.onap.music.service.MusicCoreService;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ColumnDefinitions.Definition;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.TableMetadata;
+import com.google.common.util.concurrent.Monitor.Guard;
+
+import org.onap.music.datastore.*;
+
+public class MusicCassaCore implements MusicCoreService {
+
+ public static CassaLockStore mLockHandle = null;;
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
+ private static boolean unitTestRun=true;
+ private static MusicCassaCore musicCassaCoreInstance = null;
+
+ private MusicCassaCore() {
+
+ }
+ public static MusicCassaCore getInstance() {
+
+ if(musicCassaCoreInstance == null) {
+ musicCassaCoreInstance = new MusicCassaCore();
+ }
+ return musicCassaCoreInstance;
+ }
+
+ public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
+ long start = System.currentTimeMillis();
+
+ if (mLockHandle == null) {
+ try {
+ mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ throw new MusicLockingException("Failed to aquire Locl store handle " + e);
+ }
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
+ return mLockHandle;
+ }
+
+
+
+ public String createLockReference(String fullyQualifiedKey) {
+ String[] splitString = fullyQualifiedKey.split("\\.");
+ String keyspace = splitString[0];
+ String table = splitString[1];
+ String lockName = splitString[2];
+
+ logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
+ long start = System.currentTimeMillis();
+ String lockReference = null;
+ try {
+ lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
+ } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
+ e.printStackTrace();
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
+ return lockReference;
+ }
+
+
+ public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
+ evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
+ return acquireLock(fullyQualifiedKey, lockReference);
+ }
+
+ private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
+
+ String[] splitString = fullyQualifiedKey.split("\\.");
+ String keyspace = splitString[0];
+ String table = splitString[1];
+ String primaryKeyValue = splitString[2];
+
+ LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
+
+ /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
+ * reference*/
+
+ long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
+ if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
+ forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.lockRef+"");
+ logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.lockRef+" forcibly released");
+ }
+ }
+
+ private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
+
+ //return failure to lock holders too early or already evicted from the lock store
+ String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+ long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
+ long lockReferenceL = Long.parseLong(lockReference);
+
+ if(lockReferenceL > topOfLockStoreL) {
+ logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
+ return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
+ }
+
+
+ if(lockReferenceL < topOfLockStoreL) {
+ logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
+ return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
+ }
+
+ return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
+ }
+
+ public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
+ throws MusicLockingException, MusicQueryException, MusicServiceException {
+ String[] splitString = lockId.split("\\.");
+ String keyspace = splitString[0].substring(1);//remove '$'
+ String table = splitString[1];
+ String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
+ fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
+ String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
+
+ ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockRef);
+
+ if(result.getResult().equals(ResultType.FAILURE))
+ return result;//not top of the lock store q
+
+ //check to see if the value of the key has to be synced in case there was a forceful release
+ String syncTable = keyspace+".unsyncedKeys_"+table;
+ String query = "select * from "+syncTable+" where key='"+fullyQualifiedKey+"';";
+ PreparedQueryObject readQueryObject = new PreparedQueryObject();
+ readQueryObject.appendQueryString(query);
+ ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
+ if (results.all().size() != 0) {
+ logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
+ try {
+ syncQuorum(keyspace, table, primaryKeyValue);
+ } catch (Exception e) {
+ StringWriter sw = new StringWriter();
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ String exceptionAsString = sw.toString();
+ return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
+ }
+ String cleanQuery = "delete from music_internal.unsynced_keys where key='"+fullyQualifiedKey+"';";
+ PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
+ deleteQueryObject.appendQueryString(cleanQuery);
+ MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
+ }
+
+ getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
+
+ return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
+ }
+
+
+
+ /**
+ *
+ * @param tableQueryObject
+ * @param consistency
+ * @return Boolean Indicates success or failure
+ * @throws MusicServiceException
+ *
+ *
+ */
+ public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject, String consistency) throws MusicServiceException {
+ boolean result = false;
+
+ try {
+ //create shadow locking table
+ result = getLockingServiceHandle().createLockQueue(keyspace, table);
+ if(result == false)
+ return ResultType.FAILURE;
+
+ result = false;
+
+ //create table to track unsynced_keys
+ table = "unsyncedKeys_"+table;
+
+ String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
+ + " ( key text,PRIMARY KEY (key) );";
+ System.out.println(tabQuery);
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+
+ queryObject.appendQueryString(tabQuery);
+ result = false;
+ result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
+
+
+ //create actual table
+ result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
+ } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ throw new MusicServiceException(ex.getMessage());
+ }
+ return result?ResultType.SUCCESS:ResultType.FAILURE;
+ }
+
+ private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
+ PreparedQueryObject selectQuery = new PreparedQueryObject();
+ PreparedQueryObject updateQuery = new PreparedQueryObject();
+
+ // get the primary key d
+ TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
+ String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
+ // primary key
+ DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
+ Object cqlFormattedPrimaryKeyValue =
+ MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
+
+ // get the row of data from a quorum
+ selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
+ + primaryKeyName + "= ?" + ";");
+ selectQuery.addValue(cqlFormattedPrimaryKeyValue);
+ ResultSet results = null;
+ try {
+ results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(selectQuery);
+ // write it back to a quorum
+ Row row = results.one();
+ ColumnDefinitions colInfo = row.getColumnDefinitions();
+ int totalColumns = colInfo.size();
+ int counter = 1;
+ StringBuilder fieldValueString = new StringBuilder("");
+ for (Definition definition : colInfo) {
+ String colName = definition.getName();
+ if (colName.equals(primaryKeyName))
+ continue;
+ DataType colType = definition.getType();
+ Object valueObj = MusicDataStoreHandle.getDSHandle().getColValue(row, colName, colType);
+ Object valueString = MusicUtil.convertToActualDataType(colType, valueObj);
+ fieldValueString.append(colName + " = ?");
+ updateQuery.addValue(valueString);
+ if (counter != (totalColumns - 1))
+ fieldValueString.append(",");
+ counter = counter + 1;
+ }
+ updateQuery.appendQueryString("UPDATE " + keyspace + "." + table + " SET "
+ + fieldValueString + " WHERE " + primaryKeyName + "= ? " + ";");
+ updateQuery.addValue(cqlFormattedPrimaryKeyValue);
+
+ MusicDataStoreHandle.getDSHandle().executePut(updateQuery, "critical");
+ } catch (MusicServiceException | MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.QUERYERROR +""+updateQuery ,ErrorSeverity.MAJOR, ErrorTypes.QUERYERROR);
+ }
+ }
+
+
+
+
+ /**
+ *
+ * @param query
+ * @return ResultSet
+ */
+ public ResultSet quorumGet(PreparedQueryObject query) {
+ ResultSet results = null;
+ try {
+ results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
+ } catch (MusicServiceException | MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
+
+ }
+ return results;
+
+ }
+
+
+
+ /**
+ *
+ * @param fullyQualifiedKey lockName
+ * @return
+ */
+ public String whoseTurnIsIt(String fullyQualifiedKey) {
+ String[] splitString = fullyQualifiedKey.split("\\.");
+ String keyspace = splitString[0];
+ String table = splitString[1];
+ String primaryKeyValue = splitString[2];
+ try {
+ return "$" + fullyQualifiedKey + "$"
+ + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+ } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ return null;
+ }
+
+ /**
+ *
+ * @param lockReference
+ * @return
+ */
+ public static String getLockNameFromId(String lockReference) {
+ StringTokenizer st = new StringTokenizer(lockReference);
+ return st.nextToken("$");
+ }
+
+ @Override
+ public void destroyLockRef(String lockId) {
+ long start = System.currentTimeMillis();
+ String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
+ String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
+ String[] splitString = fullyQualifiedKey.split("\\.");
+ String keyspace = splitString[0];
+ String table = splitString[1];
+ String primaryKeyValue = splitString[2];
+ try {
+ getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef);
+ } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
+ }
+
+ public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) {
+ long start = System.currentTimeMillis();
+ String[] splitString = fullyQualifiedKey.split("\\.");
+ String keyspace = splitString[0];
+ String table = splitString[1];
+ String primaryKeyValue = splitString[2];
+ try {
+ getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference);
+ } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockReference ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
+ return new MusicLockState(LockStatus.UNLOCKED, "");
+ }
+
+ @Override
+ public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
+ String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
+ String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
+ if (voluntaryRelease) {
+ return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
+ } else {
+ return forciblyReleaseLock(fullyQualifiedKey, lockRef);
+ }
+ }
+
+ public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) {
+ return destroyLockRef(fullyQualifiedKey, lockReference);
+ }
+
+ public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) {
+ String[] splitString = fullyQualifiedKey.split("\\.");
+ String keyspace = splitString[0];
+ String table = splitString[1];
+
+ //leave a signal that this key could potentially be unsynchronized
+ String syncTable = keyspace+".unsyncedKeys_"+table;
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ String values = "(?)";
+ queryObject.addValue(fullyQualifiedKey);
+ String insQuery = "insert into "+syncTable+" (key) values "+values+";";
+ queryObject.appendQueryString(insQuery);
+ try {
+ MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
+ } catch (Exception e) {
+ logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
+ + e.getMessage());
+ }
+
+ //now release the lock
+ return destroyLockRef(fullyQualifiedKey, lockReference);
+ }
+
+ /**
+ *
+ * @param lockName
+ * @throws MusicLockingException
+ */
+ public void deleteLock(String lockName) throws MusicLockingException {
+ //deprecated
+ }
+
+ // Prepared Query Additions.
+
+ /**
+ *
+ * @param queryObject
+ * @return ReturnType
+ * @throws MusicServiceException
+ */
+ public ReturnType eventualPut(PreparedQueryObject queryObject) {
+ boolean result = false;
+ try {
+ result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
+ } catch (MusicServiceException | MusicQueryException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
+ return new ReturnType(ResultType.FAILURE, ex.getMessage());
+ }
+ if (result) {
+ return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
+ } else {
+ return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
+ }
+ }
+
+ /**
+ *
+ * @param queryObject
+ * @return ReturnType
+ * @throws MusicServiceException
+ */
+ public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
+ boolean result = false;
+ long guard = 0;
+ PreparedQueryObject getGaurd = new PreparedQueryObject();
+ getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
+ getGaurd.addValue(primaryKey);
+ try {
+ ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
+ Row row = getGaurdResult.one();
+ if (row != null) {
+ guard = row.getLong("guard");
+ long timeOfWrite = System.currentTimeMillis();
+ long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
+ String query = queryObject.getQuery();
+ if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
+ if (queryObject.getOperation().equalsIgnoreCase("delete"))
+ query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
+ else
+ query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
+ }
+ queryObject.replaceQueryString(query);
+ }
+
+ } catch (MusicServiceException | MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage());
+ }
+ try {
+ result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
+ } catch (MusicServiceException | MusicQueryException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
+ return new ReturnType(ResultType.FAILURE, ex.getMessage());
+ }
+ if (result) {
+ return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
+ } else {
+ return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
+ }
+ }
+
+ /**
+ *
+ * @param keyspace
+ * @param table
+ * @param primaryKeyValue
+ * @param queryObject
+ * @param lockId
+ * @return
+ */
+ public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
+ PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
+ long start = System.currentTimeMillis();
+ try {
+ ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
+ lockId.substring(lockId.lastIndexOf("$")+1));
+ if(result.getResult().equals(ResultType.FAILURE))
+ return result;//not top of the lock store q
+
+ if (conditionInfo != null)
+ try {
+ if (conditionInfo.testCondition() == false)
+ return new ReturnType(ResultType.FAILURE,
+ "Lock acquired but the condition is not true");
+ } catch (Exception e) {
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown while checking the condition, check its sanctity:\n"
+ + e.getMessage());
+ }
+
+ String query = queryObject.getQuery();
+ long timeOfWrite = System.currentTimeMillis();
+ long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
+ long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
+ // TODO: use Statement instead of modifying query
+ if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
+ if (queryObject.getOperation().equalsIgnoreCase("delete"))
+ query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
+ else if (queryObject.getOperation().equalsIgnoreCase("insert"))
+ query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
+ else
+ query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
+ }
+ queryObject.replaceQueryString(query);
+ MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
+ dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
+ }catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown while doing the critical put\n"
+ + e.getMessage());
+ }
+ return new ReturnType(ResultType.SUCCESS, "Update performed");
+ }
+
+
+ /**
+ *
+ * @param queryObject
+ * @param consistency
+ * @return Boolean Indicates success or failure
+ * @throws MusicServiceException
+ *
+ *
+ */
+ public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
+ // this is mainly for some functions like keyspace creation etc which does not
+ // really need the bells and whistles of Music locking.
+ boolean result = false;
+ try {
+ result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
+ } catch (MusicQueryException | MusicServiceException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
+ ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ throw new MusicServiceException(ex.getMessage());
+ }
+ return result ? ResultType.SUCCESS : ResultType.FAILURE;
+ }
+
+ /**
+ * This method performs DDL operation on cassandra.
+ *
+ * @param queryObject query object containing prepared query and values
+ * @return ResultSet
+ * @throws MusicServiceException
+ */
+ public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
+ ResultSet results = null;
+ try {
+ results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
+ } catch (MusicQueryException | MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
+ throw new MusicServiceException(e.getMessage());
+ }
+ return results;
+ }
+
+ /**
+ * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
+ * is used to check if the resource is free.
+ *
+ * @param keyspace name of the keyspace
+ * @param table name of the table
+ * @param primaryKeyValue primary key value
+ * @param queryObject query object containing prepared query and values
+ * @param lockId lock ID to check if the resource is free to perform the operation.
+ * @return ResultSet
+ */
+ public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
+ PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
+ ResultSet results = null;
+
+ try {
+ ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
+ lockId.substring(lockId.lastIndexOf("$")+1));
+ if(result.getResult().equals(ResultType.FAILURE))
+ return null;//not top of the lock store q
+ results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
+ } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ }
+ return results;
+ }
+
+ /**
+ * This method performs DML operation on cassandra, when the lock of the dd is acquired.
+ *
+ * @param keyspaceName name of the keyspace
+ * @param tableName name of the table
+ * @param primaryKey primary key value
+ * @param queryObject query object containing prepared query and values
+ * @return ReturnType
+ * @throws MusicLockingException
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
+ long start = System.currentTimeMillis();
+ String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
+ String lockId = createLockReference(fullyQualifiedKey);
+ long lockCreationTime = System.currentTimeMillis();
+ ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
+ long lockAcqTime = System.currentTimeMillis();
+
+ if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
+ voluntaryReleaseLock(fullyQualifiedKey,lockId);
+ return lockAcqResult;
+ }
+
+ logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
+ String lockRef = lockId.substring(lockId.lastIndexOf("$"));
+ ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
+ queryObject, lockRef, conditionInfo);
+ long criticalPutTime = System.currentTimeMillis();
+ voluntaryReleaseLock(fullyQualifiedKey,lockId);
+ long lockDeleteTime = System.currentTimeMillis();
+ String timingInfo = "|lock creation time:" + (lockCreationTime - start)
+ + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
+ + "|critical put time:" + (criticalPutTime - lockAcqTime)
+ + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
+ criticalPutResult.setTimingInfo(timingInfo);
+ return criticalPutResult;
+ }
+
+
+
+
+ /**
+ * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
+ *
+ * @param keyspaceName name of the keyspace
+ * @param tableName name of the table
+ * @param primaryKey primary key value
+ * @param queryObject query object containing prepared query and values
+ * @return ResultSet
+ * @throws MusicServiceException
+ * @throws MusicLockingException
+ * @throws MusicQueryException
+ */
+ public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
+ String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
+ String lockId = createLockReference(fullyQualifiedKey);
+ long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+ ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
+ String lockRef = lockId.substring(lockId.lastIndexOf("$"));
+ ResultSet result =
+ criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
+ voluntaryReleaseLock(fullyQualifiedKey,lockId);
+ return result;
+ } else {
+ voluntaryReleaseLock(fullyQualifiedKey,lockId);
+ logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
+ return null;
+ }
+ }
+
+
+
+ /**
+ * @param lockName
+ * @return
+ */
+ public Map<String, Object> validateLock(String lockName) {
+ Map<String, Object> resultMap = new HashMap<>();
+ String[] locks = lockName.split("\\.");
+ if(locks.length < 3) {
+ resultMap.put("Error", "Invalid lock. Please make sure lock is of the type keyspaceName.tableName.primaryKey");
+ return resultMap;
+ }
+ String keyspace= locks[0];
+ if(keyspace.startsWith("$"))
+ keyspace = keyspace.substring(1);
+ resultMap.put("keyspace",keyspace);
+ return resultMap;
+ }
+
+
+ public static void main(String[] args) {
+ String x = "axe top";
+ x = x.replaceFirst("top", "sword");
+ System.out.print(x); //returns sword pickaxe
+ }
+
+
+
+ @Override
+ public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
+ //Deprecated
+ return null;
+ }
+ @Override
+ public List<String> getLockQueue(String fullyQualifiedKey)
+ throws MusicServiceException, MusicQueryException, MusicLockingException {
+ String[] splitString = fullyQualifiedKey.split("\\.");
+ String keyspace = splitString[0];
+ String table = splitString[1];
+ String primaryKeyValue = splitString[2];
+
+ return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
+ }
+ @Override
+ public long getLockQueueSize(String fullyQualifiedKey)
+ throws MusicServiceException, MusicQueryException, MusicLockingException {
+ String[] splitString = fullyQualifiedKey.split("\\.");
+ String keyspace = splitString[0];
+ String table = splitString[1];
+ String primaryKeyValue = splitString[2];
+
+ return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
+ }
+ @Override
+ public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
+ //deprecated
+ return null;
+ }
+
+}
diff --git a/src/main/java/org/onap/music/service/impl/MusicZKCore.java b/src/main/java/org/onap/music/service/impl/MusicZKCore.java
new file mode 100644
index 00000000..1662b20c
--- /dev/null
+++ b/src/main/java/org/onap/music/service/impl/MusicZKCore.java
@@ -0,0 +1,937 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2017 AT&T Intellectual Property
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+
+package org.onap.music.service.impl;
+
+
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.jcs.access.CacheAccess;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.onap.music.datastore.MusicDataStore;
+import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.datastore.jsonobjects.JsonKeySpace;
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.eelf.logging.format.AppMessages;
+import org.onap.music.eelf.logging.format.ErrorSeverity;
+import org.onap.music.eelf.logging.format.ErrorTypes;
+import org.onap.music.exceptions.MusicLockingException;
+import org.onap.music.exceptions.MusicQueryException;
+import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.lockingservice.cassandra.MusicLockState;
+import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
+import org.onap.music.lockingservice.zookeeper.MusicLockingService;
+import org.onap.music.main.MusicUtil;
+import org.onap.music.main.ResultType;
+import org.onap.music.main.ReturnType;
+import org.onap.music.service.MusicCoreService;
+import org.onap.music.datastore.*;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ColumnDefinitions.Definition;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.TableMetadata;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+/**
+ * This class .....
+ *
+ *
+ */
+public class MusicZKCore implements MusicCoreService {
+
+ public static MusicLockingService mLockHandle = null;
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicZKCore.class);
+ private static MusicZKCore musicZKCoreInstance = null;
+
+ private MusicZKCore() {
+
+ }
+ public static MusicZKCore getInstance() {
+
+ if(musicZKCoreInstance == null) {
+ musicZKCoreInstance = new MusicZKCore();
+ }
+ return musicZKCoreInstance;
+ }
+
+
+
+
+
+ public static MusicLockingService getLockingServiceHandle() throws MusicLockingException {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
+ long start = System.currentTimeMillis();
+
+ if (mLockHandle == null) {
+ try {
+ mLockHandle = new MusicLockingService();
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ throw new MusicLockingException("Failed to aquire Locl store handle " + e);
+ }
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
+ return mLockHandle;
+ }
+
+
+
+ public String createLockReference(String lockName) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
+ long start = System.currentTimeMillis();
+ String lockId = null;
+ try {
+ lockId = getLockingServiceHandle().createLockId("/" + lockName);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.CREATELOCK+lockName,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
+ return lockId;
+ }
+
+ /**
+ *
+ * @param key
+ * @return
+ */
+ public static boolean isTableOrKeySpaceLock(String key) {
+ String[] splitString = key.split("\\.");
+ if (splitString.length > 2)
+ return false;
+ else
+ return true;
+ }
+
+ /**
+ *
+ * @param key
+ * @return
+ */
+ public static MusicLockState getMusicLockState(String key) {
+ long start = System.currentTimeMillis();
+ try {
+ String[] splitString = key.split("\\.");
+ String keyspaceName = splitString[0];
+ String tableName = splitString[1];
+ String primaryKey = splitString[2];
+ MusicLockState mls;
+ String lockName = keyspaceName + "." + tableName + "." + primaryKey;
+ mls = getLockingServiceHandle().getLockState(lockName);
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to get lock state:" + (end - start) + " ms");
+ return mls;
+ } catch (NullPointerException | MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ return null;
+ }
+
+ public ReturnType acquireLockWithLease(String key, String lockId, long leasePeriod) {
+ try {
+ long start = System.currentTimeMillis();
+ /* check if the current lock has exceeded its lease and if yes, release that lock */
+ MusicLockState mls = getMusicLockState(key);
+ if (mls != null) {
+ if (mls.getLockStatus().equals(LockStatus.LOCKED)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"The current lock holder for " + key + " is " + mls.getLockHolder()
+ + ". Checking if it has exceeded lease");
+ long currentLockPeriod = System.currentTimeMillis() - mls.getLeaseStartTime();
+ long currentLeasePeriod = mls.getLeasePeriod();
+ if (currentLockPeriod > currentLeasePeriod) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Lock period " + currentLockPeriod
+ + " has exceeded lease period " + currentLeasePeriod);
+ boolean voluntaryRelease = false;
+ String currentLockHolder = mls.getLockHolder();
+ mls = releaseLock(currentLockHolder, voluntaryRelease);
+ }
+ }
+ } else
+ logger.error(EELFLoggerDelegate.errorLogger,key, AppMessages.INVALIDLOCK,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+
+ /*
+ * call the traditional acquire lock now and if the result returned is true, set the
+ * begin time-stamp and lease period
+ */
+ if (acquireLock(key, lockId).getResult() == ResultType.SUCCESS) {
+ mls = getMusicLockState(key);// get latest state
+ if ( mls == null ) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Music Lock State is null");
+ return new ReturnType(ResultType.FAILURE, "Could not acquire lock, Lock State is null");
+ }
+ if (mls.getLeaseStartTime() == -1) {// set it again only if it is not set already
+ mls.setLeaseStartTime(System.currentTimeMillis());
+ mls.setLeasePeriod(leasePeriod);
+ getLockingServiceHandle().setLockState(key, mls);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire leased lock:" + (end - start) + " ms");
+ return new ReturnType(ResultType.SUCCESS, "Accquired lock");
+ } else {
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to fail to acquire leased lock:" + (end - start) + " ms");
+ return new ReturnType(ResultType.FAILURE, "Could not acquire lock");
+ }
+ } catch (Exception e) {
+ StringWriter sw = new StringWriter();
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+
+ String exceptionAsString = sw.toString();
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown in acquireLockWithLease:\n" + exceptionAsString);
+ }
+ }
+
+ public ReturnType acquireLock(String key, String lockId) throws MusicLockingException {
+ /*
+ * first check if I am on top. Since ids are not reusable there is no need to check
+ * lockStatus If the status is unlocked, then the above call will automatically return
+ * false.
+ */
+ Boolean result = false;
+ try {
+ result = getLockingServiceHandle().isMyTurn(lockId);
+ } catch (MusicLockingException e2) {
+ logger.error(EELFLoggerDelegate.errorLogger,AppMessages.INVALIDLOCK + lockId + " " + e2);
+ throw new MusicLockingException();
+ }
+ if (!result) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Not your turn, someone else has the lock");
+ try {
+ if (!getLockingServiceHandle().lockIdExists(lockId)) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "In acquire lock: this lockId doesn't exist");
+ return new ReturnType(ResultType.FAILURE, "Lockid doesn't exist");
+ }
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK+lockId,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ throw new MusicLockingException();
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: returning failure");
+ return new ReturnType(ResultType.FAILURE, "Not your turn, someone else has the lock");
+ }
+
+
+ // this is for backward compatibility where locks could also be acquired on just
+ // keyspaces or tables.
+ if (isTableOrKeySpaceLock(key)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: A table or keyspace lock so no need to perform sync...so returning true");
+ return new ReturnType(ResultType.SUCCESS, "A table or keyspace lock so no need to perform sync...so returning true");
+ }
+
+ // read the lock name corresponding to the key and if the status is locked or being locked,
+ // then return false
+ MusicLockState currentMls = null;
+ MusicLockState newMls = null;
+ try {
+ currentMls = getMusicLockState(key);
+ String currentLockHolder = null;
+ if(currentMls != null) { currentLockHolder = currentMls.getLockHolder(); };
+ if (lockId.equals(currentLockHolder)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: You already have the lock!");
+ return new ReturnType(ResultType.SUCCESS, "You already have the lock!");
+ }
+ } catch (NullPointerException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK+lockId,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+
+ // change status to "being locked". This state transition is necessary to ensure syncing
+ // before granting the lock
+ String lockHolder = null;
+ boolean needToSyncQuorum = false;
+ if (currentMls != null)
+ needToSyncQuorum = currentMls.isNeedToSyncQuorum();
+
+
+ newMls = new MusicLockState(MusicLockState.LockStatus.BEING_LOCKED, lockHolder,
+ needToSyncQuorum);
+ try {
+ getLockingServiceHandle().setLockState(key, newMls);
+ } catch (MusicLockingException e1) {
+ logger.error(EELFLoggerDelegate.errorLogger,e1.getMessage(), AppMessages.LOCKSTATE+key,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Set lock state to being_locked");
+
+ // do syncing if this was a forced lock release
+ if (needToSyncQuorum) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Since there was a forcible release, need to sync quorum!");
+ try {
+ syncQuorum(key);
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger,"Failed to set Lock state " + e);
+ }
+ }
+
+ // change status to locked
+ lockHolder = lockId;
+ needToSyncQuorum = false;
+ newMls = new MusicLockState(MusicLockState.LockStatus.LOCKED, lockHolder, needToSyncQuorum);
+ try {
+ getLockingServiceHandle().setLockState(key, newMls);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKSTATE+key,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Set lock state to locked and assigned current lock ref "
+ + lockId + " as holder");
+
+ return new ReturnType(result?ResultType.SUCCESS:ResultType.FAILURE, "Set lock state to locked and assigned a lock holder");
+ }
+
+
+
+ /**
+ *
+ * @param keyspaceName
+ * @param kspObject
+ * @return
+ * @throws Exception
+ */
+ public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
+ return true;
+ }
+
+
+ private static void syncQuorum(String key) throws Exception {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
+ String[] splitString = key.split("\\.");
+ String keyspaceName = splitString[0];
+ String tableName = splitString[1];
+ String primaryKeyValue = splitString[2];
+ PreparedQueryObject selectQuery = new PreparedQueryObject();
+ PreparedQueryObject updateQuery = new PreparedQueryObject();
+
+ // get the primary key d
+ TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspaceName, tableName);
+ String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
+ // primary key
+ DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
+ Object cqlFormattedPrimaryKeyValue =
+ MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
+
+ // get the row of data from a quorum
+ selectQuery.appendQueryString("SELECT * FROM " + keyspaceName + "." + tableName + " WHERE "
+ + primaryKeyName + "= ?" + ";");
+ selectQuery.addValue(cqlFormattedPrimaryKeyValue);
+ ResultSet results = null;
+ try {
+ results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(selectQuery);
+ // write it back to a quorum
+ Row row = results.one();
+ ColumnDefinitions colInfo = row.getColumnDefinitions();
+ int totalColumns = colInfo.size();
+ int counter = 1;
+ StringBuilder fieldValueString = new StringBuilder("");
+ for (Definition definition : colInfo) {
+ String colName = definition.getName();
+ if (colName.equals(primaryKeyName))
+ continue;
+ DataType colType = definition.getType();
+ Object valueObj = MusicDataStoreHandle.getDSHandle().getColValue(row, colName, colType);
+ Object valueString = MusicUtil.convertToActualDataType(colType, valueObj);
+ fieldValueString.append(colName + " = ?");
+ updateQuery.addValue(valueString);
+ if (counter != (totalColumns - 1))
+ fieldValueString.append(",");
+ counter = counter + 1;
+ }
+ updateQuery.appendQueryString("UPDATE " + keyspaceName + "." + tableName + " SET "
+ + fieldValueString + " WHERE " + primaryKeyName + "= ? " + ";");
+ updateQuery.addValue(cqlFormattedPrimaryKeyValue);
+
+ MusicDataStoreHandle.getDSHandle().executePut(updateQuery, "critical");
+ } catch (MusicServiceException | MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.QUERYERROR +""+updateQuery ,ErrorSeverity.MAJOR, ErrorTypes.QUERYERROR);
+ }
+ }
+
+
+
+
+ /**
+ *
+ * @param query
+ * @return ResultSet
+ */
+ public ResultSet quorumGet(PreparedQueryObject query) {
+ ResultSet results = null;
+ try {
+ results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
+ } catch (MusicServiceException | MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
+
+ }
+ return results;
+
+ }
+
+
+
+ /**
+ *
+ * @param lockName
+ * @return
+ */
+ public String whoseTurnIsIt(String lockName) {
+
+ try {
+ return getLockingServiceHandle().whoseTurnIsIt("/" + lockName) + "";
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+lockName ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ return null;
+
+
+ }
+
+ /**
+ *
+ * @param lockId
+ * @return
+ */
+ public static String getLockNameFromId(String lockId) {
+ StringTokenizer st = new StringTokenizer(lockId);
+ return st.nextToken("$");
+ }
+
+ public void destroyLockRef(String lockId) {
+ long start = System.currentTimeMillis();
+ try {
+ getLockingServiceHandle().unlockAndDeleteId(lockId);
+ } catch (MusicLockingException | NoNodeException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
+ }
+
+ public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
+ long start = System.currentTimeMillis();
+ try {
+ getLockingServiceHandle().unlockAndDeleteId(lockId);
+ } catch (MusicLockingException e1) {
+ logger.error(EELFLoggerDelegate.errorLogger,e1.getMessage(), AppMessages.RELEASELOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ } catch (KeeperException.NoNodeException nne) {
+ logger.error(EELFLoggerDelegate.errorLogger,"Failed to release Lock " + lockId + " " + nne);
+ MusicLockState mls = new MusicLockState("Lock doesn't exists. Release lock operation failed.");
+ return mls;
+ }
+ String lockName = getLockNameFromId(lockId);
+ MusicLockState mls;
+ String lockHolder = null;
+ if (voluntaryRelease) {
+ mls = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder);
+ logger.info(EELFLoggerDelegate.applicationLogger,"In unlock: lock voluntarily released for " + lockId);
+ } else {
+ boolean needToSyncQuorum = true;
+ mls = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder,
+ needToSyncQuorum);
+ logger.info(EELFLoggerDelegate.applicationLogger,"In unlock: lock forcibly released for " + lockId);
+ }
+ try {
+ getLockingServiceHandle().setLockState(lockName, mls);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.RELEASELOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to release lock:" + (end - start) + " ms");
+ return mls;
+ }
+
+ public static void voluntaryReleaseLock(String lockId) throws MusicLockingException{
+ try {
+ getLockingServiceHandle().unlockAndDeleteId(lockId);
+ } catch (KeeperException.NoNodeException e) {
+ // ??? No way
+ }
+ }
+
+ /**
+ *
+ * @param lockName
+ * @throws MusicLockingException
+ */
+ public void deleteLock(String lockName) throws MusicLockingException {
+ long start = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Deleting lock for " + lockName);
+ try {
+ getLockingServiceHandle().deleteLock("/" + lockName);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DELTELOCK+lockName ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ throw new MusicLockingException(e.getMessage());
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to delete lock:" + (end - start) + " ms");
+ }
+
+
+ /**
+ *
+ * @param nodeName
+ */
+ public static void pureZkCreate(String nodeName) {
+ try {
+ getLockingServiceHandle().getzkLockHandle().createNode(nodeName);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ }
+
+ /**
+ *
+ * @param nodeName
+ * @param data
+ */
+ public static void pureZkWrite(String nodeName, byte[] data) {
+ long start = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Performing zookeeper write to " + nodeName);
+ try {
+ getLockingServiceHandle().getzkLockHandle().setNodeData(nodeName, data);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger,"Performed zookeeper write to " + nodeName);
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the actual zk put:" + (end - start) + " ms");
+ }
+
+ /**
+ *
+ * @param nodeName
+ * @return
+ */
+ public static byte[] pureZkRead(String nodeName) {
+ long start = System.currentTimeMillis();
+ byte[] data = null;
+ try {
+ data = getLockingServiceHandle().getzkLockHandle().getNodeData(nodeName);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the actual zk put:" + (end - start) + " ms");
+ return data;
+ }
+
+
+
+ // Prepared Query Additions.
+
+ /**
+ *
+ * @param keyspaceName
+ * @param tableName
+ * @param primaryKey
+ * @param queryObject
+ * @return ReturnType
+ * @throws MusicServiceException
+ */
+ public ReturnType eventualPut(PreparedQueryObject queryObject) {
+ boolean result = false;
+ try {
+ result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
+ } catch (MusicServiceException | MusicQueryException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
+ return new ReturnType(ResultType.FAILURE, ex.getMessage());
+ }
+ if (result) {
+ return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
+ } else {
+ return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
+ }
+ }
+
+ /**
+ *
+ * @param keyspaceName
+ * @param tableName
+ * @param primaryKey
+ * @param queryObject
+ * @param lockId
+ * @return
+ */
+ public ReturnType criticalPut(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
+ long start = System.currentTimeMillis();
+
+ try {
+ MusicLockState mls = getLockingServiceHandle()
+ .getLockState(keyspaceName + "." + tableName + "." + primaryKey);
+ if (mls.getLockHolder().equals(lockId) == true) {
+ if (conditionInfo != null)
+ try {
+ if (conditionInfo.testCondition() == false)
+ return new ReturnType(ResultType.FAILURE,
+ "Lock acquired but the condition is not true");
+ } catch (Exception e) {
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
+ + e.getMessage());
+ }
+ boolean result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.CRITICAL);
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
+ if (result) {
+ return new ReturnType(ResultType.SUCCESS, "Update performed");
+ } else {
+ return new ReturnType(ResultType.FAILURE, "Update failed to perform");
+ }
+ } else
+ return new ReturnType(ResultType.FAILURE,
+ "Cannot perform operation since you are the not the lock holder");
+ } catch (MusicQueryException | MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
+ + e.getMessage());
+ }catch(MusicLockingException ex){
+ return new ReturnType(ResultType.FAILURE,ex.getMessage());
+ }
+
+ }
+
+ /**
+ *
+ * @param queryObject
+ * @param consistency
+ * @return Boolean Indicates success or failure
+ * @throws MusicServiceException
+ *
+ *
+ */
+ public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
+ // this is mainly for some functions like keyspace creation etc which does not
+ // really need the bells and whistles of Music locking.
+ boolean result = false;
+ try {
+ result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
+ } catch (MusicQueryException | MusicServiceException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ throw new MusicServiceException(ex.getMessage());
+ }
+ return result?ResultType.SUCCESS:ResultType.FAILURE;
+ }
+
+ /**
+ * This method performs DDL operation on cassandra.
+ *
+ * @param queryObject query object containing prepared query and values
+ * @return ResultSet
+ * @throws MusicServiceException
+ */
+ public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
+ ResultSet results = null;
+ try {
+ results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
+ } catch (MusicQueryException | MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
+ throw new MusicServiceException(e.getMessage());
+ }
+ return results;
+ }
+
+ public static String getMyHostId() {
+ PreparedQueryObject pQuery = new PreparedQueryObject();
+ pQuery.appendQueryString("SELECT HOST_ID FROM SYSTEM.LOCAL");
+ ResultSet rs = null;
+ try {
+ rs = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(pQuery);
+ Row row = rs.one();
+ return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
+ }
+ logger.error(EELFLoggerDelegate.errorLogger, "Some issue during MusicCore.getMyHostId");
+ return "UNKNOW";
+ }
+
+ /**
+ * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
+ * is used to check if the resource is free.
+ *
+ * @param keyspaceName name of the keyspace
+ * @param tableName name of the table
+ * @param primaryKey primary key value
+ * @param queryObject query object containing prepared query and values
+ * @param lockId lock ID to check if the resource is free to perform the operation.
+ * @return ResultSet
+ */
+ public ResultSet criticalGet(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
+ ResultSet results = null;
+ try {
+ MusicLockState mls = getLockingServiceHandle()
+ .getLockState(keyspaceName + "." + tableName + "." + primaryKey);
+ if (mls.getLockHolder().equals(lockId)) {
+ results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
+ } else
+ throw new MusicServiceException("YOU DO NOT HAVE THE LOCK");
+ } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ }
+ return results;
+ }
+
+ /**
+ * This method performs DML operation on cassandra, when the lock of the dd is acquired.
+ *
+ * @param keyspaceName name of the keyspace
+ * @param tableName name of the table
+ * @param primaryKey primary key value
+ * @param queryObject query object containing prepared query and values
+ * @return ReturnType
+ * @throws MusicLockingException
+ */
+ public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
+
+ long start = System.currentTimeMillis();
+ String key = keyspaceName + "." + tableName + "." + primaryKey;
+ String lockId = createLockReference(key);
+ long lockCreationTime = System.currentTimeMillis();
+ ReturnType lockAcqResult = acquireLock(key, lockId);
+ long lockAcqTime = System.currentTimeMillis();
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
+ ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
+ queryObject, lockId, conditionInfo);
+ long criticalPutTime = System.currentTimeMillis();
+ voluntaryReleaseLock(lockId);
+ long lockDeleteTime = System.currentTimeMillis();
+ String timingInfo = "|lock creation time:" + (lockCreationTime - start)
+ + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
+ + "|critical put time:" + (criticalPutTime - lockAcqTime)
+ + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
+ criticalPutResult.setTimingInfo(timingInfo);
+ return criticalPutResult;
+ } else {
+ logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
+ destroyLockRef(lockId);
+ return lockAcqResult;
+ }
+ }
+
+ /**
+ * this function is mainly for the benchmarks to see the effect of lock deletion.
+ *
+ * @param keyspaceName
+ * @param tableName
+ * @param primaryKey
+ * @param queryObject
+ * @param conditionInfo
+ * @return
+ * @throws MusicLockingException
+ */
+ public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName,
+ String primaryKey, PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
+
+ long start = System.currentTimeMillis();
+ String key = keyspaceName + "." + tableName + "." + primaryKey;
+ String lockId = createLockReference(key);
+ long lockCreationTime = System.currentTimeMillis();
+ long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+ ReturnType lockAcqResult = acquireLock(key, lockId);
+ long lockAcqTime = System.currentTimeMillis();
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
+ ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
+ queryObject, lockId, conditionInfo);
+ long criticalPutTime = System.currentTimeMillis();
+ deleteLock(key);
+ long lockDeleteTime = System.currentTimeMillis();
+ String timingInfo = "|lock creation time:" + (lockCreationTime - start)
+ + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
+ + "|critical put time:" + (criticalPutTime - lockAcqTime)
+ + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
+ criticalPutResult.setTimingInfo(timingInfo);
+ return criticalPutResult;
+ } else {
+ logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
+ deleteLock(key);
+ return lockAcqResult;
+ }
+ }
+
+
+
+
+ /**
+ * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
+ *
+ * @param keyspaceName name of the keyspace
+ * @param tableName name of the table
+ * @param primaryKey primary key value
+ * @param queryObject query object containing prepared query and values
+ * @return ResultSet
+ * @throws MusicServiceException
+ * @throws MusicLockingException
+ */
+ public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
+ String key = keyspaceName + "." + tableName + "." + primaryKey;
+ String lockId = createLockReference(key);
+ long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+ ReturnType lockAcqResult = acquireLock(key, lockId);
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
+ ResultSet result =
+ criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockId);
+ voluntaryReleaseLock(lockId);
+ return result;
+ } else {
+ destroyLockRef(lockId);
+ logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
+ return null;
+ }
+ }
+
+ public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
+ String key = keyspaceName + "." + tableName + "." + primaryKey;
+ String lockId = createLockReference(key);
+ long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+
+ ReturnType lockAcqResult = acquireLock(key, lockId);
+
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
+ ResultSet result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockId);
+ deleteLock(key);
+ return result;
+ } else {
+ deleteLock(key);
+ logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
+ return null;
+ }
+ }
+
+ /**
+ * @param lockName
+ * @return
+ */
+ public Map<String, Object> validateLock(String lockName) {
+ Map<String, Object> resultMap = new HashMap<>();
+ String[] locks = lockName.split("\\.");
+ if(locks.length < 3) {
+ resultMap.put("Error", "Invalid lock. Please make sure lock is of the type keyspaceName.tableName.primaryKey");
+ return resultMap;
+ }
+ String keyspace= locks[0];
+ if(keyspace.startsWith("$"))
+ keyspace = keyspace.substring(1);
+ resultMap.put("keyspace",keyspace);
+ return resultMap;
+ }
+
+
+
+ @Override
+ public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
+ String consistency) throws MusicServiceException {
+ boolean result = false;
+
+ try {
+ //create shadow locking table
+ result = createLockQueue(keyspace, table);
+ if(result == false)
+ return ResultType.FAILURE;
+
+ result = false;
+
+ //create table to track unsynced_keys
+ table = "unsyncedKeys_"+table;
+
+ String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
+ + " ( key text,PRIMARY KEY (key) );";
+ System.out.println(tabQuery);
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+
+ queryObject.appendQueryString(tabQuery);
+ result = false;
+ result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
+
+
+ //create actual table
+ result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
+ } catch (MusicQueryException | MusicServiceException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ throw new MusicServiceException(ex.getMessage());
+ }
+ return result?ResultType.SUCCESS:ResultType.FAILURE;
+ }
+
+ public static boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Create lock queue/table for " + keyspace+"."+table);
+ table = "lockQ_"+table;
+ String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
+ + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
+ + "WITH CLUSTERING ORDER BY (lockReference ASC);";
+ System.out.println(tabQuery);
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+
+ queryObject.appendQueryString(tabQuery);
+ boolean result;
+ result = MusicDataStoreHandle.mDstoreHandle.executePut(queryObject, "eventual");
+ return result;
+ }
+
+
+ @Override
+ public List<String> getLockQueue(String fullyQualifiedKey)
+ throws MusicServiceException, MusicQueryException, MusicLockingException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+
+
+ @Override
+ public long getLockQueueSize(String fullyQualifiedKey)
+ throws MusicServiceException, MusicQueryException, MusicLockingException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+ @Override
+ public ReturnType eventualPut_nb(PreparedQueryObject queryObject, String keyspace, String tablename,
+ String primaryKey) {
+ return eventualPut(queryObject);
+ }
+
+
+
+
+}