aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--[-rwxr-xr-x]mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java8
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java19
-rw-r--r--[-rwxr-xr-x]mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java155
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java45
5 files changed, 171 insertions, 58 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index b403dd2..6cc50ec 100755..100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -198,6 +198,14 @@ public class StateManager {
eventualLock.unlock();
}
}
+
+ public String getMdbcServerName() {
+ return mdbcServerName;
+ }
+
+ public void setMdbcServerName(String mdbcServerName) {
+ this.mdbcServerName = mdbcServerName;
+ }
public void closeConnection(String connectionId){
//\TODO check if there is a race condition
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java
index 338e697..c5c2b99 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java
@@ -89,7 +89,7 @@ public class MdbcEventualTestClient {
final String insertSQL = "INSERT INTO audit_log VALUES (1, 123, 123456789);";
final String insertSQL1 = "DELETE FROM audit_log WHERE PersonID=1;";
- final String insertSQL2 = "INSERT INTO audit_log VALUES (1, 123, 123456789);";
+ final String insertSQL2 = "INSERT INTO audit_log VALUES (3, 123, 123456789);";
final String insertSQL3 = "UPDATE audit_log SET PersonID=124 where id=1;";
final String insertSQL4 = "INSERT INTO audit_log VALUES (2, 234, 123456789);";
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 49d4c71..22c532b 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -20,11 +20,7 @@
package org.onap.music.mdbc.mixins;
import com.datastax.driver.core.ResultSet;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import org.json.JSONObject;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.MusicLockingException;
@@ -259,6 +255,15 @@ public interface MusicInterface {
* @throws MDBCServiceException
*/
HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException;
+
+ /**
+ * Function used to retrieve a given eventual transaction digest for the current node and deserialize it
+ * @param nodeName that identifies a node
+ * @return the deserialize transaction digest that can be applied to the local SQL database
+ * @throws MDBCServiceException
+ */
+
+ public LinkedHashMap<UUID, HashMap<Range,StagingTable>> getEveTxDigest(String nodeName) throws MDBCServiceException;
/**
* Use this functions to verify ownership, and own new ranges
@@ -320,9 +325,9 @@ public interface MusicInterface {
List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
OwnershipAndCheckpoint getOwnAndCheck();
-
- ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException;
void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException;
+
+ public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index 999c67f..963647c 100755..100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -38,7 +39,6 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
import org.onap.music.datastore.Condition;
@@ -58,7 +58,12 @@ import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.DagNode;
import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
-import org.onap.music.mdbc.tables.*;
+import org.onap.music.mdbc.tables.MriReference;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
+import org.onap.music.mdbc.tables.RangeDependency;
+import org.onap.music.mdbc.tables.StagingTable;
+import org.onap.music.mdbc.tables.TxCommitProgress;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
@@ -119,6 +124,7 @@ public class MusicMixin implements MusicInterface {
private String musicEventualTxDigestTableName = "musicevetxdigest";
private String musicRangeInformationTableName = "musicrangeinformation";
private String musicRangeDependencyTableName = "musicrangedependency";
+ private String musicNodeInfoTableName = "nodeinfo";
private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class);
@@ -336,6 +342,7 @@ public class MusicMixin implements MusicInterface {
try {
createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
createMusicEventualTxDigest();
+ createMusicNodeInfoTable();
createMusicRangeInformationTable(this.music_ns,this.musicRangeInformationTableName);
createMusicRangeDependencyTable();
}
@@ -982,12 +989,7 @@ public class MusicMixin implements MusicInterface {
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
}
- /*Session sess = getMusicSession();
- SimpleStatement s = new SimpleStatement(cql);
- s.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(s);
- }*/
+
}
/**
@@ -1319,6 +1321,9 @@ public class MusicMixin implements MusicInterface {
@Override
public void commitLog(DatabasePartition partition,List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest,
String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+ // first deal with commit for eventually consistent tables
+ filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
+
if(partition==null){
logger.warn("Trying tcommit log with null partition");
return;
@@ -1328,8 +1333,7 @@ public class MusicMixin implements MusicInterface {
logger.warn("Trying to commit log with empty ranges");
return;
}
- // first deal with commit for eventually consistent tables
- filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
+
UUID mriIndex = partition.getMRIIndex();
String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
@@ -1916,28 +1920,36 @@ public class MusicMixin implements MusicInterface {
@Override
- public ArrayList<HashMap<Range,StagingTable>> getEveTxDigest() throws MDBCServiceException {
+ public LinkedHashMap<UUID, HashMap<Range,StagingTable>> getEveTxDigest(String nodeName) throws MDBCServiceException {
HashMap<Range,StagingTable> changes;
- ArrayList<HashMap<Range,StagingTable>> ecDigestList = new ArrayList<HashMap<Range,StagingTable>>();
+ String cql;
+ LinkedHashMap<UUID, HashMap<Range,StagingTable>> ecDigestInformation = new LinkedHashMap<UUID, HashMap<Range,StagingTable>>();
+ String musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+
+ if (musicevetxdigestNodeinfoTimeID != null && !musicevetxdigestNodeinfoTimeID.isEmpty() ) {
+ // this will fetch only few records based on the time-stamp condition.
+ cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ?;", music_ns, this.musicEventualTxDigestTableName);
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(musicevetxdigestNodeinfoTimeID);
- //but was this timestamp is getting added as per post: https://dev.mysql.com/doc/refman/8.0/en/time-zone-leap-seconds.html
- //Ex1: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest ORDER by ts;
- //Ex2: SELECT * FROM ectxdigest ORDER by ts; or SELECT * FROM ectxdigest
- //####### this will pull all records.. but REPLAY will be against specific records once the NODE it back ON-Line.
- // I should get the last record timestamp so that I can put a where condition.
- //EX3: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest where UNIX_TIMESTAMP(ts)>UNIX_TIMESTAMP(<<Date/Time value from others>>) ORDER by ts;
- String cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName);
- // Ex 1 & 2 might return millions of records!! things to consider outOfMemory issue, performance issue etc.. How to overcome??
- // Ex 3: will return less records compare to Ex:1 and Ex:2.
-
- // I need to get a ResultSet of all the records and give each row to the below HashMap.
- ResultSet rs = executeMusicRead(cql);
+ } else {
+ // This is going to Fetch all the Transactiondigest records from the musicevetxdigest table.
+ cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName);
+ pQueryObject.appendQueryString(cql);
+ }
+
+ // I need to get a ResultSet of all the records and give each row to the below HashMap.
+ ResultSet rs = executeMusicRead(pQueryObject.getQuery());
while (!rs.isExhausted()) {
Row row = rs.one();
- String digest = row.getString("transactiondigest");
+ String digest = row.getString("transactiondigest");
+ //String txTimeId = row.getString("txtimeid"); //???
+ UUID txTimeId = row.getUUID("txtimeid");
try {
changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
+
} catch (IOException e) {
logger.error("IOException when deserializing digest");
throw new MDBCServiceException("Deserializng digest failed with ioexception", e);
@@ -1946,10 +1958,10 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException("Deserializng digest failed with an invalid class", e);
}
- ecDigestList.add(changes);
+ ecDigestInformation.put(txTimeId, changes);
+
}
-
- return ecDigestList;
+ return ecDigestInformation;
}
@Override
@@ -2477,8 +2489,7 @@ public class MusicMixin implements MusicInterface {
return result.one();
}
- private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
- throws MDBCServiceException{
+ private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject) throws MDBCServiceException{
ResultSet result = MusicCore.quorumGet(cqlObject);
if(result == null || result.isExhausted()){
throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
@@ -2546,5 +2557,89 @@ public class MusicMixin implements MusicInterface {
public OwnershipAndCheckpoint getOwnAndCheck(){
return ownAndCheck;
}
+
+ @Override
+ public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException{
+
+ String cql = String.format("UPDATE %s.%s SET txtimeid = (%s), txupdatedatetime = now() WHERE nodename = ?;", music_ns, this.musicEventualTxDigestTableName, txTimeID);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(nodeName);
+
+ executeMusicWriteQuery(pQueryObject.getQuery());
+ logger.info("Successfully updated nodeinfo table with txtimeid value: " + txTimeID + " against the node:" + nodeName);
+
+
+ }
+
+ public void createMusicNodeInfoTable() throws MDBCServiceException {
+ createMusicNodeInfoTable(-1);
+ }
+
+ /**
+ * This function creates the NodeInfo table. It contain information related
+ * to the nodes along with the updated transactionDigest details.
+ * * The schema of the table is
+ * * nodeId, uuid.
+ * * nodeName, text or varchar?? for now I am going ahead with "text".
+ * * createDateTime, TIMEUUID.
+ * * TxUpdateDateTime, TIMEUUID.
+ * * TxTimeID, TIMEUUID.
+ * * LastTxDigestID, uuid. (not needed as of now!!)
+ */
+ private void createMusicNodeInfoTable(int nodeInfoTableNumber) throws MDBCServiceException {
+ String tableName = this.musicNodeInfoTableName;
+ if(nodeInfoTableNumber >= 0) {
+ tableName = tableName +
+ "-" +
+ Integer.toString(nodeInfoTableNumber);
+ }
+
+ String priKey = "nodename";
+ StringBuilder fields = new StringBuilder();
+ fields.append("nodename text, ");
+ fields.append("createdatetime TIMEUUID, ");
+ fields.append("txupdatedatetime TIMEUUID, ");
+ fields.append("txtimeid TIMEUUID ");
+ //fields.append("LastTxDigestID uuid ");// Not needed as of now!
+
+ String cql = String.format(
+ "CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
+ this.music_ns,
+ tableName,
+ fields,
+ priKey);
+
+ try {
+ executeMusicWriteQuery(this.music_ns,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create node information table");
+ throw(e);
+ }
+ }
+
+ public String getTxTimeIdFromNodeInfo(String nodeName) throws MDBCServiceException {
+ // expecting NodeName from base-0.json file: which is : NJNode
+ //String nodeName = MdbcServer.stateManager.getMdbcServerName();
+ // this retrieves the NJNode row from Cassandra's NodeInfo table so that I can retrieve TimeStamp for further processing.
+ String cql = String.format("SELECT txtimeid FROM %s.%s WHERE nodeName = ?;", music_ns, musicNodeInfoTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(nodeName);
+ Row newRow;
+ try {
+ newRow = executeMusicUnlockedQuorumGet(pQueryObject);
+ } catch (MDBCServiceException e) {
+ logger.error("Get operation error: Failure to get row from nodeinfo with nodename:"+nodeName);
+ // TODO check underlying exception if no data and return empty string
+ return "";
+ //throw new MDBCServiceException("error:Failure to retrive nodeinfo details information", e);
+ }
+
+ String txtimeid = newRow.getString("txtimeid");
+
+ return txtimeid;
+ }
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
index 3b6953c..204292c 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -104,26 +104,31 @@ public class MusicTxDigest {
* @param dbi interface to the database that will replay the operations
* @throws MDBCServiceException
*/
- public static void replayDigest(MusicInterface mi, DBInterface dbi) throws MDBCServiceException {
- //List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
- //From where should I fetch TransactionsIDs ??? from NEW TABLE ?? or EXISING TABLE ?? << what the new SITE_TABLE details??
- // --> It is a new table called ECTxDigest
- //I should sort/ call a method which gives all the entires of a table based on the time-stamp from Low to High
-
- ArrayList<HashMap<Range, StagingTable>> ecTxDigest = mi.getEveTxDigest();
-
- //for (MusicTxDigestId txId: partitionsRedoLogTxIds) { // partitionsRedoLogTxIds --> this comes from new table where timeStamp > currentTimeStamp ( THIS SHOULD BE lessthan.. which is ASC order)
- //HashMap<Range, StagingTable> transaction = mi2.getEcTxDigest(); // Getting records from musictxdigest TABLE.
- for (HashMap<Range, StagingTable> transaction: ecTxDigest) {
- try {
- dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
- } catch (SQLException e) {
- logger.error("EC:Rolling back the entire digest replay.");
- return;
- }
- logger.info("EC: Successfully replayed transaction ");
- }
- }
+ public void replayDigest(MusicInterface mi, DBInterface dbi) throws MDBCServiceException {
+ HashMap<Range, StagingTable> transaction;
+ String nodeName = stateManager.getMdbcServerName();
+ logger.info("Node Name: "+nodeName);
+ LinkedHashMap<UUID, HashMap<Range,StagingTable>> ecDigestInformation = mi.getEveTxDigest(nodeName);
+ Set<UUID> keys = ecDigestInformation.keySet();
+
+ for(UUID txTimeID:keys){
+ transaction = (HashMap<Range,StagingTable>) ecDigestInformation.get(txTimeID);
+
+ try {
+ dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
+ } catch (SQLException e) {
+ logger.error("EC:Rolling back the entire digest replay.");
+ return;
+ }
+ logger.info("EC: Successfully replayed transaction for txTimeID key: "+txTimeID);
+
+ try {
+ mi.updateNodeInfoTableWithTxTimeIDKey(txTimeID, nodeName);
+ } catch (MDBCServiceException e) {
+ logger.error("EC:Rolling back the entire digest replay.");
+ }
+ }
+ }
/**