diff options
Diffstat (limited to 'mdbc-server')
3 files changed, 193 insertions, 139 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index c176ad9..39b1e21 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -156,7 +156,7 @@ public class MusicMixin implements MusicInterface { static { // We only support the following type mappings currently (from DB -> Cassandra). // Anything else will likely cause a NullPointerException - typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY + typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY typemap.put(Types.BLOB, "VARCHAR"); typemap.put(Types.BOOLEAN, "BOOLEAN"); typemap.put(Types.CLOB, "BLOB"); @@ -169,10 +169,10 @@ public class MusicMixin implements MusicInterface { typemap.put(Types.TIMESTAMP, "VARCHAR"); typemap.put(Types.VARBINARY, "BLOB"); typemap.put(Types.VARCHAR, "VARCHAR"); - typemap.put(Types.CHAR, "VARCHAR"); + typemap.put(Types.CHAR, "VARCHAR"); //The "Hacks", these don't have a direct mapping - //typemap.put(Types.DATE, "VARCHAR"); - //typemap.put(Types.DATE, "TIMESTAMP"); + //typemap.put(Types.DATE, "VARCHAR"); + //typemap.put(Types.DATE, "TIMESTAMP"); } @@ -389,12 +389,12 @@ public class MusicMixin implements MusicInterface { @Override public void createDirtyRowTable(TableInfo ti, String tableName) { // create dirtybitsTable at all replicas -// for (String repl : allReplicaIds) { -//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i]; -//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);"; -// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl); -// executeMusicWriteQuery(cql); -// } +// for (String repl : allReplicaIds) { +//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i]; +//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);"; +// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl); +// executeMusicWriteQuery(cql); +// } StringBuilder ddl = new StringBuilder("REPLICA__ TEXT"); StringBuilder cols = new StringBuilder("REPLICA__"); for (int i = 0; i < ti.columns.size(); i++) { @@ -451,8 +451,8 @@ public class MusicMixin implements MusicInterface { System.err.println("markDIrtyRow need to fix primary key"); } String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString()); - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql);*/ + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql);*/ String primaryKey; if(ti.hasKey()) { primaryKey = getMusicKeyFromRow(ti,tableName, keys); @@ -476,13 +476,13 @@ public class MusicMixin implements MusicInterface { pQueryObject.addValue(pkObj); updateMusicDB(tableName, primaryKey, pQueryObject); //if (!repl.equals(myId)) { - /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); - vallist.set(0, repl); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - sess.execute(bound); - }*/ + /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); + vallist.set(0, repl); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + sess.execute(bound); + }*/ //} } @@ -514,13 +514,13 @@ public class MusicMixin implements MusicInterface { if(rt.getResult().getResult().toLowerCase().equals("failure")) { System.out.println("Failure while cleanDirtyRow..."+rt.getMessage()); } - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - sess.execute(bound); - }*/ + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + sess.execute(bound); + }*/ } /** * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica, @@ -533,14 +533,14 @@ public class MusicMixin implements MusicInterface { String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName); ResultSet results = null; logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); - - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(new Object[] { myId }); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - results = sess.execute(bound); - }*/ + + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(new Object[] { myId }); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + results = sess.execute(bound); + }*/ PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); try { @@ -646,14 +646,14 @@ public class MusicMixin implements MusicInterface { String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString()); logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql); pQueryObject.appendQueryString(cql); - - /*PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - Session sess = getMusicSession(); - synchronized (sess) { - sess.execute(bound); - }*/ + + /*PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + Session sess = getMusicSession(); + synchronized (sess) { + sess.execute(bound); + }*/ String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow); updateMusicDB(tableName, primaryKey, pQueryObject); @@ -716,15 +716,15 @@ public class MusicMixin implements MusicInterface { e.printStackTrace(); } - /* - Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - ResultSet dirtyRows = null; - synchronized (sess) { - dirtyRows = sess.execute(bound); - }*/ + /* + Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + ResultSet dirtyRows = null; + synchronized (sess) { + dirtyRows = sess.execute(bound); + }*/ List<Row> rows = dirtyRows.all(); if (rows.isEmpty()) { // No rows, the row must have been deleted @@ -771,48 +771,48 @@ public class MusicMixin implements MusicInterface { } logger.debug("Blocking rowid: "+rowid); - in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE + in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE dbi.insertRowIntoSqlDb(tableName, map); logger.debug("Unblocking rowid: "+rowid); - in_progress.remove(rowid); // Unblock propagation - -// try { -// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); -// executeSQLWrite(sql); -// } catch (SQLException e) { -// logger.debug("Insert failed because row exists, do an update"); -// // TODO - rewrite this UPDATE command should not update key fields -// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString()); -// try { -// executeSQLWrite(sql); -// } catch (SQLException e1) { -// e1.printStackTrace(); -// } -// } + in_progress.remove(rowid); // Unblock propagation + +// try { +// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); +// executeSQLWrite(sql); +// } catch (SQLException e) { +// logger.debug("Insert failed because row exists, do an update"); +// // TODO - rewrite this UPDATE command should not update key fields +// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString()); +// try { +// executeSQLWrite(sql); +// } catch (SQLException e1) { +// e1.printStackTrace(); +// } +// } ti = dbi.getTableInfo(tableName); cleanDirtyRow(ti, tableName, new JSONObject(vallist)); -// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";"; -// java.sql.ResultSet rs = executeSQLRead(selectQuery); -// String dbWriteQuery=null; -// try { -// if(rs.next()){//this entry is there, do an update -// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";"; -// }else -// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";"; -// executeSQLWrite(dbWriteQuery); -// } catch (SQLException e) { -// // ZZTODO Auto-generated catch block -// e.printStackTrace(); -// } +// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";"; +// java.sql.ResultSet rs = executeSQLRead(selectQuery); +// String dbWriteQuery=null; +// try { +// if(rs.next()){//this entry is there, do an update +// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";"; +// }else +// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";"; +// executeSQLWrite(dbWriteQuery); +// } catch (SQLException e) { +// // ZZTODO Auto-generated catch block +// e.printStackTrace(); +// } //clean the music dirty bits table -// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId; -// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;"; -// executeMusicWriteQuery(deleteQuery); +// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId; +// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;"; +// executeMusicWriteQuery(deleteQuery); } private Object getValue(Row musicRow, String colname) { ColumnDefinitions cdef = musicRow.getColumnDefinitions(); @@ -914,14 +914,14 @@ public class MusicMixin implements MusicInterface { pQueryObject.appendQueryString(cql); String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow); updateMusicDB(tableName, primaryKey, pQueryObject); - - /*PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(newrow); - bound.setReadTimeoutMillis(60000); - Session sess = getMusicSession(); - synchronized (sess) { - sess.execute(bound); - }*/ + + /*PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(newrow); + bound.setReadTimeoutMillis(60000); + Session sess = getMusicSession(); + synchronized (sess) { + sess.execute(bound); + }*/ // Mark the dirty rows in music for all the replicas but us markDirtyRow(ti,tableName, changedRow); } @@ -978,7 +978,7 @@ public class MusicMixin implements MusicInterface { if(rt.getResult().getResult().toLowerCase().equals("failure")) { logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage()); } - + } /** @@ -1287,7 +1287,10 @@ public class MusicMixin implements MusicInterface { @Override public void commitLog(DatabasePartition partition,List<Range> eventualRanges, StagingTable transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException { - + + // first deal with commit for eventually consistent tables + filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); + if(partition==null){ logger.warn("Trying tcommit log with null partition"); return; @@ -1382,7 +1385,7 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException(); } - if(!transactionDigest.isEmpty()) { + if(!transactionDigest.isEventualEmpty()) { ByteBuffer serialized = transactionDigest.getSerializedEventuallyStagingAndClean(); if (serialized!=null && useCompression) { @@ -1513,7 +1516,7 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e); } - return getMRIRowFromCassandraRow(newRow); + return getMRIRowFromCassandraRow(newRow); } @Override @@ -1529,18 +1532,18 @@ public class MusicMixin implements MusicInterface { logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e); } - return getRangeDependenciesFromCassandraRow(newRow); + return getRangeDependenciesFromCassandraRow(newRow); } /** * This function creates the TransactionInformation table. It contain information related * to the transactions happening in a given partition. - * * The schema of the table is - * * Id, uiid. - * * Partition, uuid id of the partition - * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables - * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables - * * Redo: list of uiids associated to the Redo Records Table + * * The schema of the table is + * * Id, uiid. + * * Partition, uuid id of the partition + * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables + * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables + * * Redo: list of uiids associated to the Redo Records Table * */ public static void createMusicRangeInformationTable(String namespace, String tableName) throws MDBCServiceException { @@ -1727,8 +1730,8 @@ public class MusicMixin implements MusicInterface { /** * This function creates the MusicEveTxDigest table. It contain information related to each eventual transaction committed - * * LeaseId: id associated with the lease, text - * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later + * * LeaseId: id associated with the lease, text + * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later * * TransactionDigest: text that contains all the changes in the transaction */ public static void createMusicEventualTxDigest(String musicEventualTxDigestTableName, String musicNamespace, int musicTxDigestTableNumber) throws MDBCServiceException { @@ -1738,11 +1741,12 @@ public class MusicMixin implements MusicInterface { "-" + Integer.toString(musicTxDigestTableNumber); } - String priKey = "txTimeId"; + String priKey = "txTimeId, year"; StringBuilder fields = new StringBuilder(); fields.append("txid uuid, "); fields.append("transactiondigest blob, "); fields.append("compressed boolean, "); + fields.append("year int, "); fields.append("txTimeId TIMEUUID ");//notice lack of ',' String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); try { @@ -1832,19 +1836,17 @@ public class MusicMixin implements MusicInterface { public void addEventualTxDigest(MusicTxDigestId newId, ByteBuffer transactionDigest) throws MDBCServiceException { //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); PreparedQueryObject query = new PreparedQueryObject(); - String cqlQuery = "INSERT INTO " + - this.music_ns + - '.' + - this.musicEventualTxDigestTableName + - " (txid,transactiondigest,compressed,txTimeId) " + - "VALUES (" + - newId.transactionId+ ",'" + - transactionDigest + "'," + - useCompression + ","+ - // "toTimestamp(now())" + - "now()" + - ");"; - query.appendQueryString(cqlQuery); + int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR); + + + String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns, + this.musicEventualTxDigestTableName); + query.appendQueryString(cql); + query.addValue( newId.transactionId); + query.addValue(transactionDigest); + query.addValue(useCompression); + query.addValue(year); + // query.appendQueryString(cqlQuery); //\TODO check if I am not shooting on my own foot try { MusicCore.nonKeyRelatedPut(query,"critical"); @@ -1885,6 +1887,15 @@ public class MusicMixin implements MusicInterface { @Override public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException { + int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR); + StringBuffer yearSb = new StringBuffer(); + String sep = ""; + for (int y=2019; y<=year; y++) { + yearSb.append(sep); + yearSb.append(y); + sep = ","; + } + StagingTable changes; String cql; LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>(); @@ -1893,12 +1904,12 @@ public class MusicMixin implements MusicInterface { if (musicevetxdigestNodeinfoTimeID != null) { // this will fetch only few records based on the time-stamp condition. - cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName); + cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString()); pQueryObject.appendQueryString(cql); pQueryObject.addValue(musicevetxdigestNodeinfoTimeID); } else { // This is going to Fetch all the Transactiondigest records from the musicevetxdigest table. - cql = String.format("SELECT * FROM %s.%s LIMIT 10;", music_ns, this.musicEventualTxDigestTableName); + cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString()); pQueryObject.appendQueryString(cql); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java index dbed9e4..26ee73c 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java @@ -91,7 +91,7 @@ public class StagingTable { builderInitialized=true; digestBuilder = CompleteDigest.newBuilder(); this.eventuallyConsistentRanges=eventuallyConsistentRanges; - eventuallyBuilder = (!this.eventuallyConsistentRanges.isEmpty())?null:CompleteDigest.newBuilder(); + eventuallyBuilder = (this.eventuallyConsistentRanges.isEmpty())?null:CompleteDigest.newBuilder(); } public StagingTable(ByteBuffer serialized) throws MDBCServiceException { @@ -242,7 +242,15 @@ public class StagingTable { } synchronized public boolean isEmpty() { - return (digestBuilder.getRowsCount()==0); + return (digestBuilder.getRowsCount()==0 && eventuallyBuilder.getRowsCount()==0); + } + + synchronized public boolean isStrongEmpty() { + return (digestBuilder.getRowsCount()==0); + } + + synchronized public boolean isEventualEmpty() { + return (eventuallyBuilder.getRowsCount()==0); } synchronized public void clear() throws MDBCServiceException { diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java index aba8cb4..41a943e 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java @@ -20,24 +20,22 @@ package org.onap.music.mdbc.mixins; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.junit.Assert.*; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; - -import java.util.*; - +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.UUID; +import java.util.function.Consumer; import org.cassandraunit.utils.EmbeddedCassandraServerHelper; - - import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; -import org.junit.rules.Timeout; import org.onap.music.datastore.MusicDataStore; import org.onap.music.datastore.MusicDataStoreHandle; import org.onap.music.exceptions.MDBCServiceException; @@ -51,12 +49,13 @@ import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.StateManager; -import org.onap.music.mdbc.TestUtils; -import org.onap.music.mdbc.ownership.Dag; -import org.onap.music.mdbc.ownership.DagNode; +import org.onap.music.mdbc.proto.ProtoDigest.Digest.CompleteDigest; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; -import org.onap.music.service.impl.MusicCassaCore; +import org.onap.music.mdbc.tables.StagingTable; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.google.protobuf.InvalidProtocolBufferException; public class MusicMixinTest { @@ -109,6 +108,7 @@ public class MusicMixinTest { Properties properties = new Properties(); properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace); properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName); + properties.setProperty(MusicMixin.KEY_COMPRESSION, Boolean.toString(true)); mixin=new MusicMixin(stateManager, mdbcServerName,properties); } catch (MDBCServiceException e) { fail("error creating music mixin"); @@ -246,4 +246,39 @@ public class MusicMixinTest { @Test public void relinquishIfRequired() { } + + @Test + public void getEveTxDigest() throws Exception { + + mixin.createMusicEventualTxDigest(); + ByteBuffer compressed = mockCompressedProtoByteBuff(); + MusicTxDigestId digestId = new MusicTxDigestId(UUID.randomUUID(), 1); + mixin.addEventualTxDigest(digestId, compressed); + + LinkedHashMap<UUID, StagingTable> digest = mixin.getEveTxDigest("n1"); + + Consumer<Map.Entry<UUID,StagingTable>> consumer = new Consumer<Map.Entry<UUID,StagingTable>>() { + + @Override + public void accept(Entry<UUID, StagingTable> mapEntry) { + assertNotNull(mapEntry.getValue()); + } + + }; + + digest.entrySet().forEach(consumer); + + + + + } + + protected ByteBuffer mockCompressedProtoByteBuff() throws MDBCServiceException, InvalidProtocolBufferException { + CompleteDigest instance = CompleteDigest.getDefaultInstance(); + // CompleteDigest instance = CompleteDigest.parseFrom(ByteBuffer.wrap("Test".getBytes())); + byte[] bytes = instance.toByteArray(); + ByteBuffer serialized = ByteBuffer.wrap(bytes); + ByteBuffer compressed = StagingTable.Compress(serialized); + return compressed; + } } |