aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main/java/org/onap
diff options
context:
space:
mode:
authorstatta <statta@research.att.com>2019-04-05 10:54:06 -0400
committerstatta <statta@research.att.com>2019-04-05 10:54:29 -0400
commit8ee3c10928c025c3908ce60314c40dd22755e544 (patch)
tree4692acecb9392d5ebfb34fd7748184739351009f /mdbc-server/src/main/java/org/onap
parent361533ff29d77c78a418967229e9fc6d72d46bd6 (diff)
Optimize eventual reads
Issue-ID: MUSIC-371 Change-Id: Ica2a27a16dd82e5c99cb5775d39c6526ed086187 Signed-off-by: statta <statta@research.att.com>
Diffstat (limited to 'mdbc-server/src/main/java/org/onap')
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java251
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java12
2 files changed, 141 insertions, 122 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index c176ad9..39b1e21 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -156,7 +156,7 @@ public class MusicMixin implements MusicInterface {
static {
// We only support the following type mappings currently (from DB -> Cassandra).
// Anything else will likely cause a NullPointerException
- typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY
+ typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY
typemap.put(Types.BLOB, "VARCHAR");
typemap.put(Types.BOOLEAN, "BOOLEAN");
typemap.put(Types.CLOB, "BLOB");
@@ -169,10 +169,10 @@ public class MusicMixin implements MusicInterface {
typemap.put(Types.TIMESTAMP, "VARCHAR");
typemap.put(Types.VARBINARY, "BLOB");
typemap.put(Types.VARCHAR, "VARCHAR");
- typemap.put(Types.CHAR, "VARCHAR");
+ typemap.put(Types.CHAR, "VARCHAR");
//The "Hacks", these don't have a direct mapping
- //typemap.put(Types.DATE, "VARCHAR");
- //typemap.put(Types.DATE, "TIMESTAMP");
+ //typemap.put(Types.DATE, "VARCHAR");
+ //typemap.put(Types.DATE, "TIMESTAMP");
}
@@ -389,12 +389,12 @@ public class MusicMixin implements MusicInterface {
@Override
public void createDirtyRowTable(TableInfo ti, String tableName) {
// create dirtybitsTable at all replicas
-// for (String repl : allReplicaIds) {
-//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
-//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
-// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
-// executeMusicWriteQuery(cql);
-// }
+// for (String repl : allReplicaIds) {
+//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
+//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
+// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
+// executeMusicWriteQuery(cql);
+// }
StringBuilder ddl = new StringBuilder("REPLICA__ TEXT");
StringBuilder cols = new StringBuilder("REPLICA__");
for (int i = 0; i < ti.columns.size(); i++) {
@@ -451,8 +451,8 @@ public class MusicMixin implements MusicInterface {
System.err.println("markDIrtyRow need to fix primary key");
}
String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString());
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);*/
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);*/
String primaryKey;
if(ti.hasKey()) {
primaryKey = getMusicKeyFromRow(ti,tableName, keys);
@@ -476,13 +476,13 @@ public class MusicMixin implements MusicInterface {
pQueryObject.addValue(pkObj);
updateMusicDB(tableName, primaryKey, pQueryObject);
//if (!repl.equals(myId)) {
- /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
- vallist.set(0, repl);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(bound);
- }*/
+ /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+ vallist.set(0, repl);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
//}
}
@@ -514,13 +514,13 @@ public class MusicMixin implements MusicInterface {
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
System.out.println("Failure while cleanDirtyRow..."+rt.getMessage());
}
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(bound);
- }*/
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
}
/**
* Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica,
@@ -533,14 +533,14 @@ public class MusicMixin implements MusicInterface {
String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName);
ResultSet results = null;
logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
-
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(new Object[] { myId });
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- results = sess.execute(bound);
- }*/
+
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(new Object[] { myId });
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ results = sess.execute(bound);
+ }*/
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
try {
@@ -646,14 +646,14 @@ public class MusicMixin implements MusicInterface {
String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString());
logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql);
pQueryObject.appendQueryString(cql);
-
- /*PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- synchronized (sess) {
- sess.execute(bound);
- }*/
+
+ /*PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow);
updateMusicDB(tableName, primaryKey, pQueryObject);
@@ -716,15 +716,15 @@ public class MusicMixin implements MusicInterface {
e.printStackTrace();
}
- /*
- Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- ResultSet dirtyRows = null;
- synchronized (sess) {
- dirtyRows = sess.execute(bound);
- }*/
+ /*
+ Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ ResultSet dirtyRows = null;
+ synchronized (sess) {
+ dirtyRows = sess.execute(bound);
+ }*/
List<Row> rows = dirtyRows.all();
if (rows.isEmpty()) {
// No rows, the row must have been deleted
@@ -771,48 +771,48 @@ public class MusicMixin implements MusicInterface {
}
logger.debug("Blocking rowid: "+rowid);
- in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE
+ in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE
dbi.insertRowIntoSqlDb(tableName, map);
logger.debug("Unblocking rowid: "+rowid);
- in_progress.remove(rowid); // Unblock propagation
-
-// try {
-// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
-// executeSQLWrite(sql);
-// } catch (SQLException e) {
-// logger.debug("Insert failed because row exists, do an update");
-// // TODO - rewrite this UPDATE command should not update key fields
-// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString());
-// try {
-// executeSQLWrite(sql);
-// } catch (SQLException e1) {
-// e1.printStackTrace();
-// }
-// }
+ in_progress.remove(rowid); // Unblock propagation
+
+// try {
+// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
+// executeSQLWrite(sql);
+// } catch (SQLException e) {
+// logger.debug("Insert failed because row exists, do an update");
+// // TODO - rewrite this UPDATE command should not update key fields
+// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString());
+// try {
+// executeSQLWrite(sql);
+// } catch (SQLException e1) {
+// e1.printStackTrace();
+// }
+// }
ti = dbi.getTableInfo(tableName);
cleanDirtyRow(ti, tableName, new JSONObject(vallist));
-// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
-// java.sql.ResultSet rs = executeSQLRead(selectQuery);
-// String dbWriteQuery=null;
-// try {
-// if(rs.next()){//this entry is there, do an update
-// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";";
-// }else
-// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";";
-// executeSQLWrite(dbWriteQuery);
-// } catch (SQLException e) {
-// // ZZTODO Auto-generated catch block
-// e.printStackTrace();
-// }
+// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
+// java.sql.ResultSet rs = executeSQLRead(selectQuery);
+// String dbWriteQuery=null;
+// try {
+// if(rs.next()){//this entry is there, do an update
+// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";";
+// }else
+// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";";
+// executeSQLWrite(dbWriteQuery);
+// } catch (SQLException e) {
+// // ZZTODO Auto-generated catch block
+// e.printStackTrace();
+// }
//clean the music dirty bits table
-// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
-// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
-// executeMusicWriteQuery(deleteQuery);
+// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
+// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
+// executeMusicWriteQuery(deleteQuery);
}
private Object getValue(Row musicRow, String colname) {
ColumnDefinitions cdef = musicRow.getColumnDefinitions();
@@ -914,14 +914,14 @@ public class MusicMixin implements MusicInterface {
pQueryObject.appendQueryString(cql);
String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow);
updateMusicDB(tableName, primaryKey, pQueryObject);
-
- /*PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(newrow);
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- synchronized (sess) {
- sess.execute(bound);
- }*/
+
+ /*PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(newrow);
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
// Mark the dirty rows in music for all the replicas but us
markDirtyRow(ti,tableName, changedRow);
}
@@ -978,7 +978,7 @@ public class MusicMixin implements MusicInterface {
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
}
-
+
}
/**
@@ -1287,7 +1287,10 @@ public class MusicMixin implements MusicInterface {
@Override
public void commitLog(DatabasePartition partition,List<Range> eventualRanges, StagingTable transactionDigest,
String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException {
-
+
+ // first deal with commit for eventually consistent tables
+ filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
+
if(partition==null){
logger.warn("Trying tcommit log with null partition");
return;
@@ -1382,7 +1385,7 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException();
}
- if(!transactionDigest.isEmpty()) {
+ if(!transactionDigest.isEventualEmpty()) {
ByteBuffer serialized = transactionDigest.getSerializedEventuallyStagingAndClean();
if (serialized!=null && useCompression) {
@@ -1513,7 +1516,7 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
}
- return getMRIRowFromCassandraRow(newRow);
+ return getMRIRowFromCassandraRow(newRow);
}
@Override
@@ -1529,18 +1532,18 @@ public class MusicMixin implements MusicInterface {
logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
}
- return getRangeDependenciesFromCassandraRow(newRow);
+ return getRangeDependenciesFromCassandraRow(newRow);
}
/**
* This function creates the TransactionInformation table. It contain information related
* to the transactions happening in a given partition.
- * * The schema of the table is
- * * Id, uiid.
- * * Partition, uuid id of the partition
- * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
- * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
- * * Redo: list of uiids associated to the Redo Records Table
+ * * The schema of the table is
+ * * Id, uiid.
+ * * Partition, uuid id of the partition
+ * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
+ * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
+ * * Redo: list of uiids associated to the Redo Records Table
*
*/
public static void createMusicRangeInformationTable(String namespace, String tableName) throws MDBCServiceException {
@@ -1727,8 +1730,8 @@ public class MusicMixin implements MusicInterface {
/**
* This function creates the MusicEveTxDigest table. It contain information related to each eventual transaction committed
- * * LeaseId: id associated with the lease, text
- * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+ * * LeaseId: id associated with the lease, text
+ * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
* * TransactionDigest: text that contains all the changes in the transaction
*/
public static void createMusicEventualTxDigest(String musicEventualTxDigestTableName, String musicNamespace, int musicTxDigestTableNumber) throws MDBCServiceException {
@@ -1738,11 +1741,12 @@ public class MusicMixin implements MusicInterface {
"-" +
Integer.toString(musicTxDigestTableNumber);
}
- String priKey = "txTimeId";
+ String priKey = "txTimeId, year";
StringBuilder fields = new StringBuilder();
fields.append("txid uuid, ");
fields.append("transactiondigest blob, ");
fields.append("compressed boolean, ");
+ fields.append("year int, ");
fields.append("txTimeId TIMEUUID ");//notice lack of ','
String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
try {
@@ -1832,19 +1836,17 @@ public class MusicMixin implements MusicInterface {
public void addEventualTxDigest(MusicTxDigestId newId, ByteBuffer transactionDigest) throws MDBCServiceException {
//createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
PreparedQueryObject query = new PreparedQueryObject();
- String cqlQuery = "INSERT INTO " +
- this.music_ns +
- '.' +
- this.musicEventualTxDigestTableName +
- " (txid,transactiondigest,compressed,txTimeId) " +
- "VALUES (" +
- newId.transactionId+ ",'" +
- transactionDigest + "'," +
- useCompression + ","+
- // "toTimestamp(now())" +
- "now()" +
- ");";
- query.appendQueryString(cqlQuery);
+ int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
+
+
+ String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns,
+ this.musicEventualTxDigestTableName);
+ query.appendQueryString(cql);
+ query.addValue( newId.transactionId);
+ query.addValue(transactionDigest);
+ query.addValue(useCompression);
+ query.addValue(year);
+ // query.appendQueryString(cqlQuery);
//\TODO check if I am not shooting on my own foot
try {
MusicCore.nonKeyRelatedPut(query,"critical");
@@ -1885,6 +1887,15 @@ public class MusicMixin implements MusicInterface {
@Override
public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException {
+ int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
+ StringBuffer yearSb = new StringBuffer();
+ String sep = "";
+ for (int y=2019; y<=year; y++) {
+ yearSb.append(sep);
+ yearSb.append(y);
+ sep = ",";
+ }
+
StagingTable changes;
String cql;
LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>();
@@ -1893,12 +1904,12 @@ public class MusicMixin implements MusicInterface {
if (musicevetxdigestNodeinfoTimeID != null) {
// this will fetch only few records based on the time-stamp condition.
- cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName);
+ cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
pQueryObject.appendQueryString(cql);
pQueryObject.addValue(musicevetxdigestNodeinfoTimeID);
} else {
// This is going to Fetch all the Transactiondigest records from the musicevetxdigest table.
- cql = String.format("SELECT * FROM %s.%s LIMIT 10;", music_ns, this.musicEventualTxDigestTableName);
+ cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
pQueryObject.appendQueryString(cql);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
index dbed9e4..26ee73c 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
@@ -91,7 +91,7 @@ public class StagingTable {
builderInitialized=true;
digestBuilder = CompleteDigest.newBuilder();
this.eventuallyConsistentRanges=eventuallyConsistentRanges;
- eventuallyBuilder = (!this.eventuallyConsistentRanges.isEmpty())?null:CompleteDigest.newBuilder();
+ eventuallyBuilder = (this.eventuallyConsistentRanges.isEmpty())?null:CompleteDigest.newBuilder();
}
public StagingTable(ByteBuffer serialized) throws MDBCServiceException {
@@ -242,7 +242,15 @@ public class StagingTable {
}
synchronized public boolean isEmpty() {
- return (digestBuilder.getRowsCount()==0);
+ return (digestBuilder.getRowsCount()==0 && eventuallyBuilder.getRowsCount()==0);
+ }
+
+ synchronized public boolean isStrongEmpty() {
+ return (digestBuilder.getRowsCount()==0);
+ }
+
+ synchronized public boolean isEventualEmpty() {
+ return (eventuallyBuilder.getRowsCount()==0);
}
synchronized public void clear() throws MDBCServiceException {