diff options
5 files changed, 171 insertions, 58 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index b403dd2..6cc50ec 100755..100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -198,6 +198,14 @@ public class StateManager { eventualLock.unlock(); } } + + public String getMdbcServerName() { + return mdbcServerName; + } + + public void setMdbcServerName(String mdbcServerName) { + this.mdbcServerName = mdbcServerName; + } public void closeConnection(String connectionId){ //\TODO check if there is a race condition diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java index 338e697..c5c2b99 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java @@ -89,7 +89,7 @@ public class MdbcEventualTestClient { final String insertSQL = "INSERT INTO audit_log VALUES (1, 123, 123456789);"; final String insertSQL1 = "DELETE FROM audit_log WHERE PersonID=1;"; - final String insertSQL2 = "INSERT INTO audit_log VALUES (1, 123, 123456789);"; + final String insertSQL2 = "INSERT INTO audit_log VALUES (3, 123, 123456789);"; final String insertSQL3 = "UPDATE audit_log SET PersonID=124 where id=1;"; final String insertSQL4 = "INSERT INTO audit_log VALUES (2, 234, 123456789);"; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 49d4c71..22c532b 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -20,11 +20,7 @@ package org.onap.music.mdbc.mixins; import com.datastax.driver.core.ResultSet; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import org.json.JSONObject; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicLockingException; @@ -259,6 +255,15 @@ public interface MusicInterface { * @throws MDBCServiceException */ HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException; + + /** + * Function used to retrieve a given eventual transaction digest for the current node and deserialize it + * @param nodeName that identifies a node + * @return the deserialize transaction digest that can be applied to the local SQL database + * @throws MDBCServiceException + */ + + public LinkedHashMap<UUID, HashMap<Range,StagingTable>> getEveTxDigest(String nodeName) throws MDBCServiceException; /** * Use this functions to verify ownership, and own new ranges @@ -320,9 +325,9 @@ public interface MusicInterface { List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException; OwnershipAndCheckpoint getOwnAndCheck(); - - ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException; void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException; + + public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 999c67f..963647c 100755..100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -38,7 +39,6 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; -import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; import org.onap.music.datastore.Condition; @@ -58,7 +58,12 @@ import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.ownership.Dag; import org.onap.music.mdbc.ownership.DagNode; import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; -import org.onap.music.mdbc.tables.*; +import org.onap.music.mdbc.tables.MriReference; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; +import org.onap.music.mdbc.tables.RangeDependency; +import org.onap.music.mdbc.tables.StagingTable; +import org.onap.music.mdbc.tables.TxCommitProgress; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.DataType; @@ -119,6 +124,7 @@ public class MusicMixin implements MusicInterface { private String musicEventualTxDigestTableName = "musicevetxdigest"; private String musicRangeInformationTableName = "musicrangeinformation"; private String musicRangeDependencyTableName = "musicrangedependency"; + private String musicNodeInfoTableName = "nodeinfo"; private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class); @@ -336,6 +342,7 @@ public class MusicMixin implements MusicInterface { try { createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number createMusicEventualTxDigest(); + createMusicNodeInfoTable(); createMusicRangeInformationTable(this.music_ns,this.musicRangeInformationTableName); createMusicRangeDependencyTable(); } @@ -982,12 +989,7 @@ public class MusicMixin implements MusicInterface { if(rt.getResult().getResult().toLowerCase().equals("failure")) { logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage()); } - /*Session sess = getMusicSession(); - SimpleStatement s = new SimpleStatement(cql); - s.setReadTimeoutMillis(60000); - synchronized (sess) { - sess.execute(s); - }*/ + } /** @@ -1319,6 +1321,9 @@ public class MusicMixin implements MusicInterface { @Override public void commitLog(DatabasePartition partition,List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ + // first deal with commit for eventually consistent tables + filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); + if(partition==null){ logger.warn("Trying tcommit log with null partition"); return; @@ -1328,8 +1333,7 @@ public class MusicMixin implements MusicInterface { logger.warn("Trying to commit log with empty ranges"); return; } - // first deal with commit for eventually consistent tables - filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); + UUID mriIndex = partition.getMRIIndex(); String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex; @@ -1916,28 +1920,36 @@ public class MusicMixin implements MusicInterface { @Override - public ArrayList<HashMap<Range,StagingTable>> getEveTxDigest() throws MDBCServiceException { + public LinkedHashMap<UUID, HashMap<Range,StagingTable>> getEveTxDigest(String nodeName) throws MDBCServiceException { HashMap<Range,StagingTable> changes; - ArrayList<HashMap<Range,StagingTable>> ecDigestList = new ArrayList<HashMap<Range,StagingTable>>(); + String cql; + LinkedHashMap<UUID, HashMap<Range,StagingTable>> ecDigestInformation = new LinkedHashMap<UUID, HashMap<Range,StagingTable>>(); + String musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + + if (musicevetxdigestNodeinfoTimeID != null && !musicevetxdigestNodeinfoTimeID.isEmpty() ) { + // this will fetch only few records based on the time-stamp condition. + cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ?;", music_ns, this.musicEventualTxDigestTableName); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(musicevetxdigestNodeinfoTimeID); - //but was this timestamp is getting added as per post: https://dev.mysql.com/doc/refman/8.0/en/time-zone-leap-seconds.html - //Ex1: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest ORDER by ts; - //Ex2: SELECT * FROM ectxdigest ORDER by ts; or SELECT * FROM ectxdigest - //####### this will pull all records.. but REPLAY will be against specific records once the NODE it back ON-Line. - // I should get the last record timestamp so that I can put a where condition. - //EX3: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest where UNIX_TIMESTAMP(ts)>UNIX_TIMESTAMP(<<Date/Time value from others>>) ORDER by ts; - String cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName); - // Ex 1 & 2 might return millions of records!! things to consider outOfMemory issue, performance issue etc.. How to overcome?? - // Ex 3: will return less records compare to Ex:1 and Ex:2. - - // I need to get a ResultSet of all the records and give each row to the below HashMap. - ResultSet rs = executeMusicRead(cql); + } else { + // This is going to Fetch all the Transactiondigest records from the musicevetxdigest table. + cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName); + pQueryObject.appendQueryString(cql); + } + + // I need to get a ResultSet of all the records and give each row to the below HashMap. + ResultSet rs = executeMusicRead(pQueryObject.getQuery()); while (!rs.isExhausted()) { Row row = rs.one(); - String digest = row.getString("transactiondigest"); + String digest = row.getString("transactiondigest"); + //String txTimeId = row.getString("txtimeid"); //??? + UUID txTimeId = row.getUUID("txtimeid"); try { changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest); + } catch (IOException e) { logger.error("IOException when deserializing digest"); throw new MDBCServiceException("Deserializng digest failed with ioexception", e); @@ -1946,10 +1958,10 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException("Deserializng digest failed with an invalid class", e); } - ecDigestList.add(changes); + ecDigestInformation.put(txTimeId, changes); + } - - return ecDigestList; + return ecDigestInformation; } @Override @@ -2477,8 +2489,7 @@ public class MusicMixin implements MusicInterface { return result.one(); } - private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject) - throws MDBCServiceException{ + private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject) throws MDBCServiceException{ ResultSet result = MusicCore.quorumGet(cqlObject); if(result == null || result.isExhausted()){ throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]"); @@ -2546,5 +2557,89 @@ public class MusicMixin implements MusicInterface { public OwnershipAndCheckpoint getOwnAndCheck(){ return ownAndCheck; } + + @Override + public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException{ + + String cql = String.format("UPDATE %s.%s SET txtimeid = (%s), txupdatedatetime = now() WHERE nodename = ?;", music_ns, this.musicEventualTxDigestTableName, txTimeID); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(nodeName); + + executeMusicWriteQuery(pQueryObject.getQuery()); + logger.info("Successfully updated nodeinfo table with txtimeid value: " + txTimeID + " against the node:" + nodeName); + + + } + + public void createMusicNodeInfoTable() throws MDBCServiceException { + createMusicNodeInfoTable(-1); + } + + /** + * This function creates the NodeInfo table. It contain information related + * to the nodes along with the updated transactionDigest details. + * * The schema of the table is + * * nodeId, uuid. + * * nodeName, text or varchar?? for now I am going ahead with "text". + * * createDateTime, TIMEUUID. + * * TxUpdateDateTime, TIMEUUID. + * * TxTimeID, TIMEUUID. + * * LastTxDigestID, uuid. (not needed as of now!!) + */ + private void createMusicNodeInfoTable(int nodeInfoTableNumber) throws MDBCServiceException { + String tableName = this.musicNodeInfoTableName; + if(nodeInfoTableNumber >= 0) { + tableName = tableName + + "-" + + Integer.toString(nodeInfoTableNumber); + } + + String priKey = "nodename"; + StringBuilder fields = new StringBuilder(); + fields.append("nodename text, "); + fields.append("createdatetime TIMEUUID, "); + fields.append("txupdatedatetime TIMEUUID, "); + fields.append("txtimeid TIMEUUID "); + //fields.append("LastTxDigestID uuid ");// Not needed as of now! + + String cql = String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", + this.music_ns, + tableName, + fields, + priKey); + + try { + executeMusicWriteQuery(this.music_ns,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create node information table"); + throw(e); + } + } + + public String getTxTimeIdFromNodeInfo(String nodeName) throws MDBCServiceException { + // expecting NodeName from base-0.json file: which is : NJNode + //String nodeName = MdbcServer.stateManager.getMdbcServerName(); + // this retrieves the NJNode row from Cassandra's NodeInfo table so that I can retrieve TimeStamp for further processing. + String cql = String.format("SELECT txtimeid FROM %s.%s WHERE nodeName = ?;", music_ns, musicNodeInfoTableName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(nodeName); + Row newRow; + try { + newRow = executeMusicUnlockedQuorumGet(pQueryObject); + } catch (MDBCServiceException e) { + logger.error("Get operation error: Failure to get row from nodeinfo with nodename:"+nodeName); + // TODO check underlying exception if no data and return empty string + return ""; + //throw new MDBCServiceException("error:Failure to retrive nodeinfo details information", e); + } + + String txtimeid = newRow.getString("txtimeid"); + + return txtimeid; + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java index 3b6953c..204292c 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -104,26 +104,31 @@ public class MusicTxDigest { * @param dbi interface to the database that will replay the operations * @throws MDBCServiceException */ - public static void replayDigest(MusicInterface mi, DBInterface dbi) throws MDBCServiceException { - //List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog(); - //From where should I fetch TransactionsIDs ??? from NEW TABLE ?? or EXISING TABLE ?? << what the new SITE_TABLE details?? - // --> It is a new table called ECTxDigest - //I should sort/ call a method which gives all the entires of a table based on the time-stamp from Low to High - - ArrayList<HashMap<Range, StagingTable>> ecTxDigest = mi.getEveTxDigest(); - - //for (MusicTxDigestId txId: partitionsRedoLogTxIds) { // partitionsRedoLogTxIds --> this comes from new table where timeStamp > currentTimeStamp ( THIS SHOULD BE lessthan.. which is ASC order) - //HashMap<Range, StagingTable> transaction = mi2.getEcTxDigest(); // Getting records from musictxdigest TABLE. - for (HashMap<Range, StagingTable> transaction: ecTxDigest) { - try { - dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??) - } catch (SQLException e) { - logger.error("EC:Rolling back the entire digest replay."); - return; - } - logger.info("EC: Successfully replayed transaction "); - } - } + public void replayDigest(MusicInterface mi, DBInterface dbi) throws MDBCServiceException { + HashMap<Range, StagingTable> transaction; + String nodeName = stateManager.getMdbcServerName(); + logger.info("Node Name: "+nodeName); + LinkedHashMap<UUID, HashMap<Range,StagingTable>> ecDigestInformation = mi.getEveTxDigest(nodeName); + Set<UUID> keys = ecDigestInformation.keySet(); + + for(UUID txTimeID:keys){ + transaction = (HashMap<Range,StagingTable>) ecDigestInformation.get(txTimeID); + + try { + dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??) + } catch (SQLException e) { + logger.error("EC:Rolling back the entire digest replay."); + return; + } + logger.info("EC: Successfully replayed transaction for txTimeID key: "+txTimeID); + + try { + mi.updateNodeInfoTableWithTxTimeIDKey(txTimeID, nodeName); + } catch (MDBCServiceException e) { + logger.error("EC:Rolling back the entire digest replay."); + } + } + } /** |