aboutsummaryrefslogtreecommitdiffstats
path: root/jar/src/main/java/org/onap/music/main/MusicCore.java
diff options
context:
space:
mode:
Diffstat (limited to 'jar/src/main/java/org/onap/music/main/MusicCore.java')
-rw-r--r--jar/src/main/java/org/onap/music/main/MusicCore.java992
1 files changed, 992 insertions, 0 deletions
diff --git a/jar/src/main/java/org/onap/music/main/MusicCore.java b/jar/src/main/java/org/onap/music/main/MusicCore.java
new file mode 100644
index 00000000..cad384a3
--- /dev/null
+++ b/jar/src/main/java/org/onap/music/main/MusicCore.java
@@ -0,0 +1,992 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2017 AT&T Intellectual Property
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.music.main;
+
+
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.onap.music.datastore.MusicDataStore;
+import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.datastore.jsonobjects.JsonKeySpace;
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.eelf.logging.format.AppMessages;
+import org.onap.music.eelf.logging.format.ErrorSeverity;
+import org.onap.music.eelf.logging.format.ErrorTypes;
+import org.onap.music.exceptions.MusicLockingException;
+import org.onap.music.exceptions.MusicQueryException;
+import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.lockingservice.MusicLockState;
+import org.onap.music.lockingservice.MusicLockState.LockStatus;
+import org.onap.music.lockingservice.MusicLockingService;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ColumnDefinitions.Definition;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.TableMetadata;
+
+/**
+ * This class .....
+ *
+ *
+ */
+public class MusicCore {
+
+ public static MusicLockingService mLockHandle = null;
+ public static MusicDataStore mDstoreHandle = null;
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCore.class);
+
+ public static class Condition {
+ Map<String, Object> conditions;
+ PreparedQueryObject selectQueryForTheRow;
+
+ public Condition(Map<String, Object> conditions, PreparedQueryObject selectQueryForTheRow) {
+ this.conditions = conditions;
+ this.selectQueryForTheRow = selectQueryForTheRow;
+ }
+
+ public boolean testCondition() throws Exception {
+ // first generate the row
+ ResultSet results = quorumGet(selectQueryForTheRow);
+ Row row = results.one();
+ return getDSHandle().doesRowSatisfyCondition(row, conditions);
+ }
+ }
+
+
+ public static MusicLockingService getLockingServiceHandle() throws MusicLockingException {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
+ long start = System.currentTimeMillis();
+
+ if (mLockHandle == null) {
+ try {
+ mLockHandle = new MusicLockingService();
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ throw new MusicLockingException("Failed to aquire Locl store handle " + e);
+ }
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
+ return mLockHandle;
+ }
+
+ /**
+ *
+ * @param remoteIp
+ * @return
+ */
+ public static MusicDataStore getDSHandle(String remoteIp) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring data store handle");
+ long start = System.currentTimeMillis();
+ if (mDstoreHandle == null) {
+ try {
+ MusicUtil.loadProperties();
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "No properties file defined. Falling back to default.");
+ }
+ mDstoreHandle = new MusicDataStore(remoteIp);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire data store handle:" + (end - start) + " ms");
+ return mDstoreHandle;
+ }
+
+ /**
+ *
+ * @return
+ * @throws MusicServiceException
+ */
+ public static MusicDataStore getDSHandle() throws MusicServiceException {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring data store handle");
+ long start = System.currentTimeMillis();
+ if (mDstoreHandle == null) {
+ try {
+ MusicUtil.loadProperties();
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "No properties file defined. Falling back to default.");
+ }
+ // Quick Fix - Best to put this into every call to getDSHandle?
+ if (! MusicUtil.getMyCassaHost().equals("localhost") ) {
+ mDstoreHandle = new MusicDataStore(MusicUtil.getMyCassaHost());
+ } else {
+ mDstoreHandle = new MusicDataStore();
+ }
+ }
+ if(mDstoreHandle.getSession() == null) {
+ String message = "Connection to Cassandra has not been enstablished."
+ + " Please check connection properites and reboot.";
+ logger.info(EELFLoggerDelegate.applicationLogger, message);
+ throw new MusicServiceException(message);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire data store handle:" + (end - start) + " ms");
+ return mDstoreHandle;
+ }
+
+ public static String createLockReference(String lockName) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
+ long start = System.currentTimeMillis();
+ String lockId = null;
+ try {
+ lockId = getLockingServiceHandle().createLockId("/" + lockName);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.CREATELOCK+lockName,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
+ return lockId;
+ }
+
+ /**
+ *
+ * @param key
+ * @return
+ */
+ public static boolean isTableOrKeySpaceLock(String key) {
+ String[] splitString = key.split("\\.");
+ if (splitString.length > 2)
+ return false;
+ else
+ return true;
+ }
+
+ /**
+ *
+ * @param key
+ * @return
+ */
+ public static MusicLockState getMusicLockState(String key) {
+ long start = System.currentTimeMillis();
+ try {
+ String[] splitString = key.split("\\.");
+ String keyspaceName = splitString[0];
+ String tableName = splitString[1];
+ String primaryKey = splitString[2];
+ MusicLockState mls;
+ String lockName = keyspaceName + "." + tableName + "." + primaryKey;
+ mls = getLockingServiceHandle().getLockState(lockName);
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to get lock state:" + (end - start) + " ms");
+ return mls;
+ } catch (NullPointerException | MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ return null;
+ }
+
+ public static ReturnType acquireLockWithLease(String key, String lockId, long leasePeriod) {
+ try {
+ long start = System.currentTimeMillis();
+ /* check if the current lock has exceeded its lease and if yes, release that lock */
+ MusicLockState mls = getMusicLockState(key);
+ if (mls != null) {
+ if (mls.getLockStatus().equals(LockStatus.LOCKED)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"The current lock holder for " + key + " is " + mls.getLockHolder()
+ + ". Checking if it has exceeded lease");
+ long currentLockPeriod = System.currentTimeMillis() - mls.getLeaseStartTime();
+ long currentLeasePeriod = mls.getLeasePeriod();
+ if (currentLockPeriod > currentLeasePeriod) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Lock period " + currentLockPeriod
+ + " has exceeded lease period " + currentLeasePeriod);
+ boolean voluntaryRelease = false;
+ String currentLockHolder = mls.getLockHolder();
+ mls = releaseLock(currentLockHolder, voluntaryRelease);
+ }
+ }
+ } else
+ logger.error(EELFLoggerDelegate.errorLogger,key, AppMessages.INVALIDLOCK,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+
+ /*
+ * call the traditional acquire lock now and if the result returned is true, set the
+ * begin time-stamp and lease period
+ */
+ if (acquireLock(key, lockId).getResult() == ResultType.SUCCESS) {
+ mls = getMusicLockState(key);// get latest state
+ if ( mls == null ) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Music Lock State is null");
+ return new ReturnType(ResultType.FAILURE, "Could not acquire lock, Lock State is null");
+ }
+ if (mls.getLeaseStartTime() == -1) {// set it again only if it is not set already
+ mls.setLeaseStartTime(System.currentTimeMillis());
+ mls.setLeasePeriod(leasePeriod);
+ getLockingServiceHandle().setLockState(key, mls);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire leased lock:" + (end - start) + " ms");
+ return new ReturnType(ResultType.SUCCESS, "Accquired lock");
+ } else {
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to fail to acquire leased lock:" + (end - start) + " ms");
+ return new ReturnType(ResultType.FAILURE, "Could not acquire lock");
+ }
+ } catch (Exception e) {
+ StringWriter sw = new StringWriter();
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+
+ String exceptionAsString = sw.toString();
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown in acquireLockWithLease:\n" + exceptionAsString);
+ }
+ }
+
+ public static ReturnType acquireLock(String key, String lockId) throws MusicLockingException {
+ /*
+ * first check if I am on top. Since ids are not reusable there is no need to check
+ * lockStatus If the status is unlocked, then the above call will automatically return
+ * false.
+ */
+ Boolean result = false;
+ try {
+ result = getLockingServiceHandle().isMyTurn(lockId);
+ } catch (MusicLockingException e2) {
+ logger.error(EELFLoggerDelegate.errorLogger,AppMessages.INVALIDLOCK + lockId + " " + e2);
+ throw new MusicLockingException();
+ }
+ if (!result) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Not your turn, someone else has the lock");
+ try {
+ if (!getLockingServiceHandle().lockIdExists(lockId)) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "In acquire lock: this lockId doesn't exist");
+ return new ReturnType(ResultType.FAILURE, "Lockid doesn't exist");
+ }
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK+lockId,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ throw new MusicLockingException();
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: returning failure");
+ return new ReturnType(ResultType.FAILURE, "Not your turn, someone else has the lock");
+ }
+
+
+ // this is for backward compatibility where locks could also be acquired on just
+ // keyspaces or tables.
+ if (isTableOrKeySpaceLock(key)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: A table or keyspace lock so no need to perform sync...so returning true");
+ return new ReturnType(ResultType.SUCCESS, "A table or keyspace lock so no need to perform sync...so returning true");
+ }
+
+ // read the lock name corresponding to the key and if the status is locked or being locked,
+ // then return false
+ MusicLockState currentMls = null;
+ MusicLockState newMls = null;
+ try {
+ currentMls = getMusicLockState(key);
+ String currentLockHolder = currentMls.getLockHolder();
+ if (lockId.equals(currentLockHolder)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: You already have the lock!");
+ return new ReturnType(ResultType.SUCCESS, "You already have the lock!");
+ }
+ } catch (NullPointerException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK+lockId,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+
+ // change status to "being locked". This state transition is necessary to ensure syncing
+ // before granting the lock
+ String lockHolder = null;
+ boolean needToSyncQuorum = false;
+ if (currentMls != null)
+ needToSyncQuorum = currentMls.isNeedToSyncQuorum();
+
+
+ newMls = new MusicLockState(MusicLockState.LockStatus.BEING_LOCKED, lockHolder,
+ needToSyncQuorum);
+ try {
+ getLockingServiceHandle().setLockState(key, newMls);
+ } catch (MusicLockingException e1) {
+ logger.error(EELFLoggerDelegate.errorLogger,e1.getMessage(), AppMessages.LOCKSTATE+key,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Set lock state to being_locked");
+
+ // do syncing if this was a forced lock release
+ if (needToSyncQuorum) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Since there was a forcible release, need to sync quorum!");
+ try {
+ syncQuorum(key);
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger,"Failed to set Lock state " + e);
+ }
+ }
+
+ // change status to locked
+ lockHolder = lockId;
+ needToSyncQuorum = false;
+ newMls = new MusicLockState(MusicLockState.LockStatus.LOCKED, lockHolder, needToSyncQuorum);
+ try {
+ getLockingServiceHandle().setLockState(key, newMls);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKSTATE+key,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Set lock state to locked and assigned current lock ref "
+ + lockId + " as holder");
+
+ return new ReturnType(result?ResultType.SUCCESS:ResultType.FAILURE, "Set lock state to locked and assigned a lock holder");
+ }
+
+
+
+ /**
+ *
+ * @param keyspaceName
+ * @param kspObject
+ * @return
+ * @throws Exception
+ */
+ public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
+ return true;
+ }
+
+
+ private static void syncQuorum(String key) throws Exception {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
+ String[] splitString = key.split("\\.");
+ String keyspaceName = splitString[0];
+ String tableName = splitString[1];
+ String primaryKeyValue = splitString[2];
+ PreparedQueryObject selectQuery = new PreparedQueryObject();
+ PreparedQueryObject updateQuery = new PreparedQueryObject();
+
+ // get the primary key d
+ TableMetadata tableInfo = returnColumnMetadata(keyspaceName, tableName);
+ String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
+ // primary key
+ DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
+ Object cqlFormattedPrimaryKeyValue =
+ MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
+
+ // get the row of data from a quorum
+ selectQuery.appendQueryString("SELECT * FROM " + keyspaceName + "." + tableName + " WHERE "
+ + primaryKeyName + "= ?" + ";");
+ selectQuery.addValue(cqlFormattedPrimaryKeyValue);
+ ResultSet results = null;
+ try {
+ results = getDSHandle().executeCriticalGet(selectQuery);
+ // write it back to a quorum
+ Row row = results.one();
+ ColumnDefinitions colInfo = row.getColumnDefinitions();
+ int totalColumns = colInfo.size();
+ int counter = 1;
+ StringBuilder fieldValueString = new StringBuilder("");
+ for (Definition definition : colInfo) {
+ String colName = definition.getName();
+ if (colName.equals(primaryKeyName))
+ continue;
+ DataType colType = definition.getType();
+ Object valueObj = getDSHandle().getColValue(row, colName, colType);
+ Object valueString = MusicUtil.convertToActualDataType(colType, valueObj);
+ fieldValueString.append(colName + " = ?");
+ updateQuery.addValue(valueString);
+ if (counter != (totalColumns - 1))
+ fieldValueString.append(",");
+ counter = counter + 1;
+ }
+ updateQuery.appendQueryString("UPDATE " + keyspaceName + "." + tableName + " SET "
+ + fieldValueString + " WHERE " + primaryKeyName + "= ? " + ";");
+ updateQuery.addValue(cqlFormattedPrimaryKeyValue);
+
+ getDSHandle().executePut(updateQuery, "critical");
+ } catch (MusicServiceException | MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.QUERYERROR +""+updateQuery ,ErrorSeverity.MAJOR, ErrorTypes.QUERYERROR);
+ }
+ }
+
+
+
+
+ /**
+ *
+ * @param query
+ * @return ResultSet
+ */
+ public static ResultSet quorumGet(PreparedQueryObject query) {
+ ResultSet results = null;
+ try {
+ results = getDSHandle().executeCriticalGet(query);
+ } catch (MusicServiceException | MusicQueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
+
+ }
+ return results;
+
+ }
+
+ /**
+ *
+ * @param results
+ * @return
+ * @throws MusicServiceException
+ */
+ public static Map<String, HashMap<String, Object>> marshallResults(ResultSet results) throws MusicServiceException {
+ return getDSHandle().marshalData(results);
+ }
+
+ /**
+ *
+ * @param lockName
+ * @return
+ */
+ public static String whoseTurnIsIt(String lockName) {
+
+ try {
+ return getLockingServiceHandle().whoseTurnIsIt("/" + lockName) + "";
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+lockName ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ return null;
+
+
+ }
+
+ /**
+ *
+ * @param lockId
+ * @return
+ */
+ public static String getLockNameFromId(String lockId) {
+ StringTokenizer st = new StringTokenizer(lockId);
+ return st.nextToken("$");
+ }
+
+ public static void destroyLockRef(String lockId) {
+ long start = System.currentTimeMillis();
+ try {
+ getLockingServiceHandle().unlockAndDeleteId(lockId);
+ } catch (MusicLockingException | NoNodeException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
+ }
+
+ public static MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
+ long start = System.currentTimeMillis();
+ try {
+ getLockingServiceHandle().unlockAndDeleteId(lockId);
+ } catch (MusicLockingException e1) {
+ logger.error(EELFLoggerDelegate.errorLogger,e1.getMessage(), AppMessages.RELEASELOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ } catch (KeeperException.NoNodeException nne) {
+ logger.error(EELFLoggerDelegate.errorLogger,"Failed to release Lock " + lockId + " " + nne);
+ MusicLockState mls = new MusicLockState("Lock doesn't exists. Release lock operation failed.");
+ return mls;
+ }
+ String lockName = getLockNameFromId(lockId);
+ MusicLockState mls;
+ String lockHolder = null;
+ if (voluntaryRelease) {
+ mls = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder);
+ logger.info(EELFLoggerDelegate.applicationLogger,"In unlock: lock voluntarily released for " + lockId);
+ } else {
+ boolean needToSyncQuorum = true;
+ mls = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder,
+ needToSyncQuorum);
+ logger.info(EELFLoggerDelegate.applicationLogger,"In unlock: lock forcibly released for " + lockId);
+ }
+ try {
+ getLockingServiceHandle().setLockState(lockName, mls);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.RELEASELOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to release lock:" + (end - start) + " ms");
+ return mls;
+ }
+
+ public static void voluntaryReleaseLock(String lockId) throws MusicLockingException{
+ try {
+ getLockingServiceHandle().unlockAndDeleteId(lockId);
+ } catch (KeeperException.NoNodeException e) {
+ // ??? No way
+ }
+ }
+
+ /**
+ *
+ * @param lockName
+ * @throws MusicLockingException
+ */
+ public static void deleteLock(String lockName) throws MusicLockingException {
+ long start = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Deleting lock for " + lockName);
+ try {
+ getLockingServiceHandle().deleteLock("/" + lockName);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DELTELOCK+lockName ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ throw new MusicLockingException(e.getMessage());
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to delete lock:" + (end - start) + " ms");
+ }
+
+
+
+ /**
+ *
+ * @param keyspace
+ * @param tablename
+ * @return
+ * @throws MusicServiceException
+ */
+ public static TableMetadata returnColumnMetadata(String keyspace, String tablename) throws MusicServiceException {
+ return getDSHandle().returnColumnMetadata(keyspace, tablename);
+ }
+
+
+ /**
+ *
+ * @param nodeName
+ */
+ public static void pureZkCreate(String nodeName) {
+ try {
+ getLockingServiceHandle().getzkLockHandle().createNode(nodeName);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ }
+
+ /**
+ *
+ * @param nodeName
+ * @param data
+ */
+ public static void pureZkWrite(String nodeName, byte[] data) {
+ long start = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Performing zookeeper write to " + nodeName);
+ try {
+ getLockingServiceHandle().getzkLockHandle().setNodeData(nodeName, data);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger,"Performed zookeeper write to " + nodeName);
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the actual zk put:" + (end - start) + " ms");
+ }
+
+ /**
+ *
+ * @param nodeName
+ * @return
+ */
+ public static byte[] pureZkRead(String nodeName) {
+ long start = System.currentTimeMillis();
+ byte[] data = null;
+ try {
+ data = getLockingServiceHandle().getzkLockHandle().getNodeData(nodeName);
+ } catch (MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
+ }
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the actual zk put:" + (end - start) + " ms");
+ return data;
+ }
+
+
+
+ // Prepared Query Additions.
+
+ /**
+ *
+ * @param keyspaceName
+ * @param tableName
+ * @param primaryKey
+ * @param queryObject
+ * @return ReturnType
+ * @throws MusicServiceException
+ */
+ public static ReturnType eventualPut(PreparedQueryObject queryObject) {
+ boolean result = false;
+ try {
+ result = getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
+ } catch (MusicServiceException | MusicQueryException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
+ return new ReturnType(ResultType.FAILURE, ex.getMessage());
+ }
+ if (result) {
+ return new ReturnType(ResultType.SUCCESS, "Success");
+ } else {
+ return new ReturnType(ResultType.FAILURE, "Failure");
+ }
+ }
+
+ /**
+ *
+ * @param keyspaceName
+ * @param tableName
+ * @param primaryKey
+ * @param queryObject
+ * @param lockId
+ * @return
+ */
+ public static ReturnType criticalPut(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
+ long start = System.currentTimeMillis();
+
+ try {
+ MusicLockState mls = getLockingServiceHandle()
+ .getLockState(keyspaceName + "." + tableName + "." + primaryKey);
+ if (mls.getLockHolder().equals(lockId) == true) {
+ if (conditionInfo != null)
+ try {
+ if (conditionInfo.testCondition() == false)
+ return new ReturnType(ResultType.FAILURE,
+ "Lock acquired but the condition is not true");
+ } catch (Exception e) {
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
+ + e.getMessage());
+ }
+ getDSHandle().executePut(queryObject, MusicUtil.CRITICAL);
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
+ return new ReturnType(ResultType.SUCCESS, "Update performed");
+ } else
+ return new ReturnType(ResultType.FAILURE,
+ "Cannot perform operation since you are the not the lock holder");
+ } catch (MusicQueryException | MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
+ + e.getMessage());
+ }catch(MusicLockingException ex){
+ return new ReturnType(ResultType.FAILURE,ex.getMessage());
+ }
+
+ }
+
+ /**
+ *
+ * @param queryObject
+ * @param consistency
+ * @return Boolean Indicates success or failure
+ * @throws MusicServiceException
+ *
+ *
+ */
+ public static ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
+ // this is mainly for some functions like keyspace creation etc which does not
+ // really need the bells and whistles of Music locking.
+ boolean result = false;
+ try {
+ result = getDSHandle().executePut(queryObject, consistency);
+ } catch (MusicQueryException | MusicServiceException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ throw new MusicServiceException(ex.getMessage());
+ }
+ return result?ResultType.SUCCESS:ResultType.FAILURE;
+ }
+
+ /**
+ * This method performs DDL operation on cassandra.
+ *
+ * @param queryObject query object containing prepared query and values
+ * @return ResultSet
+ * @throws MusicServiceException
+ */
+ public static ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
+ ResultSet results = null;
+ try {
+ results = getDSHandle().executeEventualGet(queryObject);
+ } catch (MusicQueryException | MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
+ throw new MusicServiceException(e.getMessage());
+ }
+ return results;
+ }
+
+ /**
+ * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
+ * is used to check if the resource is free.
+ *
+ * @param keyspaceName name of the keyspace
+ * @param tableName name of the table
+ * @param primaryKey primary key value
+ * @param queryObject query object containing prepared query and values
+ * @param lockId lock ID to check if the resource is free to perform the operation.
+ * @return ResultSet
+ */
+ public static ResultSet criticalGet(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
+ ResultSet results = null;
+ try {
+ MusicLockState mls = getLockingServiceHandle()
+ .getLockState(keyspaceName + "." + tableName + "." + primaryKey);
+ if (mls.getLockHolder().equals(lockId)) {
+ results = getDSHandle().executeCriticalGet(queryObject);
+ } else
+ throw new MusicServiceException("YOU DO NOT HAVE THE LOCK");
+ } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ }
+ return results;
+ }
+
+ /**
+ * This method performs DML operation on cassandra, when the lock of the dd is acquired.
+ *
+ * @param keyspaceName name of the keyspace
+ * @param tableName name of the table
+ * @param primaryKey primary key value
+ * @param queryObject query object containing prepared query and values
+ * @return ReturnType
+ * @throws MusicLockingException
+ */
+ public static ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
+
+ long start = System.currentTimeMillis();
+ String key = keyspaceName + "." + tableName + "." + primaryKey;
+ String lockId = createLockReference(key);
+ long lockCreationTime = System.currentTimeMillis();
+ ReturnType lockAcqResult = acquireLock(key, lockId);
+ long lockAcqTime = System.currentTimeMillis();
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
+ ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
+ queryObject, lockId, conditionInfo);
+ long criticalPutTime = System.currentTimeMillis();
+ voluntaryReleaseLock(lockId);
+ long lockDeleteTime = System.currentTimeMillis();
+ String timingInfo = "|lock creation time:" + (lockCreationTime - start)
+ + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
+ + "|critical put time:" + (criticalPutTime - lockAcqTime)
+ + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
+ criticalPutResult.setTimingInfo(timingInfo);
+ return criticalPutResult;
+ } else {
+ logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
+ destroyLockRef(lockId);
+ return lockAcqResult;
+ }
+ }
+
+ /**
+ * this function is mainly for the benchmarks to see the effect of lock deletion.
+ *
+ * @param keyspaceName
+ * @param tableName
+ * @param primaryKey
+ * @param queryObject
+ * @param conditionInfo
+ * @return
+ * @throws MusicLockingException
+ */
+ public static ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName,
+ String primaryKey, PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
+
+ long start = System.currentTimeMillis();
+ String key = keyspaceName + "." + tableName + "." + primaryKey;
+ String lockId = createLockReference(key);
+ long lockCreationTime = System.currentTimeMillis();
+ long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+ ReturnType lockAcqResult = acquireLock(key, lockId);
+ long lockAcqTime = System.currentTimeMillis();
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
+ ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
+ queryObject, lockId, conditionInfo);
+ long criticalPutTime = System.currentTimeMillis();
+ deleteLock(key);
+ long lockDeleteTime = System.currentTimeMillis();
+ String timingInfo = "|lock creation time:" + (lockCreationTime - start)
+ + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
+ + "|critical put time:" + (criticalPutTime - lockAcqTime)
+ + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
+ criticalPutResult.setTimingInfo(timingInfo);
+ return criticalPutResult;
+ } else {
+ logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
+ deleteLock(key);
+ return lockAcqResult;
+ }
+ }
+
+
+
+
+ /**
+ * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
+ *
+ * @param keyspaceName name of the keyspace
+ * @param tableName name of the table
+ * @param primaryKey primary key value
+ * @param queryObject query object containing prepared query and values
+ * @return ResultSet
+ * @throws MusicServiceException
+ * @throws MusicLockingException
+ */
+ public static ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
+ String key = keyspaceName + "." + tableName + "." + primaryKey;
+ String lockId = createLockReference(key);
+ long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+ ReturnType lockAcqResult = acquireLock(key, lockId);
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
+ ResultSet result =
+ criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockId);
+ voluntaryReleaseLock(lockId);
+ return result;
+ } else {
+ destroyLockRef(lockId);
+ logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
+ return null;
+ }
+ }
+
+ public static ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
+ PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
+ String key = keyspaceName + "." + tableName + "." + primaryKey;
+ String lockId = createLockReference(key);
+ long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+
+ ReturnType lockAcqResult = acquireLock(key, lockId);
+
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
+ ResultSet result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockId);
+ deleteLock(key);
+ return result;
+ } else {
+ deleteLock(key);
+ logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
+ return null;
+ }
+ }
+
+
+
+ /**
+ * authenticate user logic
+ *
+ * @param nameSpace
+ * @param userId
+ * @param password
+ * @param keyspace
+ * @param aid
+ * @param operation
+ * @return
+ * @throws Exception
+ */
+ public static Map<String, Object> autheticateUser(String nameSpace, String userId,
+ String password, String keyspace, String aid, String operation)
+ throws Exception {
+ Map<String, Object> resultMap = new HashMap<>();
+ String uuid = null;
+ resultMap = CachingUtil.validateRequest(nameSpace, userId, password, keyspace, aid,
+ operation);
+ if (!resultMap.isEmpty())
+ return resultMap;
+ String isAAFApp = null;
+ try {
+ isAAFApp= CachingUtil.isAAFApplication(nameSpace);
+ } catch(MusicServiceException e) {
+ resultMap.put("Exception", e.getMessage());
+ return resultMap;
+ }
+ if(isAAFApp == null) {
+ resultMap.put("Exception", "Namespace: "+nameSpace+" doesn't exist. Please make sure ns(appName)"
+ + " is correct and Application is onboarded.");
+ return resultMap;
+ }
+ boolean isAAF = Boolean.valueOf(isAAFApp);
+ if (userId == null || password == null) {
+ logger.error(EELFLoggerDelegate.errorLogger,"", AppMessages.MISSINGINFO ,ErrorSeverity.WARN, ErrorTypes.AUTHENTICATIONERROR);
+ logger.error(EELFLoggerDelegate.errorLogger,"One or more required headers is missing. userId: " + userId
+ + " :: password: " + password);
+ resultMap.put("Exception",
+ "UserId and Password are mandatory for the operation " + operation);
+ return resultMap;
+ }
+ if(!isAAF && !(operation.equals("createKeySpace"))) {
+ resultMap = CachingUtil.authenticateAIDUser(nameSpace, userId, password, keyspace);
+ if (!resultMap.isEmpty())
+ return resultMap;
+
+ }
+ if (isAAF && nameSpace != null && userId != null && password != null) {
+ boolean isValid = true;
+ try {
+ isValid = CachingUtil.authenticateAAFUser(nameSpace, userId, password, keyspace);
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.AUTHENTICATIONERROR ,ErrorSeverity.WARN, ErrorTypes.AUTHENTICATIONERROR);
+ logger.error(EELFLoggerDelegate.errorLogger,"Got exception while AAF authentication for namespace " + nameSpace);
+ resultMap.put("Exception", e.getMessage());
+ }
+ if (!isValid) {
+ logger.error(EELFLoggerDelegate.errorLogger,"", AppMessages.AUTHENTICATIONERROR ,ErrorSeverity.WARN, ErrorTypes.AUTHENTICATIONERROR);
+ resultMap.put("Exception", "User not authenticated...");
+ }
+ if (!resultMap.isEmpty())
+ return resultMap;
+
+ }
+
+ if (operation.equals("createKeySpace")) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"AID is not provided. Creating new UUID for keyspace.");
+ PreparedQueryObject pQuery = new PreparedQueryObject();
+ pQuery.appendQueryString(
+ "select uuid from admin.keyspace_master where application_name=? and username=? and keyspace_name=? allow filtering");
+ pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), nameSpace));
+ pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), userId));
+ pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(),
+ MusicUtil.DEFAULTKEYSPACENAME));
+
+ try {
+ Row rs = MusicCore.get(pQuery).one();
+ uuid = rs.getUUID("uuid").toString();
+ resultMap.put("uuid", "existing");
+ } catch (Exception e) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"No UUID found in DB. So creating new UUID.");
+ uuid = CachingUtil.generateUUID();
+ resultMap.put("uuid", "new");
+ }
+ resultMap.put("aid", uuid);
+ }
+
+ return resultMap;
+ }
+
+ /**
+ * @param lockName
+ * @return
+ */
+ public static Map<String, Object> validateLock(String lockName) {
+ Map<String, Object> resultMap = new HashMap<>();
+ String[] locks = lockName.split("\\.");
+ if(locks.length < 3) {
+ resultMap.put("Exception", "Invalid lock. Please make sure lock is of the type keyspaceName.tableName.primaryKey");
+ return resultMap;
+ }
+ String keyspace= locks[0];
+ if(keyspace.startsWith("$"))
+ keyspace = keyspace.substring(1);
+ resultMap.put("keyspace",keyspace);
+ return resultMap;
+ }
+}