diff options
Diffstat (limited to 'src/main/java/org')
19 files changed, 373 insertions, 539 deletions
diff --git a/src/main/java/org/onap/music/mdbc/DatabaseOperations.java b/src/main/java/org/onap/music/mdbc/DatabaseOperations.java index b9412b7..c384199 100644 --- a/src/main/java/org/onap/music/mdbc/DatabaseOperations.java +++ b/src/main/java/org/onap/music/mdbc/DatabaseOperations.java @@ -1,5 +1,8 @@ package org.onap.music.mdbc; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.TupleValue; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.datastore.PreparedQueryObject; @@ -9,262 +12,128 @@ import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.MusicCore; import org.onap.music.main.ResultType; import org.onap.music.main.ReturnType; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; +import org.onap.music.mdbc.tables.PartitionInformation; +import org.onap.music.mdbc.tables.StagingTable; +import java.io.IOException; import java.util.*; +import com.datastax.driver.core.utils.UUIDs; + public class DatabaseOperations { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabaseOperations.class); /** * This functions is used to generate cassandra uuid * @return a random UUID that can be used for fields of type uuid */ - public static String generateUniqueKey() { - return UUID.randomUUID().toString(); + public static UUID generateUniqueKey() { + return UUIDs.random(); } - /** - * This functions returns the primary key used to managed a specific row in the TableToPartition tables in Music - * @param namespace namespace where the TableToPartition resides - * @param tableToPartitionTableName name of the tableToPartition table - * @param tableName name of the application table that is being added to the system - * @return primary key to be used with MUSIC - */ - public static String getTableToPartitionPrimaryKey(String namespace, String tableToPartitionTableName, String tableName){ - return namespace+"."+tableToPartitionTableName+"."+tableName; + public static void createMusicTxDigest(String musicNamespace, String musicTxDigestTableName) + throws MDBCServiceException { + createMusicTxDigest(musicNamespace,musicTxDigestTableName,-1); } /** - * Create a new row for a table, with not assigned partition - * @param namespace namespace where the TableToPartition resides - * @param tableToPartitionTableName name of the tableToPartition table - * @param tableName name of the application table that is being added to the system - * @param lockId if the lock for this key is already hold, this is the id of that lock. - * May be <code>null</code> if lock is not hold for the corresponding key - */ - public static void createNewTableToPartitionRow(String namespace, String tableToPartitionTableName, - String tableName,String lockId) throws MDBCServiceException { - final String primaryKey = getTableToPartitionPrimaryKey(namespace,tableToPartitionTableName,tableName); - StringBuilder insert = new StringBuilder("INSERT INTO ") - .append(namespace) - .append('.') - .append(tableToPartitionTableName) - .append(" (tablename) VALUES ") - .append("('") - .append(tableName) - .append("');"); - PreparedQueryObject query = new PreparedQueryObject(); - query.appendQueryString(insert.toString()); - try { - executedLockedPut(namespace,tableToPartitionTableName,tableName,query,lockId,null); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create new row table to partition table "); - throw new MDBCServiceException("Initialization error: Failure to create new row table to partition table"); - } - } - - /** - * Update the partition to which a table belongs - * @param namespace namespace where the TableToPartition resides - * @param tableToPartitionTableName name of the tableToPartition table - * @param table name of the application table that is being added to the system - * @param newPartition partition to which the application table is assigned - * @param lockId if the lock for this key is already hold, this is the id of that lock. - * May be <code>null</code> if lock is not hold for the corresponding key - */ - public static void updateTableToPartition(String namespace, String tableToPartitionTableName, - String table, String newPartition, String lockId) throws MDBCServiceException { - final String primaryKey = getTableToPartitionPrimaryKey(namespace,tableToPartitionTableName,table); - PreparedQueryObject query = new PreparedQueryObject(); - StringBuilder update = new StringBuilder("UPDATE ") - .append(namespace) - .append('.') - .append(tableToPartitionTableName) - .append(" SET previouspartitions = previouspartitions + {") - .append(newPartition) - .append("}, partition = " ) - .append(newPartition) - .append(" WHERE tablename = '") - .append(table) - .append("';"); - query.appendQueryString(update.toString()); - try { - executedLockedPut(namespace,tableToPartitionTableName,table,query,lockId,null); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to update a row in table to partition table "); - throw new MDBCServiceException("Initialization error: Failure to update a row in table to partition table"); - } - } - - - public static String getPartitionInformationPrimaryKey(String namespace, String partitionInformationTable, String partition){ - return namespace+"."+partitionInformationTable+"."+partition; - } - - /** - * Create a new row, when a new partition is initialized - * @param namespace namespace to which the partition info table resides in Cassandra - * @param partitionInfoTableName name of the partition information table - * @param replicationFactor associated replicated factor for the partition (max of all the tables) - * @param tables list of tables that are within this partitoin - * @param lockId if the lock for this key is already hold, this is the id of that lock. May be <code>null</code> if lock is not hold for the corresponding key - * @return the partition uuid associated to the new row + * This function creates the MusicTxDigest table. It contain information related to each transaction committed + * * LeaseId: id associated with the lease, text + * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later + * * TransactionDigest: text that contains all the changes in the transaction */ - public static String createPartitionInfoRow(String namespace, String partitionInfoTableName, - int replicationFactor, List<String> tables, String lockId) throws MDBCServiceException { - String id = generateUniqueKey(); - final String primaryKey = getPartitionInformationPrimaryKey(namespace,partitionInfoTableName,id); - StringBuilder insert = new StringBuilder("INSERT INTO ") - .append(namespace) - .append('.') - .append(partitionInfoTableName) - .append(" (partition,replicationfactor,tables) VALUES ") - .append("(") - .append(id) - .append(",") - .append(replicationFactor) - .append(",{"); - boolean first = true; - for(String table: tables){ - if(!first){ - insert.append(","); - } - first = false; - insert.append("'") - .append(table) - .append("'"); + public static void createMusicTxDigest(String musicNamespace, String musicTxDigestTableName, + int musicTxDigestTableNumber) throws MDBCServiceException { + String tableName = musicTxDigestTableName; + if(musicTxDigestTableNumber >= 0) { + tableName = tableName + + "-" + + Integer.toString(musicTxDigestTableNumber); } - insert.append("});"); - PreparedQueryObject query = new PreparedQueryObject(); - query.appendQueryString(insert.toString()); + String priKey = "txid"; + StringBuilder fields = new StringBuilder(); + fields.append("txid uuid, "); + fields.append("transactiondigest text ");//notice lack of ',' + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); try { - executedLockedPut(namespace,partitionInfoTableName,id,query,lockId,null); + executeMusicWriteQuery(musicNamespace,tableName,cql); } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create new row in partition information table "); - throw new MDBCServiceException("Initialization error: Failure to create new row in partition information table"); + logger.error("Initialization error: Failure to create redo records table"); + throw(e); } - return id; } /** - * Update the TIT row and table that currently handles the partition - * @param namespace namespace to which the partition info table resides in Cassandra - * @param partitionInfoTableName name of the partition information table - * @param partitionId row identifier for the partition being modiefd - * @param newTitRow new TIT row and table that are handling this partition - * @param owner owner that is handling the new tit row (url to the corresponding etdb nodej - * @param lockId if the lock for this key is already hold, this is the id of that lock. May be <code>null</code> if lock is not hold for the corresponding key + * This function creates the TransactionInformation table. It contain information related + * to the transactions happening in a given partition. + * * The schema of the table is + * * Id, uiid. + * * Partition, uuid id of the partition + * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables + * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables + * * Redo: list of uiids associated to the Redo Records Table + * */ - public static void updateRedoRow(String namespace, String partitionInfoTableName, String partitionId, - RedoRow newTitRow, String owner, String lockId) throws MDBCServiceException { - final String primaryKey = getTableToPartitionPrimaryKey(namespace,partitionInfoTableName,partitionId); - PreparedQueryObject query = new PreparedQueryObject(); - String newOwner = (owner==null)?"":owner; - StringBuilder update = new StringBuilder("UPDATE ") - .append(namespace) - .append('.') - .append(partitionInfoTableName) - .append(" SET currentowner='") - .append(newOwner) - .append("', latesttitindex=") - .append(newTitRow.getRedoRowIndex()) - .append(", latesttittable='") - .append(newTitRow.getRedoTableName()) - .append("' WHERE partition = ") - .append(partitionId) - .append(";"); - query.appendQueryString(update.toString()); + public static void createMusicRangeInformationTable(String musicNamespace, String musicRangeInformationTableName) throws MDBCServiceException { + String tableName = musicRangeInformationTableName; + String priKey = "rangeid"; + StringBuilder fields = new StringBuilder(); + fields.append("rangeid uuid, "); + fields.append("keys set<text>, "); + fields.append("ownerid text, "); + fields.append("metricprocessid text, "); + //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly + fields.append("txredolog list<frozen<tuple<text,uuid>>> "); + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); try { - executedLockedPut(namespace,partitionInfoTableName,partitionId,query,lockId,null); + executeMusicWriteQuery(musicNamespace,tableName,cql); } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to add new owner to partition in music table "); - throw new MDBCServiceException("Initialization error:Failure to add new owner to partition in music table "); + logger.error("Initialization error: Failure to create transaction information table"); + throw(e); } } /** - * Create the first row in the history of the redo history table for a given partition - * @param namespace namespace to which the redo history table resides in Cassandra - * @param redoHistoryTableName name of the table where the row is being created - * @param firstTitRow first tit associated to the partition - * @param partitionId partition for which a history is created + * Creates a new empty tit row + * @param namespace namespace where the tit table is located + * @param mriTableName name of the corresponding mri table where the new row is added + * @param processId id of the process that is going to own initially this. + * @return uuid associated to the new row */ - public static void createRedoHistoryBeginRow(String namespace, String redoHistoryTableName, - RedoRow firstTitRow, String partitionId, String lockId) throws MDBCServiceException { - createRedoHistoryRow(namespace,redoHistoryTableName,firstTitRow,partitionId, new ArrayList<>(),lockId); + public static UUID createEmptyMriRow(String namespace, String mriTableName, + String processId, String lockId, List<Range> ranges) throws MDBCServiceException { + UUID id = generateUniqueKey(); + return createEmptyMriRow(namespace,mriTableName,id,processId,lockId,ranges); } - /** - * Create a new row on the history for a given partition - * @param namespace namespace to which the redo history table resides in Cassandra - * @param redoHistoryTableName name of the table where the row is being created - * @param currentRow new tit row associated to the partition - * @param partitionId partition for which a history is created - * @param parentsRows parent tit rows associated to this partition - */ - public static void createRedoHistoryRow(String namespace, String redoHistoryTableName, - RedoRow currentRow, String partitionId, List<RedoRow> parentsRows, String lockId) throws MDBCServiceException { - final String primaryKey = partitionId+"-"+currentRow.getRedoTableName()+"-"+currentRow.getRedoRowIndex(); + public static UUID createEmptyMriRow(String namespace, String mriTableName, UUID id, String processId, String lockId, + List<Range> ranges) throws MDBCServiceException{ StringBuilder insert = new StringBuilder("INSERT INTO ") .append(namespace) .append('.') - .append(redoHistoryTableName) - .append(" (partition,redotable,redoindex,previousredo) VALUES ") + .append(mriTableName) + .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") .append("(") - .append(partitionId) - .append(",'") - .append(currentRow.getRedoTableName()) - .append("',") - .append(currentRow.getRedoRowIndex()) + .append(id) .append(",{"); - boolean first = true; - for(RedoRow parent: parentsRows){ - if(!first){ - insert.append(","); - } - else{ - first = false; + boolean first=true; + for(Range r: ranges){ + if(first){ first=false; } + else { + insert.append(','); } - insert.append("('") - .append(parent.getRedoTableName()) - .append("',") - .append(parent.getRedoRowIndex()) - .append("),"); + insert.append("'").append(r.toString()).append("'"); } - insert.append("});"); + insert.append("},'") + .append((lockId==null)?"":lockId) + .append("','") + .append(processId) + .append("',[]);"); PreparedQueryObject query = new PreparedQueryObject(); query.appendQueryString(insert.toString()); try { - executedLockedPut(namespace,redoHistoryTableName,primaryKey,query,lockId,null); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to add new row to redo history"); - throw new MDBCServiceException("Initialization error:Failure to add new row to redo history"); - } - } - - /** - * Creates a new empty tit row - * @param namespace namespace where the tit table is located - * @param titTableName name of the corresponding tit table where the new row is added - * @param partitionId partition to which the redo log is hold - * @return uuid associated to the new row - */ - public static String CreateEmptyTitRow(String namespace, String titTableName, - String partitionId, String lockId) throws MDBCServiceException { - String id = generateUniqueKey(); - StringBuilder insert = new StringBuilder("INSERT INTO ") - .append(namespace) - .append('.') - .append(titTableName) - .append(" (id,applied,latestapplied,partition,redo) VALUES ") - .append("(") - .append(id) - .append(",false,-1,") - .append(partitionId) - .append(",[]);"); - PreparedQueryObject query = new PreparedQueryObject(); - query.appendQueryString(insert.toString()); - try { - executedLockedPut(namespace,titTableName,id,query,lockId,null); + executeLockedPut(namespace,mriTableName,id.toString(),query,lockId,null); } catch (MDBCServiceException e) { logger.error("Initialization error: Failure to add new row to transaction information"); throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); @@ -272,66 +141,62 @@ public class DatabaseOperations { return id; } - /** - * This function creates the Table To Partition table. It contain information related to - */ - public static void CreateTableToPartitionTable(String musicNamespace, String tableToPartitionTableName) - throws MDBCServiceException { - String tableName = tableToPartitionTableName; - String priKey = "tablename"; - StringBuilder fields = new StringBuilder(); - fields.append("tablename text, "); - fields.append("partition uuid, "); - fields.append("previouspartitions set<uuid> "); - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", - musicNamespace, tableName, fields, priKey); + public static MusicRangeInformationRow getMriRow(String namespace, String mriTableName, UUID id, String lockId) + throws MDBCServiceException{ + String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", namespace, mriTableName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(id); + Row newRow; try { - executeMusicWriteQuery(musicNamespace,tableName,cql); + newRow = executeLockedGet(namespace,mriTableName,pQueryObject,id.toString(),lockId); } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create table to partition table"); - throw(e); + logger.error("Get operationt error: Failure to get row from MRI "+mriTableName); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); } +// public MusicRangeInformationRow(UUID index, List<MusicTxDigestId> redoLog, PartitionInformation partition, + // String ownerId, String metricProcessId) { + List<TupleValue> log = newRow.getList("txredolog",TupleValue.class); + List<MusicTxDigestId> digestIds = new ArrayList<>(); + for(TupleValue t: log){ + //final String tableName = t.getString(0); + final UUID index = t.getUUID(1); + digestIds.add(new MusicTxDigestId(index)); + } + List<Range> partitions = new ArrayList<>(); + Set<String> tables = newRow.getSet("keys",String.class); + for (String table:tables){ + partitions.add(new Range(table)); + } + return new MusicRangeInformationRow(id,digestIds,new PartitionInformation(partitions),newRow.getString("ownerid"),newRow.getString("metricprocessid")); + } - public static void CreatePartitionInfoTable(String musicNamespace, String partitionInformationTableName) - throws MDBCServiceException { - String tableName = partitionInformationTableName; - String priKey = "partition"; - StringBuilder fields = new StringBuilder(); - fields.append("partition uuid, "); - fields.append("latesttittable text, "); - fields.append("latesttitindex uuid, "); - fields.append("tables set<text>, "); - fields.append("replicationfactor int, "); - fields.append("currentowner text"); - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", - musicNamespace, tableName, fields, priKey); + public static HashMap<Range,StagingTable> getTransactionDigest(String namespace, String musicTxDigestTable, MusicTxDigestId id) + throws MDBCServiceException{ + String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", namespace, musicTxDigestTable); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(id.tablePrimaryKey); + Row newRow; try { - executeMusicWriteQuery(musicNamespace,tableName,cql); + newRow = executeUnlockedQuorumGet(pQueryObject); } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create partition information table"); - throw(e); + logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.tablePrimaryKey); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); } - } - - public static void CreateRedoHistoryTable(String musicNamespace, String redoHistoryTableName) - throws MDBCServiceException { - String tableName = redoHistoryTableName; - String priKey = "partition,redotable,redoindex"; - StringBuilder fields = new StringBuilder(); - fields.append("partition uuid, "); - fields.append("redotable text, "); - fields.append("redoindex uuid, "); - //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly - fields.append("previousredo set<frozen<tuple<text,uuid>>>"); - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", - musicNamespace, tableName, fields, priKey); + String digest = newRow.getString("transactiondigest"); + HashMap<Range,StagingTable> changes; try { - executeMusicWriteQuery(musicNamespace,tableName,cql); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create redo history table"); - throw(e); + changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest); + } catch (IOException e) { + logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.tablePrimaryKey); + throw new MDBCServiceException("Deserializng digest failed with ioexception"); + } catch (ClassNotFoundException e) { + logger.error("Deserializng digest failed with an invalid class for id:"+id.tablePrimaryKey); + throw new MDBCServiceException("Deserializng digest failed with an invalid class"); } + return changes; } /** @@ -346,16 +211,45 @@ public class DatabaseOperations { try { rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical"); } catch (MusicServiceException e) { + //\TODO: handle better, at least transform into an MDBCServiceException e.printStackTrace(); } - if (rt.getResult().toLowerCase().equals("failure")) { + String result = rt.getResult(); + if (result==null || result.toLowerCase().equals("failure")) { throw new MDBCServiceException("Music eventual put failed"); } } - protected static void executedLockedPut(String namespace, String tableName, - String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId, - MusicCore.Condition conditionInfo) throws MDBCServiceException { + protected static Row executeLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey, + String lock) + throws MDBCServiceException{ + ResultSet result; + try { + result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock); + } catch(MusicServiceException e){ + //\TODO: handle better, at least transform into an MDBCServiceException + e.printStackTrace(); + throw new MDBCServiceException("Error executing critical get"); + } + if(result.isExhausted()){ + throw new MDBCServiceException("There is not a row that matches the id "+primaryKey); + } + return result.one(); + } + + protected static Row executeUnlockedQuorumGet(PreparedQueryObject cqlObject) + throws MDBCServiceException{ + ResultSet result = MusicCore.quorumGet(cqlObject); + //\TODO: handle better, at least transform into an MDBCServiceException + if(result.isExhausted()){ + throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]"); + } + return result.one(); + } + + protected static void executeLockedPut(String namespace, String tableName, + String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId, + MusicCore.Condition conditionInfo) throws MDBCServiceException { ReturnType rt ; if(lockId==null) { try { @@ -380,7 +274,7 @@ public class DatabaseOperations { } public static void createNamespace(String namespace, int replicationFactor) throws MDBCServiceException { - Map<String,Object> replicationInfo = new HashMap<String, Object>(); + Map<String,Object> replicationInfo = new HashMap<>(); replicationInfo.put("'class'", "'SimpleStrategy'"); replicationInfo.put("'replication_factor'", replicationFactor); @@ -391,75 +285,32 @@ public class DatabaseOperations { try { MusicCore.nonKeyRelatedPut(queryObject, "critical"); } catch (MusicServiceException e) { - if (e.getMessage().equals("Keyspace "+namespace+" already exists")) { - // ignore - } else { + if (!e.getMessage().equals("Keyspace "+namespace+" already exists")) { logger.error("Error creating namespace: "+namespace); throw new MDBCServiceException("Error creating namespace: "+namespace+". Internal error:"+e.getErrorMessage()); } } } - - /** - * This function creates the MusicTxDigest table. It contain information related to each transaction committed - * * LeaseId: id associated with the lease, text - * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later - * * TransactionDigest: text that contains all the changes in the transaction - */ - public static void CreateMusicTxDigest(int musicTxDigestTableNumber, String musicNamespace, String musicTxDigestTableName) throws MDBCServiceException { - String tableName = musicTxDigestTableName; - if(musicTxDigestTableNumber >= 0) { - StringBuilder table = new StringBuilder(); - table.append(tableName); - table.append("-"); - table.append(Integer.toString(musicTxDigestTableNumber)); - tableName=table.toString(); - } - String priKey = "leaseid,leasecounter"; - StringBuilder fields = new StringBuilder(); - fields.append("leaseid text, "); - fields.append("leasecounter varint, "); - fields.append("transactiondigest text ");//notice lack of ',' - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); - try { - executeMusicWriteQuery(musicNamespace,tableName,cql); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create redo records table"); - throw(e); - } - } - - /** - * This function creates the TransactionInformation table. It contain information related - * to the transactions happening in a given partition. - * * The schema of the table is - * * Id, uiid. - * * Partition, uuid id of the partition - * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables - * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables - * * Redo: list of uiids associated to the Redo Records Table - * - */ - public static void CreateMusicRangeInformationTable(String musicNamespace, String musicRangeInformationTableName) throws MDBCServiceException { - String tableName = musicRangeInformationTableName; - String priKey = "id"; - StringBuilder fields = new StringBuilder(); - fields.append("id uuid, "); - fields.append("partition uuid, "); - fields.append("latestapplied int, "); - fields.append("applied boolean, "); - //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly - fields.append("redo list<frozen<tuple<text,tuple<text,varint>>>> "); - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); + public static void createTxDigestRow(String namespace, String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException { + PreparedQueryObject query = new PreparedQueryObject(); + String cqlQuery = "INSERT INTO " + + namespace + + '.' + + musicTxDigestTable + + " (txid,transactiondigest) " + + "VALUES (" + + newId.tablePrimaryKey + ",'" + + transactionDigest + + "');"; + query.appendQueryString(cqlQuery); + //\TODO check if I am not shooting on my own foot try { - executeMusicWriteQuery(musicNamespace,tableName,cql); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create transaction information table"); - throw(e); + MusicCore.nonKeyRelatedPut(query,"critical"); + } catch (MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.tablePrimaryKey.toString()+ "with error "+e.getErrorMessage()); + throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.tablePrimaryKey.toString()); } } - - } diff --git a/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/src/main/java/org/onap/music/mdbc/DatabasePartition.java index 79abd3b..5d91dca 100644 --- a/src/main/java/org/onap/music/mdbc/DatabasePartition.java +++ b/src/main/java/org/onap/music/mdbc/DatabasePartition.java @@ -3,12 +3,12 @@ package org.onap.music.mdbc; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; -import java.util.HashSet; -import java.util.Set; +import java.util.*; import org.onap.music.logging.EELFLoggerDelegate; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import org.onap.music.mdbc.tables.MriReference; /** * A database range contain information about what ranges should be hosted in the current MDBC instance @@ -19,11 +19,10 @@ public class DatabasePartition { private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class); private String musicRangeInformationTable;//Table that currently contains the REDO log for this partition - private String musicRangeInformationIndex;//Index that can be obtained either from + private UUID musicRangeInformationIndex;//Index that can be obtained either from private String musicTxDigestTable; - private String partitionId; private String lockId; - protected Set<Range> ranges; + protected List<Range> ranges; /** * Each range represents a partition of the database, a database partition is a union of this partitions. @@ -31,15 +30,15 @@ public class DatabasePartition { */ public DatabasePartition() { - ranges = new HashSet<>(); + ranges = new ArrayList<>(); } - public DatabasePartition(Set<Range> knownRanges, String mriIndex, String mriTable, String partitionId, String lockId, String musicTxDigestTable) { + public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) { if(knownRanges != null) { ranges = knownRanges; } else { - ranges = new HashSet<>(); + ranges = new ArrayList<>(); } if(musicTxDigestTable != null) { @@ -53,7 +52,7 @@ public class DatabasePartition { this.setMusicRangeInformationIndex(mriIndex); } else { - this.setMusicRangeInformationIndex(""); + this.setMusicRangeInformationIndex(null); } if(mriTable != null) { @@ -62,13 +61,6 @@ public class DatabasePartition { else { this.setMusicRangeInformationTable(""); } - - if(partitionId != null) { - this.setPartitionId(partitionId); - } - else { - this.setPartitionId(""); - } if(lockId != null) { this.setLockId(lockId); @@ -86,11 +78,11 @@ public class DatabasePartition { this.musicRangeInformationTable = musicRangeInformationTable; } - public String getMusicRangeInformationIndex() { - return musicRangeInformationIndex; + public MriReference getMusicRangeInformationIndex() { + return new MriReference(musicRangeInformationTable,musicRangeInformationIndex); } - public void setMusicRangeInformationIndex(String musicRangeInformationIndex) { + public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) { this.musicRangeInformationIndex = musicRangeInformationIndex; } @@ -106,9 +98,7 @@ public class DatabasePartition { throw new IllegalArgumentException("Range is already contain by a previous range"); } } - if(!ranges.contains(newRange)) { - ranges.add(newRange); - } + ranges.add(newRange); } /** @@ -163,14 +153,6 @@ public class DatabasePartition { return range; } - public String getPartitionId() { - return partitionId; - } - - public void setPartitionId(String partitionId) { - this.partitionId = partitionId; - } - public String getLockId() { return lockId; } diff --git a/src/main/java/org/onap/music/mdbc/Range.java b/src/main/java/org/onap/music/mdbc/Range.java index b33fb1c..8ed0150 100644 --- a/src/main/java/org/onap/music/mdbc/Range.java +++ b/src/main/java/org/onap/music/mdbc/Range.java @@ -1,6 +1,7 @@ package org.onap.music.mdbc; import java.io.Serializable; +import java.util.Objects; /** @@ -19,13 +20,24 @@ public class Range implements Serializable { this.table = table; } + public String toString(){return table;} + /** * Compares to Range types - * @param other the other range against which this is compared + * @param o the other range against which this is compared * @return the equality result */ - public boolean equal(Range other) { - return (table == other.table); + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Range r = (Range) o; + return (table.equals(r.table)); + } + + @Override + public int hashCode(){ + return Objects.hash(table); } public boolean overlaps(Range other) { diff --git a/src/main/java/org/onap/music/mdbc/RedoRow.java b/src/main/java/org/onap/music/mdbc/RedoRow.java index db17e60..c9c83eb 100644 --- a/src/main/java/org/onap/music/mdbc/RedoRow.java +++ b/src/main/java/org/onap/music/mdbc/RedoRow.java @@ -1,12 +1,14 @@ package org.onap.music.mdbc; +import java.util.UUID; + public class RedoRow { private String redoTableName; - private String redoRowIndex; + private UUID redoRowIndex; public RedoRow(){} - public RedoRow(String redoTableName, String redoRowIndex){ + public RedoRow(String redoTableName, UUID redoRowIndex){ this.redoRowIndex = redoRowIndex; this.redoTableName = redoTableName; } @@ -19,11 +21,11 @@ public class RedoRow { this.redoTableName = redoTableName; } - public String getRedoRowIndex() { + public UUID getRedoRowIndex() { return redoRowIndex; } - public void setRedoRowIndex(String redoRowIndex) { + public void setRedoRowIndex(UUID redoRowIndex) { this.redoRowIndex = redoRowIndex; } } diff --git a/src/main/java/org/onap/music/mdbc/StateManager.java b/src/main/java/org/onap/music/mdbc/StateManager.java index b2c2adb..4c7b9aa 100644 --- a/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/src/main/java/org/onap/music/mdbc/StateManager.java @@ -65,7 +65,7 @@ public class StateManager { } protected void init(String mixin, String cassandraUrl) throws MDBCServiceException { - this.musicManager = MixinFactory.createMusicInterface(mixin, cassandraUrl, info,ranges); + this.musicManager = MixinFactory.createMusicInterface(mixin, cassandraUrl, info); this.musicManager.createKeyspace(); try { this.musicManager.initializeMetricDataStructures(); diff --git a/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java index 02de1b8..156c901 100644 --- a/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java +++ b/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java @@ -11,8 +11,7 @@ import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; +import java.util.*; public class NodeConfiguration { @@ -22,14 +21,15 @@ public class NodeConfiguration { public DatabasePartition partition; public String nodeName; - public NodeConfiguration(String tables, String mriIndex, String mriTableName, String partitionId, String sqlDatabaseName, String node, String redoRecordsTable){ - partition = new DatabasePartition(toRanges(tables), mriIndex, mriTableName, partitionId, null, redoRecordsTable) ; + public NodeConfiguration(String tables, UUID mriIndex, String mriTableName, String sqlDatabaseName, String node, String redoRecordsTable){ + // public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) { + partition = new DatabasePartition(toRanges(tables), mriIndex, mriTableName, null, redoRecordsTable) ; this.sqlDatabaseName = sqlDatabaseName; this.nodeName = node; } - protected Set<Range> toRanges(String tables){ - Set<Range> newRange = new HashSet<>(); + protected List<Range> toRanges(String tables){ + List<Range> newRange = new ArrayList<>(); String[] tablesArray=tables.split(","); for(String table: tablesArray) { newRange.add(new Range(table)); diff --git a/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java index 07f87cf..c9f36e5 100644 --- a/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java +++ b/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java @@ -3,8 +3,9 @@ package org.onap.music.mdbc.configurations; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.DatabaseOperations; +import org.onap.music.mdbc.Range; import org.onap.music.mdbc.RedoRow; -import org.onap.music.mdbc.mixins.CassandraMixin; + import com.google.gson.Gson; import org.onap.music.datastore.PreparedQueryObject; import org.onap.music.exceptions.MusicServiceException; @@ -15,6 +16,7 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.util.ArrayList; import java.util.List; +import java.util.UUID; public class TablesConfiguration { @@ -43,12 +45,6 @@ public class TablesConfiguration { initInternalNamespace(); DatabaseOperations.createNamespace(musicNamespace, internalReplicationFactor); List<NodeConfiguration> nodeConfigs = new ArrayList<>(); - String ttpName = (tableToPartitionName==null || tableToPartitionName.isEmpty())?CassandraMixin.TABLE_TO_PARTITION_TABLE_NAME:tableToPartitionName; - DatabaseOperations.CreateTableToPartitionTable(musicNamespace,ttpName); - String pitName = (partitionInformationTableName==null || partitionInformationTableName.isEmpty())?CassandraMixin.PARTITION_INFORMATION_TABLE_NAME:partitionInformationTableName; - DatabaseOperations.CreatePartitionInfoTable(musicNamespace,pitName); - String rhName = (redoHistoryTableName==null || redoHistoryTableName.isEmpty())?CassandraMixin.REDO_HISTORY_TABLE_NAME:redoHistoryTableName; - DatabaseOperations.CreateRedoHistoryTable(musicNamespace,rhName); if(partitions == null){ logger.error("Partitions was not correctly initialized"); throw new MDBCServiceException("Partition was not correctly initialized"); @@ -57,10 +53,10 @@ public class TablesConfiguration { String mriTableName = partitionInfo.mriTableName; mriTableName = (mriTableName==null || mriTableName.isEmpty())?TIT_TABLE_NAME:mriTableName; //0) Create the corresponding Music Range Information table - DatabaseOperations.CreateMusicRangeInformationTable(musicNamespace,mriTableName); + DatabaseOperations.createMusicRangeInformationTable(musicNamespace,mriTableName); String musicTxDigestTableName = partitionInfo.mtxdTableName; musicTxDigestTableName = (musicTxDigestTableName==null || musicTxDigestTableName.isEmpty())? MUSIC_TX_DIGEST_TABLE_NAME :musicTxDigestTableName; - DatabaseOperations.CreateMusicTxDigest(-1,musicNamespace,musicTxDigestTableName); + DatabaseOperations.createMusicTxDigest(musicNamespace,musicTxDigestTableName); String partitionId; if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){ if(partitionInfo.replicationFactor==0){ @@ -68,25 +64,29 @@ public class TablesConfiguration { throw new MDBCServiceException("Replication factor and partition id are both empty, and this is an invalid configuration"); } //1) Create a row in the partition info table - partitionId = DatabaseOperations.createPartitionInfoRow(musicNamespace,pitName,partitionInfo.replicationFactor,partitionInfo.tables,null); + //partitionId = DatabaseOperations.createPartitionInfoRow(musicNamespace,pitName,partitionInfo.replicationFactor,partitionInfo.tables,null); } else{ partitionId = partitionInfo.partitionId; } //2) Create a row in the transaction information table - String mriTableIndex = DatabaseOperations.CreateEmptyTitRow(musicNamespace,mriTableName,partitionId,null); + UUID mriTableIndex = DatabaseOperations.createEmptyMriRow(musicNamespace,mriTableName,"",null,partitionInfo.getTables()); //3) Add owner and tit information to partition info table RedoRow newRedoRow = new RedoRow(mriTableName,mriTableIndex); - DatabaseOperations.updateRedoRow(musicNamespace,pitName,partitionId,newRedoRow,partitionInfo.owner,null); + //DatabaseOperations.updateRedoRow(musicNamespace,pitName,partitionId,newRedoRow,partitionInfo.owner,null); //4) Update ttp with the new partition - for(String table: partitionInfo.tables) { - DatabaseOperations.updateTableToPartition(musicNamespace, ttpName, table, partitionId, null); - } + //for(String table: partitionInfo.tables) { + //DatabaseOperations.updateTableToPartition(musicNamespace, ttpName, table, partitionId, null); + //} //5) Add it to the redo history table - DatabaseOperations.createRedoHistoryBeginRow(musicNamespace,rhName,newRedoRow,partitionId,null); + //DatabaseOperations.createRedoHistoryBeginRow(musicNamespace,rhName,newRedoRow,partitionId,null); //6) Create config for this node - nodeConfigs.add(new NodeConfiguration(String.join(",",partitionInfo.tables),mriTableIndex,mriTableName,partitionId,sqlDatabaseName,partitionInfo.owner,musicTxDigestTableName)); + StringBuilder newStr = new StringBuilder(); + for(Range r: partitionInfo.tables){ + newStr.append(r.toString()).append(","); + } + nodeConfigs.add(new NodeConfiguration(newStr.toString(),mriTableIndex,mriTableName,sqlDatabaseName,partitionInfo.owner,musicTxDigestTableName)); } return nodeConfigs; } @@ -121,18 +121,18 @@ public class TablesConfiguration { } public class PartitionInformation{ - private List<String> tables; + private List<Range> tables; private String owner; private String mriTableName; private String mtxdTableName; private String partitionId; private int replicationFactor; - public List<String> getTables() { + public List<Range> getTables() { return tables; } - public void setTables(List<String> tables) { + public void setTables(List<Range> tables) { this.tables = tables; } diff --git a/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json b/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json index e67dd0b..383593a 100644 --- a/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json +++ b/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json @@ -1,19 +1,23 @@ { - "partitions" : [ + "partitions": [ { - "tables":["table11"], - "owner":"", - "mriTableName":"musicrangeinformation", - "mtxdTableName":"musictxdigest", - "partitionId":"", - "replicationFactor":1 + "tables": [ + { + "table": "table11" + } + ], + "owner": "", + "mriTableName": "musicrangeinformation", + "mtxdTableName": "musictxdigest", + "partitionId": "", + "replicationFactor": 1 } ], - "musicNamespace":"namespace", - "tableToPartitionName":"tabletopartition", - "partitionInformationTableName":"partitioninfo", - "redoHistoryTableName":"redohistory", - "sqlDatabaseName":"test", - "internalNamespace":"music_internal", - "internalReplicationFactor":1 + "musicNamespace": "namespace", + "tableToPartitionName": "tabletopartition", + "partitionInformationTableName": "partitioninfo", + "redoHistoryTableName": "redohistory", + "sqlDatabaseName": "test", + "internalNamespace": "music_internal", + "internalReplicationFactor": 1 } diff --git a/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java b/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java index 372224d..1efb795 100755 --- a/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java +++ b/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java @@ -38,8 +38,8 @@ public class Cassandra2Mixin extends CassandraMixin { super(); } - public Cassandra2Mixin(String url, Properties info, DatabasePartition ranges) throws MusicServiceException { - super(url, info,ranges); + public Cassandra2Mixin(String url, Properties info) throws MusicServiceException { + super(url, info); } /** diff --git a/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java b/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java index cb9c6e2..75eca0e 100755 --- a/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java +++ b/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java @@ -16,8 +16,9 @@ import java.util.TreeSet; import java.util.UUID; import org.onap.music.mdbc.*; +import org.onap.music.mdbc.DatabaseOperations; import org.onap.music.mdbc.tables.PartitionInformation; -import org.onap.music.mdbc.tables.MusixTxDigestId; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; @@ -86,10 +87,6 @@ public class CassandraMixin implements MusicInterface { /** Namespace for the tables in MUSIC (Cassandra) */ public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; - /** Name of the tables required for MDBC */ - public static final String TABLE_TO_PARTITION_TABLE_NAME = "tabletopartition"; - public static final String PARTITION_INFORMATION_TABLE_NAME = "partitioninfo"; - public static final String REDO_HISTORY_TABLE_NAME= "redohistory"; //\TODO Add logic to change the names when required and create the tables when necessary private String musicTxDigestTableName = "musictxdigest"; private String musicRangeInformationTableName = "musicrangeinformation"; @@ -119,7 +116,6 @@ public class CassandraMixin implements MusicInterface { //typemap.put(Types.DATE, "TIMESTAMP"); } - protected DatabasePartition ranges; protected final String music_ns; protected final String myId; protected final String[] allReplicaIds; @@ -140,8 +136,7 @@ public class CassandraMixin implements MusicInterface { this.allReplicaIds = null; } - public CassandraMixin(String url, Properties info, DatabasePartition ranges) throws MusicServiceException { - this.ranges = ranges; + public CassandraMixin(String url, Properties info) throws MusicServiceException { // Default values -- should be overridden in the Properties // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred) this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS); @@ -220,11 +215,8 @@ public class CassandraMixin implements MusicInterface { @Override public void initializeMetricDataStructures() throws MDBCServiceException { try { - DatabaseOperations.CreateMusicTxDigest(-1, music_ns, musicTxDigestTableName);//\TODO If we start partitioning the data base, we would need to use the redotable number - DatabaseOperations.CreateMusicRangeInformationTable(music_ns, musicRangeInformationTableName); - DatabaseOperations.CreateTableToPartitionTable(music_ns, TABLE_TO_PARTITION_TABLE_NAME); - DatabaseOperations.CreatePartitionInfoTable(music_ns, PARTITION_INFORMATION_TABLE_NAME); - DatabaseOperations.CreateRedoHistoryTable(music_ns, REDO_HISTORY_TABLE_NAME); + DatabaseOperations.createMusicTxDigest(music_ns, musicTxDigestTableName);//\TODO If we start partitioning the data base, we would need to use the redotable number + DatabaseOperations.createMusicRangeInformationTable(music_ns, musicRangeInformationTableName); } catch(MDBCServiceException e){ logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC"); @@ -559,26 +551,9 @@ public class CassandraMixin implements MusicInterface { sess.execute(bound); }*/ String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow); - if(MusicMixin.criticalTables.contains(tableName)) { - ReturnType rt = null; - try { - rt = MusicCore.atomicPut(music_ns, tableName, primaryKey, pQueryObject, null); - } catch (MusicLockingException e) { - e.printStackTrace(); - } catch (MusicServiceException e) { - e.printStackTrace(); - } catch (MusicQueryException e) { - e.printStackTrace(); - } - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - System.out.println("Failure while critical put..."+rt.getMessage()); - } - } else { - ReturnType rt = MusicCore.eventualPut(pQueryObject); - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - System.out.println("Failure while critical put..."+rt.getMessage()); - } - } + + updateMusicDB(tableName, primaryKey, pQueryObject); + // Mark the dirty rows in music for all the replicas but us markDirtyRow(ti,tableName, oldRow); } @@ -934,7 +909,7 @@ public class CassandraMixin implements MusicInterface { * Return the function for cassandra's primary key generation */ public String generateUniqueKey() { - return UUID.randomUUID().toString(); + return DatabaseOperations.generateUniqueKey().toString(); } @Override @@ -1023,27 +998,29 @@ public class CassandraMixin implements MusicInterface { } - private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, String uuid, String table, UUID redoUuid){ + private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){ PreparedQueryObject query = new PreparedQueryObject(); StringBuilder appendBuilder = new StringBuilder(); appendBuilder.append("UPDATE ") .append(music_ns) .append(".") .append(mriTable) - .append(" SET redo = redo +[('") + .append(" SET txredolog = txredolog +[('") .append(table) .append("',") .append(redoUuid) - .append(")] WHERE id = ") + .append(")] WHERE rangeid = ") .append(uuid) .append(";"); query.appendQueryString(appendBuilder.toString()); return query; } - protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition, String keyspace, String table, String key) throws MDBCServiceException { + protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { + MriReference mriIndex = partition.getMusicRangeInformationIndex(); String lockId; lockId = MusicCore.createLockReference(fullyQualifiedKey); + //\TODO Handle better failures to acquire locks ReturnType lockReturn; try { lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId); @@ -1062,11 +1039,11 @@ public class CassandraMixin implements MusicInterface { try { MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId); CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle(); - CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(keyspace, table, key); + CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns, partition.getMusicRangeInformationTable(), mriIndex.index.toString()); while(lockOwner.lockRef != lockId) { MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef); try { - lockOwner = lockingServiceHandle.peekLockQueue(keyspace, table, key); + lockOwner = lockingServiceHandle.peekLockQueue(music_ns, partition.getMusicRangeInformationTable(), mriIndex.index.toString()); } catch(NullPointerException e){ //Ignore null pointer exception lockId = MusicCore.createLockReference(fullyQualifiedKey); @@ -1093,35 +1070,11 @@ public class CassandraMixin implements MusicInterface { return lockId; } - protected void pushRowToMtxd(UUID commitId, HashMap<Range,StagingTable> transactionDigest) throws MDBCServiceException{ - PreparedQueryObject query = new PreparedQueryObject(); - StringBuilder cqlQuery = new StringBuilder("INSERT INTO ") - .append(music_ns) - .append('.') - .append(musicTxDigestTableName) - .append(" (txid,transactiondigest) ") - .append("VALUES ('") - .append( commitId ).append(",'"); - try { - cqlQuery.append( MDBCUtils.toString(transactionDigest) ); - } catch (IOException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+commitId); - throw new MDBCServiceException("Transaction Digest serialization was invalid for commit "+commitId); - } - cqlQuery.append("');"); - query.appendQueryString(cqlQuery.toString()); - //\TODO check if I am not shooting on my own foot - try { - MusicCore.nonKeyRelatedPut(query,"critical"); - } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+commitId); - throw new MDBCServiceException("Transaction Digest serialization for commit "+commitId); - } - } - protected void appendIndexToMri(String lockId, UUID commitId, String MriIndex) throws MDBCServiceException{ + + protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId); - ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, MriIndex, appendQuery, lockId, null); + ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, MriIndex.toString(), appendQuery, lockId, null); if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); @@ -1130,16 +1083,16 @@ public class CassandraMixin implements MusicInterface { @Override public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ - String MriIndex = partition.getMusicRangeInformationIndex(); - if(MriIndex.isEmpty()) { + MriReference mriIndex = partition.getMusicRangeInformationIndex(); + if(mriIndex==null) { //\TODO Fetch MriIndex from the Range Information Table throw new MDBCServiceException("TIT Index retrieval not yet implemented"); } - String fullyQualifiedTitKey = music_ns+"."+ musicRangeInformationTableName +"."+MriIndex; + String fullyQualifiedMriKey = music_ns+"."+ mriIndex.table+"."+mriIndex.index.toString(); //0. See if reference to lock was already created String lockId = partition.getLockId(); if(lockId == null || lockId.isEmpty()) { - lockId = createAndAssignLock(fullyQualifiedTitKey,partition,music_ns, musicRangeInformationTableName,MriIndex); + lockId = createAndAssignLock(fullyQualifiedMriKey,partition); } UUID commitId; @@ -1154,14 +1107,20 @@ public class CassandraMixin implements MusicInterface { //Add creation type of transaction digest //1. Push new row to RRT and obtain its index - pushRowToMtxd(commitId, transactionDigest); - + String serializedTransactionDigest; + try { + serializedTransactionDigest = MDBCUtils.toString(transactionDigest); + } catch (IOException e) { + throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString()); + } + MusicTxDigestId digestId = new MusicTxDigestId(commitId); + addTxDigest(musicTxDigestTableName, digestId, serializedTransactionDigest); //2. Save RRT index to RQ if(progressKeeper!= null) { - progressKeeper.setRecordId(txId,new MusixTxDigestId(commitId)); + progressKeeper.setRecordId(txId,digestId); } //3. Append RRT index into the corresponding TIT row array - appendIndexToMri(lockId,commitId,MriIndex); + appendToRedoLog(mriIndex,partition,digestId); } /** @@ -1184,10 +1143,8 @@ public class CassandraMixin implements MusicInterface { rs = sess.execute(bound); } - // //should never reach here logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key"); - return null; } @@ -1214,34 +1171,47 @@ public class CassandraMixin implements MusicInterface { } @Override - - public MusicRangeInformationRow getMusicRangeInformation(UUID id){ - throw new UnsupportedOperationException(); + public MusicRangeInformationRow getMusicRangeInformation(DatabasePartition partition) throws MDBCServiceException { + //TODO: verify that lock id is valid before calling the database operations function + MriReference reference = partition.getMusicRangeInformationIndex(); + return DatabaseOperations.getMriRow(music_ns,reference.table,reference.index,partition.getLockId()); } @Override - public MriReference createMusicRangeInformation(MusicRangeInformationRow info){ + public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException { + DatabasePartition newPartition = new DatabasePartition(info.partition.ranges,info.index, + musicRangeInformationTableName,null,musicTxDigestTableName); + String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+info.index.toString(); + String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition); + DatabaseOperations.createEmptyMriRow(music_ns,musicRangeInformationTableName,info.metricProcessId,lockId,info.partition.ranges); throw new UnsupportedOperationException(); } @Override - public void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusixTxDigestId newRecord){ - throw new UnsupportedOperationException(); + public void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException { + PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId.index, musicTxDigestTableName, newRecord.tablePrimaryKey); + ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.index.toString(), appendQuery, partition.getLockId(), null); + if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ + logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); + throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); + } } @Override - public void addTxDigest(String musicTxDigestTable, MusixTxDigestId newId, String transactionDigest){ - throw new UnsupportedOperationException(); + public void addTxDigest(String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException { + DatabaseOperations.createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); } @Override - public List<PartitionInformation> getPartitionInformation(DatabasePartition partition){ - throw new UnsupportedOperationException(); + public PartitionInformation getPartitionInformation(DatabasePartition partition) throws MDBCServiceException { + //\TODO We may want to cache this information to avoid going to the database to obtain this simple information + MusicRangeInformationRow row = getMusicRangeInformation(partition); + return row.partition; } @Override - public HashMap<Range,StagingTable> getTransactionDigest(MusixTxDigestId id){ - throw new UnsupportedOperationException(); + public HashMap<Range,StagingTable> getTransactionDigest(MusicTxDigestId id) throws MDBCServiceException { + return DatabaseOperations.getTransactionDigest(music_ns, musicTxDigestTableName, id); } @Override diff --git a/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java index de46187..c0c6a64 100755 --- a/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java +++ b/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java @@ -63,7 +63,7 @@ public class MixinFactory { * @param info the Properties to use as an argument to the constructor * @return the newly constructed MusicInterface, or null if one cannot be found. */ - public static MusicInterface createMusicInterface(String name, String url, Properties info, DatabasePartition ranges) { + public static MusicInterface createMusicInterface(String name, String url, Properties info) { for (Class<?> cl : Utils.getClassesImplementing(MusicInterface.class)) { try { Constructor<?> con = cl.getConstructor(); @@ -72,10 +72,10 @@ public class MixinFactory { String miname = mi.getMixinName(); logger.info(EELFLoggerDelegate.applicationLogger, "Checking "+miname); if (miname.equalsIgnoreCase(name)) { - con = cl.getConstructor(String.class, Properties.class, DatabasePartition.class); + con = cl.getConstructor(String.class, Properties.class); if (con != null) { logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname); - return (MusicInterface) con.newInstance(url, info, ranges); + return (MusicInterface) con.newInstance(url, info); } } } diff --git a/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index abf8f36..35cfb00 100755 --- a/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -12,7 +12,7 @@ import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.tables.PartitionInformation; -import org.onap.music.mdbc.tables.MusixTxDigestId; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; @@ -151,17 +151,17 @@ public interface MusicInterface { */ void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; - MusicRangeInformationRow getMusicRangeInformation(UUID id); + MusicRangeInformationRow getMusicRangeInformation(DatabasePartition partition) throws MDBCServiceException; - MriReference createMusicRangeInformation(MusicRangeInformationRow info); + DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException; - void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusixTxDigestId newRecord); + void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException; - void addTxDigest(String musicTxDigestTable, MusixTxDigestId newId, String transactionDigest); + void addTxDigest(String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException; - List<PartitionInformation> getPartitionInformation(DatabasePartition partition); + PartitionInformation getPartitionInformation(DatabasePartition partition) throws MDBCServiceException; - HashMap<Range,StagingTable> getTransactionDigest(MusixTxDigestId id); + HashMap<Range,StagingTable> getTransactionDigest(MusicTxDigestId id) throws MDBCServiceException; void own(List<Range> ranges); diff --git a/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index f7b667d..58ed35c 100644 --- a/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -13,7 +13,7 @@ import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.tables.PartitionInformation; -import org.onap.music.mdbc.tables.MusixTxDigestId; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; @@ -189,26 +189,26 @@ public class MusicMixin implements MusicInterface { } @Override - public HashMap<Range, StagingTable> getTransactionDigest(MusixTxDigestId id) { + public HashMap<Range, StagingTable> getTransactionDigest(MusicTxDigestId id) { return null; } @Override - public List<PartitionInformation> getPartitionInformation(DatabasePartition partition) { + public PartitionInformation getPartitionInformation(DatabasePartition partition) { return null; } @Override - public MriReference createMusicRangeInformation(MusicRangeInformationRow info) { + public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) { return null; } @Override - public void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusixTxDigestId newRecord) { + public void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) { } @Override - public void addTxDigest(String musicTxDigestTable, MusixTxDigestId newId, String transactionDigest) { + public void addTxDigest(String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) { } @Override @@ -227,7 +227,7 @@ public class MusicMixin implements MusicInterface { } @Override - public MusicRangeInformationRow getMusicRangeInformation(UUID id){ + public MusicRangeInformationRow getMusicRangeInformation(DatabasePartition partition){ return null; } } diff --git a/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java index 5d7bc97..8a1d2e8 100644 --- a/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java +++ b/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java @@ -6,11 +6,17 @@ import java.util.UUID; public final class MusicRangeInformationRow { public final UUID index; public final PartitionInformation partition; - public final List<MusixTxDigestId> redoLog; + public final List<MusicTxDigestId> redoLog; + public final String ownerId; + public final String metricProcessId; - public MusicRangeInformationRow(UUID index, List<MusixTxDigestId> redoLog, PartitionInformation partition) { + public MusicRangeInformationRow(UUID index, List<MusicTxDigestId> redoLog, PartitionInformation partition, + String ownerId, String metricProcessId) { this.index = index; this.redoLog = redoLog; this.partition = partition; + this.ownerId = ownerId; + this.metricProcessId = metricProcessId; } + } diff --git a/src/main/java/org/onap/music/mdbc/tables/MusixTxDigestId.java b/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java index 0eccd53..5b8fadd 100644 --- a/src/main/java/org/onap/music/mdbc/tables/MusixTxDigestId.java +++ b/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java @@ -2,10 +2,10 @@ package org.onap.music.mdbc.tables; import java.util.UUID; -public final class MusixTxDigestId { +public final class MusicTxDigestId { public final UUID tablePrimaryKey; - public MusixTxDigestId(UUID primaryKey) { + public MusicTxDigestId(UUID primaryKey) { this.tablePrimaryKey= primaryKey; } diff --git a/src/main/java/org/onap/music/mdbc/tables/Operation.java b/src/main/java/org/onap/music/mdbc/tables/Operation.java index 85428a7..d3aabe0 100644 --- a/src/main/java/org/onap/music/mdbc/tables/Operation.java +++ b/src/main/java/org/onap/music/mdbc/tables/Operation.java @@ -25,4 +25,12 @@ public final class Operation implements Serializable{ public OperationType getOperationType() { return this.TYPE; } + + @Override + public boolean equals(Object o){ + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Operation r = (Operation) o; + return TYPE.equals(r.TYPE) && NEW_VAL.equals(r.NEW_VAL); + } } diff --git a/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java b/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java index 3f99098..2f048b7 100644 --- a/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java +++ b/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java @@ -1,11 +1,13 @@ package org.onap.music.mdbc.tables; +import org.onap.music.mdbc.Range; + import java.util.List; public class PartitionInformation { - public final List<String> tables; + public final List<Range> ranges; - public PartitionInformation(List<String> tables) { - this.tables=tables; + public PartitionInformation(List<Range> ranges) { + this.ranges=ranges; } } diff --git a/src/main/java/org/onap/music/mdbc/tables/TxCommitProgress.java b/src/main/java/org/onap/music/mdbc/tables/TxCommitProgress.java index 73ef4b2..02942c6 100644 --- a/src/main/java/org/onap/music/mdbc/tables/TxCommitProgress.java +++ b/src/main/java/org/onap/music/mdbc/tables/TxCommitProgress.java @@ -1,6 +1,5 @@ package org.onap.music.mdbc.tables; -import java.math.BigInteger; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -9,7 +8,6 @@ import com.datastax.driver.core.utils.UUIDs; import org.onap.music.logging.EELFLoggerDelegate; import java.sql.Connection; -import java.util.concurrent.atomic.AtomicReference; public class TxCommitProgress{ @@ -71,7 +69,7 @@ public class TxCommitProgress{ return prog.getConnection(); } - public void setRecordId(String txId, MusixTxDigestId recordId){ + public void setRecordId(String txId, MusicTxDigestId recordId){ CommitProgress prog = transactionInfo.get(txId); if(prog == null){ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when setting record Id",txId); @@ -79,7 +77,7 @@ public class TxCommitProgress{ prog.setRecordId(recordId); } - public MusixTxDigestId getRecordId(String txId) { + public MusicTxDigestId getRecordId(String txId) { CommitProgress prog = transactionInfo.get(txId); if(prog == null){ logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when getting record Id",txId); @@ -124,10 +122,10 @@ final class CommitProgress{ private boolean MusicDone; // indicates if music commit was already performed, atomic bool private Connection connection;// reference to a connection object. This is used to complete a commit if it failed in the original thread. private Long timestamp; // last time this data structure was updated - private MusixTxDigestId musixTxDigestId;// record id for each partition + private MusicTxDigestId musicTxDigestId;// record id for each partition public CommitProgress(String id,Connection conn){ - musixTxDigestId =null; + musicTxDigestId =null; lTxId = id; commitRequested = false; SQLDone = false; @@ -148,7 +146,7 @@ final class CommitProgress{ public synchronized void reinitialize() { commitId = null; - musixTxDigestId =null; + musicTxDigestId =null; commitRequested = false; SQLDone = false; MusicDone = false; @@ -179,17 +177,17 @@ final class CommitProgress{ return timestamp; } - public synchronized void setRecordId(MusixTxDigestId id) { - musixTxDigestId = id; + public synchronized void setRecordId(MusicTxDigestId id) { + musicTxDigestId = id; timestamp = System.currentTimeMillis(); } public synchronized boolean isRedoRecordAssigned() { - return this.musixTxDigestId !=null; + return this.musicTxDigestId !=null; } - public synchronized MusixTxDigestId getRecordId() { - return musixTxDigestId; + public synchronized MusicTxDigestId getRecordId() { + return musicTxDigestId; } public synchronized UUID getCommitId() { diff --git a/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java b/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java index 53bbc53..afd1a47 100644 --- a/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java +++ b/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java @@ -5,6 +5,8 @@ import org.onap.music.mdbc.configurations.NodeConfiguration; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import java.util.UUID; + public class CreatePartition { public static final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(CreatePartition.class); @@ -23,9 +25,6 @@ public class CreatePartition { @Parameter(names = { "-r", "--music-tx-digest-table-name" }, required = true, description = "Music Transaction Digest Table name") private String mtxdTable; - @Parameter(names = { "-p", "--partition-id" }, required = true, - description = "Partition Id") - private String partitionId; @Parameter(names = { "-h", "-help", "--help" }, help = true, description = "Print the help message") private boolean help = false; @@ -36,7 +35,7 @@ public class CreatePartition { } public void convert(){ - config = new NodeConfiguration(tables, mriIndex,mriTable,partitionId,"test","", mtxdTable); + config = new NodeConfiguration(tables, UUID.fromString(mriIndex),mriTable,"test","", mtxdTable); } public void saveToFile(){ |