diff options
Diffstat (limited to 'jar/src/main/java/org/onap/music/main/MusicCore.java')
-rw-r--r-- | jar/src/main/java/org/onap/music/main/MusicCore.java | 992 |
1 files changed, 992 insertions, 0 deletions
diff --git a/jar/src/main/java/org/onap/music/main/MusicCore.java b/jar/src/main/java/org/onap/music/main/MusicCore.java new file mode 100644 index 00000000..cad384a3 --- /dev/null +++ b/jar/src/main/java/org/onap/music/main/MusicCore.java @@ -0,0 +1,992 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.main; + + +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.StringTokenizer; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.onap.music.datastore.MusicDataStore; +import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.datastore.jsonobjects.JsonKeySpace; +import org.onap.music.eelf.logging.EELFLoggerDelegate; +import org.onap.music.eelf.logging.format.AppMessages; +import org.onap.music.eelf.logging.format.ErrorSeverity; +import org.onap.music.eelf.logging.format.ErrorTypes; +import org.onap.music.exceptions.MusicLockingException; +import org.onap.music.exceptions.MusicQueryException; +import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.lockingservice.MusicLockState; +import org.onap.music.lockingservice.MusicLockState.LockStatus; +import org.onap.music.lockingservice.MusicLockingService; + +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ColumnDefinitions.Definition; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.TableMetadata; + +/** + * This class ..... + * + * + */ +public class MusicCore { + + public static MusicLockingService mLockHandle = null; + public static MusicDataStore mDstoreHandle = null; + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCore.class); + + public static class Condition { + Map<String, Object> conditions; + PreparedQueryObject selectQueryForTheRow; + + public Condition(Map<String, Object> conditions, PreparedQueryObject selectQueryForTheRow) { + this.conditions = conditions; + this.selectQueryForTheRow = selectQueryForTheRow; + } + + public boolean testCondition() throws Exception { + // first generate the row + ResultSet results = quorumGet(selectQueryForTheRow); + Row row = results.one(); + return getDSHandle().doesRowSatisfyCondition(row, conditions); + } + } + + + public static MusicLockingService getLockingServiceHandle() throws MusicLockingException { + logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle"); + long start = System.currentTimeMillis(); + + if (mLockHandle == null) { + try { + mLockHandle = new MusicLockingService(); + } catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + throw new MusicLockingException("Failed to aquire Locl store handle " + e); + } + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms"); + return mLockHandle; + } + + /** + * + * @param remoteIp + * @return + */ + public static MusicDataStore getDSHandle(String remoteIp) { + logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring data store handle"); + long start = System.currentTimeMillis(); + if (mDstoreHandle == null) { + try { + MusicUtil.loadProperties(); + } catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger, "No properties file defined. Falling back to default."); + } + mDstoreHandle = new MusicDataStore(remoteIp); + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire data store handle:" + (end - start) + " ms"); + return mDstoreHandle; + } + + /** + * + * @return + * @throws MusicServiceException + */ + public static MusicDataStore getDSHandle() throws MusicServiceException { + logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring data store handle"); + long start = System.currentTimeMillis(); + if (mDstoreHandle == null) { + try { + MusicUtil.loadProperties(); + } catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger, "No properties file defined. Falling back to default."); + } + // Quick Fix - Best to put this into every call to getDSHandle? + if (! MusicUtil.getMyCassaHost().equals("localhost") ) { + mDstoreHandle = new MusicDataStore(MusicUtil.getMyCassaHost()); + } else { + mDstoreHandle = new MusicDataStore(); + } + } + if(mDstoreHandle.getSession() == null) { + String message = "Connection to Cassandra has not been enstablished." + + " Please check connection properites and reboot."; + logger.info(EELFLoggerDelegate.applicationLogger, message); + throw new MusicServiceException(message); + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire data store handle:" + (end - start) + " ms"); + return mDstoreHandle; + } + + public static String createLockReference(String lockName) { + logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName); + long start = System.currentTimeMillis(); + String lockId = null; + try { + lockId = getLockingServiceHandle().createLockId("/" + lockName); + } catch (MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.CREATELOCK+lockName,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms"); + return lockId; + } + + /** + * + * @param key + * @return + */ + public static boolean isTableOrKeySpaceLock(String key) { + String[] splitString = key.split("\\."); + if (splitString.length > 2) + return false; + else + return true; + } + + /** + * + * @param key + * @return + */ + public static MusicLockState getMusicLockState(String key) { + long start = System.currentTimeMillis(); + try { + String[] splitString = key.split("\\."); + String keyspaceName = splitString[0]; + String tableName = splitString[1]; + String primaryKey = splitString[2]; + MusicLockState mls; + String lockName = keyspaceName + "." + tableName + "." + primaryKey; + mls = getLockingServiceHandle().getLockState(lockName); + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to get lock state:" + (end - start) + " ms"); + return mls; + } catch (NullPointerException | MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + return null; + } + + public static ReturnType acquireLockWithLease(String key, String lockId, long leasePeriod) { + try { + long start = System.currentTimeMillis(); + /* check if the current lock has exceeded its lease and if yes, release that lock */ + MusicLockState mls = getMusicLockState(key); + if (mls != null) { + if (mls.getLockStatus().equals(LockStatus.LOCKED)) { + logger.info(EELFLoggerDelegate.applicationLogger,"The current lock holder for " + key + " is " + mls.getLockHolder() + + ". Checking if it has exceeded lease"); + long currentLockPeriod = System.currentTimeMillis() - mls.getLeaseStartTime(); + long currentLeasePeriod = mls.getLeasePeriod(); + if (currentLockPeriod > currentLeasePeriod) { + logger.info(EELFLoggerDelegate.applicationLogger,"Lock period " + currentLockPeriod + + " has exceeded lease period " + currentLeasePeriod); + boolean voluntaryRelease = false; + String currentLockHolder = mls.getLockHolder(); + mls = releaseLock(currentLockHolder, voluntaryRelease); + } + } + } else + logger.error(EELFLoggerDelegate.errorLogger,key, AppMessages.INVALIDLOCK,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + + /* + * call the traditional acquire lock now and if the result returned is true, set the + * begin time-stamp and lease period + */ + if (acquireLock(key, lockId).getResult() == ResultType.SUCCESS) { + mls = getMusicLockState(key);// get latest state + if ( mls == null ) { + logger.info(EELFLoggerDelegate.applicationLogger,"Music Lock State is null"); + return new ReturnType(ResultType.FAILURE, "Could not acquire lock, Lock State is null"); + } + if (mls.getLeaseStartTime() == -1) {// set it again only if it is not set already + mls.setLeaseStartTime(System.currentTimeMillis()); + mls.setLeasePeriod(leasePeriod); + getLockingServiceHandle().setLockState(key, mls); + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire leased lock:" + (end - start) + " ms"); + return new ReturnType(ResultType.SUCCESS, "Accquired lock"); + } else { + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to fail to acquire leased lock:" + (end - start) + " ms"); + return new ReturnType(ResultType.FAILURE, "Could not acquire lock"); + } + } catch (Exception e) { + StringWriter sw = new StringWriter(); + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + + String exceptionAsString = sw.toString(); + return new ReturnType(ResultType.FAILURE, + "Exception thrown in acquireLockWithLease:\n" + exceptionAsString); + } + } + + public static ReturnType acquireLock(String key, String lockId) throws MusicLockingException { + /* + * first check if I am on top. Since ids are not reusable there is no need to check + * lockStatus If the status is unlocked, then the above call will automatically return + * false. + */ + Boolean result = false; + try { + result = getLockingServiceHandle().isMyTurn(lockId); + } catch (MusicLockingException e2) { + logger.error(EELFLoggerDelegate.errorLogger,AppMessages.INVALIDLOCK + lockId + " " + e2); + throw new MusicLockingException(); + } + if (!result) { + logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Not your turn, someone else has the lock"); + try { + if (!getLockingServiceHandle().lockIdExists(lockId)) { + logger.info(EELFLoggerDelegate.applicationLogger, "In acquire lock: this lockId doesn't exist"); + return new ReturnType(ResultType.FAILURE, "Lockid doesn't exist"); + } + } catch (MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK+lockId,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + throw new MusicLockingException(); + } + logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: returning failure"); + return new ReturnType(ResultType.FAILURE, "Not your turn, someone else has the lock"); + } + + + // this is for backward compatibility where locks could also be acquired on just + // keyspaces or tables. + if (isTableOrKeySpaceLock(key)) { + logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: A table or keyspace lock so no need to perform sync...so returning true"); + return new ReturnType(ResultType.SUCCESS, "A table or keyspace lock so no need to perform sync...so returning true"); + } + + // read the lock name corresponding to the key and if the status is locked or being locked, + // then return false + MusicLockState currentMls = null; + MusicLockState newMls = null; + try { + currentMls = getMusicLockState(key); + String currentLockHolder = currentMls.getLockHolder(); + if (lockId.equals(currentLockHolder)) { + logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: You already have the lock!"); + return new ReturnType(ResultType.SUCCESS, "You already have the lock!"); + } + } catch (NullPointerException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK+lockId,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + + // change status to "being locked". This state transition is necessary to ensure syncing + // before granting the lock + String lockHolder = null; + boolean needToSyncQuorum = false; + if (currentMls != null) + needToSyncQuorum = currentMls.isNeedToSyncQuorum(); + + + newMls = new MusicLockState(MusicLockState.LockStatus.BEING_LOCKED, lockHolder, + needToSyncQuorum); + try { + getLockingServiceHandle().setLockState(key, newMls); + } catch (MusicLockingException e1) { + logger.error(EELFLoggerDelegate.errorLogger,e1.getMessage(), AppMessages.LOCKSTATE+key,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Set lock state to being_locked"); + + // do syncing if this was a forced lock release + if (needToSyncQuorum) { + logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Since there was a forcible release, need to sync quorum!"); + try { + syncQuorum(key); + } catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger,"Failed to set Lock state " + e); + } + } + + // change status to locked + lockHolder = lockId; + needToSyncQuorum = false; + newMls = new MusicLockState(MusicLockState.LockStatus.LOCKED, lockHolder, needToSyncQuorum); + try { + getLockingServiceHandle().setLockState(key, newMls); + } catch (MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKSTATE+key,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Set lock state to locked and assigned current lock ref " + + lockId + " as holder"); + + return new ReturnType(result?ResultType.SUCCESS:ResultType.FAILURE, "Set lock state to locked and assigned a lock holder"); + } + + + + /** + * + * @param keyspaceName + * @param kspObject + * @return + * @throws Exception + */ + public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception { + return true; + } + + + private static void syncQuorum(String key) throws Exception { + logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---"); + String[] splitString = key.split("\\."); + String keyspaceName = splitString[0]; + String tableName = splitString[1]; + String primaryKeyValue = splitString[2]; + PreparedQueryObject selectQuery = new PreparedQueryObject(); + PreparedQueryObject updateQuery = new PreparedQueryObject(); + + // get the primary key d + TableMetadata tableInfo = returnColumnMetadata(keyspaceName, tableName); + String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single + // primary key + DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType(); + Object cqlFormattedPrimaryKeyValue = + MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue); + + // get the row of data from a quorum + selectQuery.appendQueryString("SELECT * FROM " + keyspaceName + "." + tableName + " WHERE " + + primaryKeyName + "= ?" + ";"); + selectQuery.addValue(cqlFormattedPrimaryKeyValue); + ResultSet results = null; + try { + results = getDSHandle().executeCriticalGet(selectQuery); + // write it back to a quorum + Row row = results.one(); + ColumnDefinitions colInfo = row.getColumnDefinitions(); + int totalColumns = colInfo.size(); + int counter = 1; + StringBuilder fieldValueString = new StringBuilder(""); + for (Definition definition : colInfo) { + String colName = definition.getName(); + if (colName.equals(primaryKeyName)) + continue; + DataType colType = definition.getType(); + Object valueObj = getDSHandle().getColValue(row, colName, colType); + Object valueString = MusicUtil.convertToActualDataType(colType, valueObj); + fieldValueString.append(colName + " = ?"); + updateQuery.addValue(valueString); + if (counter != (totalColumns - 1)) + fieldValueString.append(","); + counter = counter + 1; + } + updateQuery.appendQueryString("UPDATE " + keyspaceName + "." + tableName + " SET " + + fieldValueString + " WHERE " + primaryKeyName + "= ? " + ";"); + updateQuery.addValue(cqlFormattedPrimaryKeyValue); + + getDSHandle().executePut(updateQuery, "critical"); + } catch (MusicServiceException | MusicQueryException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.QUERYERROR +""+updateQuery ,ErrorSeverity.MAJOR, ErrorTypes.QUERYERROR); + } + } + + + + + /** + * + * @param query + * @return ResultSet + */ + public static ResultSet quorumGet(PreparedQueryObject query) { + ResultSet results = null; + try { + results = getDSHandle().executeCriticalGet(query); + } catch (MusicServiceException | MusicQueryException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR); + + } + return results; + + } + + /** + * + * @param results + * @return + * @throws MusicServiceException + */ + public static Map<String, HashMap<String, Object>> marshallResults(ResultSet results) throws MusicServiceException { + return getDSHandle().marshalData(results); + } + + /** + * + * @param lockName + * @return + */ + public static String whoseTurnIsIt(String lockName) { + + try { + return getLockingServiceHandle().whoseTurnIsIt("/" + lockName) + ""; + } catch (MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+lockName ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + return null; + + + } + + /** + * + * @param lockId + * @return + */ + public static String getLockNameFromId(String lockId) { + StringTokenizer st = new StringTokenizer(lockId); + return st.nextToken("$"); + } + + public static void destroyLockRef(String lockId) { + long start = System.currentTimeMillis(); + try { + getLockingServiceHandle().unlockAndDeleteId(lockId); + } catch (MusicLockingException | NoNodeException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms"); + } + + public static MusicLockState releaseLock(String lockId, boolean voluntaryRelease) { + long start = System.currentTimeMillis(); + try { + getLockingServiceHandle().unlockAndDeleteId(lockId); + } catch (MusicLockingException e1) { + logger.error(EELFLoggerDelegate.errorLogger,e1.getMessage(), AppMessages.RELEASELOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } catch (KeeperException.NoNodeException nne) { + logger.error(EELFLoggerDelegate.errorLogger,"Failed to release Lock " + lockId + " " + nne); + MusicLockState mls = new MusicLockState("Lock doesn't exists. Release lock operation failed."); + return mls; + } + String lockName = getLockNameFromId(lockId); + MusicLockState mls; + String lockHolder = null; + if (voluntaryRelease) { + mls = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder); + logger.info(EELFLoggerDelegate.applicationLogger,"In unlock: lock voluntarily released for " + lockId); + } else { + boolean needToSyncQuorum = true; + mls = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder, + needToSyncQuorum); + logger.info(EELFLoggerDelegate.applicationLogger,"In unlock: lock forcibly released for " + lockId); + } + try { + getLockingServiceHandle().setLockState(lockName, mls); + } catch (MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.RELEASELOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to release lock:" + (end - start) + " ms"); + return mls; + } + + public static void voluntaryReleaseLock(String lockId) throws MusicLockingException{ + try { + getLockingServiceHandle().unlockAndDeleteId(lockId); + } catch (KeeperException.NoNodeException e) { + // ??? No way + } + } + + /** + * + * @param lockName + * @throws MusicLockingException + */ + public static void deleteLock(String lockName) throws MusicLockingException { + long start = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Deleting lock for " + lockName); + try { + getLockingServiceHandle().deleteLock("/" + lockName); + } catch (MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DELTELOCK+lockName ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + throw new MusicLockingException(e.getMessage()); + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to delete lock:" + (end - start) + " ms"); + } + + + + /** + * + * @param keyspace + * @param tablename + * @return + * @throws MusicServiceException + */ + public static TableMetadata returnColumnMetadata(String keyspace, String tablename) throws MusicServiceException { + return getDSHandle().returnColumnMetadata(keyspace, tablename); + } + + + /** + * + * @param nodeName + */ + public static void pureZkCreate(String nodeName) { + try { + getLockingServiceHandle().getzkLockHandle().createNode(nodeName); + } catch (MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + } + + /** + * + * @param nodeName + * @param data + */ + public static void pureZkWrite(String nodeName, byte[] data) { + long start = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Performing zookeeper write to " + nodeName); + try { + getLockingServiceHandle().getzkLockHandle().setNodeData(nodeName, data); + } catch (MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + logger.info(EELFLoggerDelegate.applicationLogger,"Performed zookeeper write to " + nodeName); + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the actual zk put:" + (end - start) + " ms"); + } + + /** + * + * @param nodeName + * @return + */ + public static byte[] pureZkRead(String nodeName) { + long start = System.currentTimeMillis(); + byte[] data = null; + try { + data = getLockingServiceHandle().getzkLockHandle().getNodeData(nodeName); + } catch (MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR); + } + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the actual zk put:" + (end - start) + " ms"); + return data; + } + + + + // Prepared Query Additions. + + /** + * + * @param keyspaceName + * @param tableName + * @param primaryKey + * @param queryObject + * @return ReturnType + * @throws MusicServiceException + */ + public static ReturnType eventualPut(PreparedQueryObject queryObject) { + boolean result = false; + try { + result = getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL); + } catch (MusicServiceException | MusicQueryException ex) { + logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR); + logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex); + return new ReturnType(ResultType.FAILURE, ex.getMessage()); + } + if (result) { + return new ReturnType(ResultType.SUCCESS, "Success"); + } else { + return new ReturnType(ResultType.FAILURE, "Failure"); + } + } + + /** + * + * @param keyspaceName + * @param tableName + * @param primaryKey + * @param queryObject + * @param lockId + * @return + */ + public static ReturnType criticalPut(String keyspaceName, String tableName, String primaryKey, + PreparedQueryObject queryObject, String lockId, Condition conditionInfo) { + long start = System.currentTimeMillis(); + + try { + MusicLockState mls = getLockingServiceHandle() + .getLockState(keyspaceName + "." + tableName + "." + primaryKey); + if (mls.getLockHolder().equals(lockId) == true) { + if (conditionInfo != null) + try { + if (conditionInfo.testCondition() == false) + return new ReturnType(ResultType.FAILURE, + "Lock acquired but the condition is not true"); + } catch (Exception e) { + return new ReturnType(ResultType.FAILURE, + "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n" + + e.getMessage()); + } + getDSHandle().executePut(queryObject, MusicUtil.CRITICAL); + long end = System.currentTimeMillis(); + logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms"); + return new ReturnType(ResultType.SUCCESS, "Update performed"); + } else + return new ReturnType(ResultType.FAILURE, + "Cannot perform operation since you are the not the lock holder"); + } catch (MusicQueryException | MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage()); + return new ReturnType(ResultType.FAILURE, + "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n" + + e.getMessage()); + }catch(MusicLockingException ex){ + return new ReturnType(ResultType.FAILURE,ex.getMessage()); + } + + } + + /** + * + * @param queryObject + * @param consistency + * @return Boolean Indicates success or failure + * @throws MusicServiceException + * + * + */ + public static ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException { + // this is mainly for some functions like keyspace creation etc which does not + // really need the bells and whistles of Music locking. + boolean result = false; + try { + result = getDSHandle().executePut(queryObject, consistency); + } catch (MusicQueryException | MusicServiceException ex) { + logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR); + throw new MusicServiceException(ex.getMessage()); + } + return result?ResultType.SUCCESS:ResultType.FAILURE; + } + + /** + * This method performs DDL operation on cassandra. + * + * @param queryObject query object containing prepared query and values + * @return ResultSet + * @throws MusicServiceException + */ + public static ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException { + ResultSet results = null; + try { + results = getDSHandle().executeEventualGet(queryObject); + } catch (MusicQueryException | MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage()); + throw new MusicServiceException(e.getMessage()); + } + return results; + } + + /** + * This method performs DDL operations on cassandra, if the the resource is available. Lock ID + * is used to check if the resource is free. + * + * @param keyspaceName name of the keyspace + * @param tableName name of the table + * @param primaryKey primary key value + * @param queryObject query object containing prepared query and values + * @param lockId lock ID to check if the resource is free to perform the operation. + * @return ResultSet + */ + public static ResultSet criticalGet(String keyspaceName, String tableName, String primaryKey, + PreparedQueryObject queryObject, String lockId) throws MusicServiceException { + ResultSet results = null; + try { + MusicLockState mls = getLockingServiceHandle() + .getLockState(keyspaceName + "." + tableName + "." + primaryKey); + if (mls.getLockHolder().equals(lockId)) { + results = getDSHandle().executeCriticalGet(queryObject); + } else + throw new MusicServiceException("YOU DO NOT HAVE THE LOCK"); + } catch (MusicQueryException | MusicServiceException | MusicLockingException e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR); + } + return results; + } + + /** + * This method performs DML operation on cassandra, when the lock of the dd is acquired. + * + * @param keyspaceName name of the keyspace + * @param tableName name of the table + * @param primaryKey primary key value + * @param queryObject query object containing prepared query and values + * @return ReturnType + * @throws MusicLockingException + */ + public static ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey, + PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException { + + long start = System.currentTimeMillis(); + String key = keyspaceName + "." + tableName + "." + primaryKey; + String lockId = createLockReference(key); + long lockCreationTime = System.currentTimeMillis(); + ReturnType lockAcqResult = acquireLock(key, lockId); + long lockAcqTime = System.currentTimeMillis(); + if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) { + logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId); + ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, + queryObject, lockId, conditionInfo); + long criticalPutTime = System.currentTimeMillis(); + voluntaryReleaseLock(lockId); + long lockDeleteTime = System.currentTimeMillis(); + String timingInfo = "|lock creation time:" + (lockCreationTime - start) + + "|lock accquire time:" + (lockAcqTime - lockCreationTime) + + "|critical put time:" + (criticalPutTime - lockAcqTime) + + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|"; + criticalPutResult.setTimingInfo(timingInfo); + return criticalPutResult; + } else { + logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId); + destroyLockRef(lockId); + return lockAcqResult; + } + } + + /** + * this function is mainly for the benchmarks to see the effect of lock deletion. + * + * @param keyspaceName + * @param tableName + * @param primaryKey + * @param queryObject + * @param conditionInfo + * @return + * @throws MusicLockingException + */ + public static ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, + String primaryKey, PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException { + + long start = System.currentTimeMillis(); + String key = keyspaceName + "." + tableName + "." + primaryKey; + String lockId = createLockReference(key); + long lockCreationTime = System.currentTimeMillis(); + long leasePeriod = MusicUtil.getDefaultLockLeasePeriod(); + ReturnType lockAcqResult = acquireLock(key, lockId); + long lockAcqTime = System.currentTimeMillis(); + if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) { + logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId); + ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, + queryObject, lockId, conditionInfo); + long criticalPutTime = System.currentTimeMillis(); + deleteLock(key); + long lockDeleteTime = System.currentTimeMillis(); + String timingInfo = "|lock creation time:" + (lockCreationTime - start) + + "|lock accquire time:" + (lockAcqTime - lockCreationTime) + + "|critical put time:" + (criticalPutTime - lockAcqTime) + + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|"; + criticalPutResult.setTimingInfo(timingInfo); + return criticalPutResult; + } else { + logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId); + deleteLock(key); + return lockAcqResult; + } + } + + + + + /** + * This method performs DDL operation on cassasndra, when the lock for the resource is acquired. + * + * @param keyspaceName name of the keyspace + * @param tableName name of the table + * @param primaryKey primary key value + * @param queryObject query object containing prepared query and values + * @return ResultSet + * @throws MusicServiceException + * @throws MusicLockingException + */ + public static ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey, + PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException { + String key = keyspaceName + "." + tableName + "." + primaryKey; + String lockId = createLockReference(key); + long leasePeriod = MusicUtil.getDefaultLockLeasePeriod(); + ReturnType lockAcqResult = acquireLock(key, lockId); + if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) { + logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId); + ResultSet result = + criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockId); + voluntaryReleaseLock(lockId); + return result; + } else { + destroyLockRef(lockId); + logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId); + return null; + } + } + + public static ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey, + PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException { + String key = keyspaceName + "." + tableName + "." + primaryKey; + String lockId = createLockReference(key); + long leasePeriod = MusicUtil.getDefaultLockLeasePeriod(); + + ReturnType lockAcqResult = acquireLock(key, lockId); + + if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) { + logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId); + ResultSet result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockId); + deleteLock(key); + return result; + } else { + deleteLock(key); + logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId); + return null; + } + } + + + + /** + * authenticate user logic + * + * @param nameSpace + * @param userId + * @param password + * @param keyspace + * @param aid + * @param operation + * @return + * @throws Exception + */ + public static Map<String, Object> autheticateUser(String nameSpace, String userId, + String password, String keyspace, String aid, String operation) + throws Exception { + Map<String, Object> resultMap = new HashMap<>(); + String uuid = null; + resultMap = CachingUtil.validateRequest(nameSpace, userId, password, keyspace, aid, + operation); + if (!resultMap.isEmpty()) + return resultMap; + String isAAFApp = null; + try { + isAAFApp= CachingUtil.isAAFApplication(nameSpace); + } catch(MusicServiceException e) { + resultMap.put("Exception", e.getMessage()); + return resultMap; + } + if(isAAFApp == null) { + resultMap.put("Exception", "Namespace: "+nameSpace+" doesn't exist. Please make sure ns(appName)" + + " is correct and Application is onboarded."); + return resultMap; + } + boolean isAAF = Boolean.valueOf(isAAFApp); + if (userId == null || password == null) { + logger.error(EELFLoggerDelegate.errorLogger,"", AppMessages.MISSINGINFO ,ErrorSeverity.WARN, ErrorTypes.AUTHENTICATIONERROR); + logger.error(EELFLoggerDelegate.errorLogger,"One or more required headers is missing. userId: " + userId + + " :: password: " + password); + resultMap.put("Exception", + "UserId and Password are mandatory for the operation " + operation); + return resultMap; + } + if(!isAAF && !(operation.equals("createKeySpace"))) { + resultMap = CachingUtil.authenticateAIDUser(nameSpace, userId, password, keyspace); + if (!resultMap.isEmpty()) + return resultMap; + + } + if (isAAF && nameSpace != null && userId != null && password != null) { + boolean isValid = true; + try { + isValid = CachingUtil.authenticateAAFUser(nameSpace, userId, password, keyspace); + } catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.AUTHENTICATIONERROR ,ErrorSeverity.WARN, ErrorTypes.AUTHENTICATIONERROR); + logger.error(EELFLoggerDelegate.errorLogger,"Got exception while AAF authentication for namespace " + nameSpace); + resultMap.put("Exception", e.getMessage()); + } + if (!isValid) { + logger.error(EELFLoggerDelegate.errorLogger,"", AppMessages.AUTHENTICATIONERROR ,ErrorSeverity.WARN, ErrorTypes.AUTHENTICATIONERROR); + resultMap.put("Exception", "User not authenticated..."); + } + if (!resultMap.isEmpty()) + return resultMap; + + } + + if (operation.equals("createKeySpace")) { + logger.info(EELFLoggerDelegate.applicationLogger,"AID is not provided. Creating new UUID for keyspace."); + PreparedQueryObject pQuery = new PreparedQueryObject(); + pQuery.appendQueryString( + "select uuid from admin.keyspace_master where application_name=? and username=? and keyspace_name=? allow filtering"); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), nameSpace)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), userId)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), + MusicUtil.DEFAULTKEYSPACENAME)); + + try { + Row rs = MusicCore.get(pQuery).one(); + uuid = rs.getUUID("uuid").toString(); + resultMap.put("uuid", "existing"); + } catch (Exception e) { + logger.info(EELFLoggerDelegate.applicationLogger,"No UUID found in DB. So creating new UUID."); + uuid = CachingUtil.generateUUID(); + resultMap.put("uuid", "new"); + } + resultMap.put("aid", uuid); + } + + return resultMap; + } + + /** + * @param lockName + * @return + */ + public static Map<String, Object> validateLock(String lockName) { + Map<String, Object> resultMap = new HashMap<>(); + String[] locks = lockName.split("\\."); + if(locks.length < 3) { + resultMap.put("Exception", "Invalid lock. Please make sure lock is of the type keyspaceName.tableName.primaryKey"); + return resultMap; + } + String keyspace= locks[0]; + if(keyspace.startsWith("$")) + keyspace = keyspace.substring(1); + resultMap.put("keyspace",keyspace); + return resultMap; + } +} |