aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorst782s <statta@research.att.com>2019-01-21 07:21:46 -0500
committerst782s <statta@research.att.com>2019-01-22 15:57:08 -0500
commitcfb34c1b4f6701555df3bf7b9bdbf8caace966bc (patch)
tree92f0d8bed956213306d05177e5a54a6c2bf04b53
parent71f938d4b604fbe8d16c1fefceb29587c427377e (diff)
Add Eventual consistency logic
Issue-ID: MUSIC-276 Change-Id: Ie6b2508c57f0a7b677f48f87c991adcd613147cc Signed-off-by: st782s <statta@research.att.com>
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java28
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java8
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Range.java12
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java10
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java50
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java10
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java139
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java143
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java275
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java9
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java2
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java79
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java10
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java2
14 files changed, 624 insertions, 153 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 629380d..6c1163a 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -53,6 +53,7 @@ import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.DagNode;
import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.query.QueryProcessor;
+import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -69,8 +70,8 @@ import org.onap.music.mdbc.tables.TxCommitProgress;
public class MdbcConnection implements Connection {
private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcConnection.class);
- private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused
- private final Connection jdbcConn; // the JDBC Connection to the actual underlying database
+ private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused
+ private final Connection jdbcConn; // the JDBC Connection to the actual underlying database
private final MusicInterface mi;
private final TxCommitProgress progressKeeper;
private final DBInterface dbi;
@@ -80,7 +81,7 @@ public class MdbcConnection implements Connection {
private DatabasePartition partition;
public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi,
- TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException {
+ TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException {
this.id = id;
this.table_set = Collections.synchronizedSet(new HashSet<String>());
this.transactionDigest = new HashMap<Range,StagingTable>();
@@ -161,7 +162,7 @@ public class MdbcConnection implements Connection {
throw new SQLException("tx id is null");
}
try {
- mi.commitLog(partition, transactionDigest, id, progressKeeper);
+ mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper);
} catch (MDBCServiceException e) {
// TODO Auto-generated catch block
logger.error("Cannot commit log to music" + e.getStackTrace());
@@ -203,7 +204,7 @@ public class MdbcConnection implements Connection {
try {
logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
// transaction was committed -- add all the updates into the REDO-Log in MUSIC
- mi.commitLog(partition, transactionDigest, id, progressKeeper);
+ mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper);
} catch (MDBCServiceException e) {
//If the commit fail, then a new commitId should be used
logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
@@ -226,7 +227,8 @@ public class MdbcConnection implements Connection {
//\TODO try to execute outside of the critical path of commit
try {
- relinquishIfRequired(partition);
+ if(partition != null)
+ relinquishIfRequired(partition);
} catch (MDBCServiceException e) {
logger.warn("Error trying to relinquish: "+partition.toString());
}
@@ -489,12 +491,20 @@ public class MdbcConnection implements Connection {
//Parse tables from the sql query
Map<String, List<String>> tableToInstruction = QueryProcessor.extractTableFromQuery(sql);
//Check ownership of keys
- List<Range> ranges = MDBCUtils.getTables(tableToInstruction);
- this.partition = own(ranges);
+ List<Range> queryTables = MDBCUtils.getTables(tableToInstruction);
+ // filter out ranges that fall under Eventually consistent
+ // category as these tables do not need ownership
+ List<Range> scQueryTables = filterEveTables( queryTables);
+ this.partition = own(scQueryTables);
dbi.preStatementHook(sql);
}
+ private List<Range> filterEveTables(List<Range> queryTables) {
+ queryTables.removeAll(statemanager.getEventualRanges());
+ return queryTables;
+ }
+
/**
* Code to be run within the DB driver after a SQL statement has been executed. This is where remote
* statement actions can be copied back to Cassandra/MUSIC.
@@ -510,7 +520,7 @@ public class MdbcConnection implements Connection {
* in order to prevent multiple threads from running this code in parallel.
*/
public void synchronizeTables() throws QueryException {
- Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database
+ Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database
logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
for (String tableName : set1) {
// This map will be filled in if this table was previously discovered
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
index cb0cea9..9fb36ae 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
@@ -58,6 +58,8 @@ public class MdbcServerLogic extends JdbcMeta{
public MdbcServerLogic(String Url, Properties info, NodeConfiguration config) throws SQLException, MDBCServiceException {
super(Url,info);
this.manager = new StateManager(Url,info,config.nodeName, config.sqlDatabaseName); //FIXME: db name should not be passed in ahead of time
+ manager.setEventualRanges(config.getEventual().getRanges());
+
this.info = info;
int concurrencyLevel = Integer.parseInt(
info.getProperty(ConnectionCacheSettings.CONCURRENCY_LEVEL.key(),
@@ -293,6 +295,12 @@ public class MdbcServerLogic extends JdbcMeta{
}
}
+
+
+
+
+
+
@Override
public void rollback(ConnectionHandle ch) {
try {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
index 214678a..efc89cd 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
@@ -33,10 +33,10 @@ public class Range implements Serializable, Cloneable{
private static final long serialVersionUID = 1610744496930800088L;
- public String table;
+ private String table;
public Range(String table) {
- this.table = table;
+ this.table = table.toUpperCase();
}
public String toString(){return table;}
@@ -51,12 +51,12 @@ public class Range implements Serializable, Cloneable{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Range r = (Range) o;
- return (this.overlaps(r)) && (r.overlaps(this));
+ return (this.overlaps(r)) && (r.overlaps(this));
}
@Override
public int hashCode(){
- return table.hashCode();
+ return table.hashCode();
}
@Override
@@ -76,4 +76,8 @@ public class Range implements Serializable, Cloneable{
return table.equals(other.table);
}
+ public String getTable() {
+ return table;
+ }
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 1359a0d..4c5a3ed 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -79,7 +79,7 @@ public class StateManager {
/** Identifier for this server instance */
private String mdbcServerName;
private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
-
+ private List<Range> eventualRanges;
public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
this.sqlDBName = sqlDBName;
@@ -150,6 +150,14 @@ public class StateManager {
}
+ public List<Range> getEventualRanges() {
+ return eventualRanges;
+ }
+
+ public void setEventualRanges(List<Range> eventualRanges) {
+ this.eventualRanges = eventualRanges;
+ }
+
public void closeConnection(String connectionId){
//\TODO check if there is a race condition
if(mdbcConnections.containsKey(connectionId)) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java
new file mode 100644
index 0000000..0021bcc
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.configurations;
+
+import java.util.List;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Range;
+
+/**
+ * This class represents meta information of tables categorized as eventually consistent
+ */
+public class Eventual {
+
+ private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Eventual.class);
+
+ protected List<Range> ranges;
+
+ public Eventual(List<Range> ranges) {
+ super();
+ this.ranges = ranges;
+ }
+
+ public List<Range> getRanges() {
+ return ranges;
+ }
+
+ public void setRanges(List<Range> ranges) {
+ this.ranges = ranges;
+ }
+
+
+
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
index 7a6aad7..96dc65f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
@@ -37,12 +37,14 @@ public class NodeConfiguration {
private static transient final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(NodeConfiguration.class);
public DatabasePartition partition;
+ public Eventual eventual;
public String nodeName;
public String sqlDatabaseName;
public NodeConfiguration(String tables, UUID mriIndex, String sqlDatabaseName, String node){
// public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) {
partition = new DatabasePartition(toRanges(tables), mriIndex, null) ;
+ eventual = new Eventual(toRanges(tables));
this.nodeName = node;
this.sqlDatabaseName = sqlDatabaseName;
}
@@ -87,4 +89,12 @@ public class NodeConfiguration {
NodeConfiguration config = gson.fromJson(br, NodeConfiguration.class);
return config;
}
+
+ public Eventual getEventual() {
+ return eventual;
+ }
+
+ public void setEventual(Eventual eventual) {
+ this.eventual = eventual;
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java
new file mode 100644
index 0000000..338e697
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java
@@ -0,0 +1,139 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.examples;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class MdbcEventualTestClient {
+
+
+
+ public static void main(String[] args){
+ try {
+ Class.forName("org.apache.calcite.avatica.remote.Driver");
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ Connection connection;
+ try {
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ try {
+ connection.setAutoCommit(false);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
+ }
+
+
+ final String sql = "CREATE TABLE IF NOT EXISTS audit_log (\n" +
+ " id int,\n" +
+ " PersonID int,\n" +
+ " timeID bigint,\n" +
+ " PRIMARY KEY (id)" +
+ ");";
+ Statement stmt;
+ try {
+ stmt = connection.createStatement();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ boolean execute = true;
+// try {
+// execute = stmt.execute(sql);
+// } catch (SQLException e) {
+// e.printStackTrace();
+// return;
+// }
+
+ if (execute) {
+ try {
+ connection.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ final String insertSQL = "INSERT INTO audit_log VALUES (1, 123, 123456789);";
+ final String insertSQL1 = "DELETE FROM audit_log WHERE PersonID=1;";
+ final String insertSQL2 = "INSERT INTO audit_log VALUES (1, 123, 123456789);";
+ final String insertSQL3 = "UPDATE audit_log SET PersonID=124 where id=1;";
+ final String insertSQL4 = "INSERT INTO audit_log VALUES (2, 234, 123456789);";
+
+
+ Statement insertStmt;
+ try {
+ insertStmt = connection.createStatement();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ try {
+ execute = insertStmt.execute(insertSQL);
+ execute = insertStmt.execute(insertSQL1);
+ execute = insertStmt.execute(insertSQL2);
+ execute = insertStmt.execute(insertSQL3);
+ execute = insertStmt.execute(insertSQL4);
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ try {
+ connection.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ try {
+ stmt.close();
+ insertStmt.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 8abfba1..35b6121 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,19 +20,22 @@
package org.onap.music.mdbc.mixins;
import com.datastax.driver.core.ResultSet;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-
import org.json.JSONObject;
-
import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.exceptions.MusicLockingException;
+import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
+import org.onap.music.mdbc.tables.StagingTable;
+import org.onap.music.mdbc.tables.TxCommitProgress;
import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.tables.*;
@@ -100,7 +103,7 @@ public interface MusicInterface {
/**
* This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
* The keyspace name comes from the initialization properties passed to the JDBC driver.
- * @throws MusicServiceException
+ * @throws MusicServiceException
*/
void createKeyspace() throws MDBCServiceException;
/**
@@ -167,7 +170,7 @@ public interface MusicInterface {
* @param changedRow This is information about the row that has changed
*/
void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow);
-
+
Object[] getObjects(TableInfo ti, String tableName, JSONObject row);
/**
* Returns the primary key associated with the given row
@@ -182,21 +185,22 @@ public interface MusicInterface {
* Commits the corresponding REDO-log into MUSIC
*
* @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx
+ * @param eventualRanges
* @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to
- * be a HashMap, because it is required to be serializable
+ * be a HashMap, because it is required to be serializable
* @param txId id associated with the log being send
* @param progressKeeper data structure that is used to handle to detect failures, and know what to do
* @throws MDBCServiceException
*/
- void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
-
+ void commitLog(DatabasePartition partition, List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
+
- /**
- * This function is used to obtain the information related to a specific row in the MRI table
- * @param partitionIndex index of the row that is going to be retrieved
- * @return all the information related to the table
- * @throws MDBCServiceException
- */
+ /**
+ * This function is used to obtain the information related to a specific row in the MRI table
+ * @param partitionIndex index of the row that is going to be retrieved
+ * @return all the information related to the table
+ * @throws MDBCServiceException
+ */
MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException;
/**
@@ -208,11 +212,11 @@ public interface MusicInterface {
RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException;
/**
- * This function is used to create a new row in the MRI table
- * @param info the information used to create the row
- * @return the new partition object that contain the new information used to create the row
- * @throws MDBCServiceException
- */
+ * This function is used to create a new row in the MRI table
+ * @param info the information used to create the row
+ * @return the new partition object that contain the new information used to create the row
+ * @throws MDBCServiceException
+ */
DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException;
/**
@@ -223,48 +227,58 @@ public interface MusicInterface {
void createMusicRangeDependency(RangeDependency rangeAndDependencies) throws MDBCServiceException;
/**
- * This function is used to append an index to the redo log in a MRI row
- * @param partition information related to ownership of partitions, used to verify ownership
- * @param newRecord index of the new record to be appended to the redo log
- * @throws MDBCServiceException
- */
- void appendToRedoLog(DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException;
+ * This function is used to append an index to the redo log in a MRI row
+ * @param partition information related to ownership of partitions, used to verify ownership
+ * @param newRecord index of the new record to be appended to the redo log
+ * @throws MDBCServiceException
+ */
+ void appendToRedoLog( DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException;
- /**
- * This functions adds the tx digest to
- * @param newId id used as index in the MTD table
- * @param transactionDigest digest that contains all the changes performed in the transaction
- * @throws MDBCServiceException
- */
+ /**
+ * This functions adds the tx digest to
+ * @param newId id used as index in the MTD table
+ * @param transactionDigest digest that contains all the changes performed in the transaction
+ * @throws MDBCServiceException
+ */
void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException;
-
+
/**
- * Function used to retrieve a given transaction digest and deserialize it
- * @param id of the transaction digest to be retrieved
- * @return the deserialize transaction digest that can be applied to the local SQL database
- * @throws MDBCServiceException
- */
+ * This functions adds the eventual tx digest to
+ * @param newId id used as index in the MTD table
+ * @param transactionDigest digest that contains all the changes performed in the transaction
+ * @throws MDBCServiceException
+ */
+
+ void addEventualTxDigest(MusicTxDigestId newId, String transactionDigest)
+ throws MDBCServiceException;
+
+ /**
+ * Function used to retrieve a given transaction digest and deserialize it
+ * @param id of the transaction digest to be retrieved
+ * @return the deserialize transaction digest that can be applied to the local SQL database
+ * @throws MDBCServiceException
+ */
HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException;
- /**
- * Use this functions to verify ownership, and own new ranges
- * @param ranges the ranges that should be own after calling this function
- * @param partition current information of the ownership in the system
+ /**
+ * Use this functions to verify ownership, and own new ranges
+ * @param ranges the ranges that should be own after calling this function
+ * @param partition current information of the ownership in the system
* @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is
* required
* @return an object indicating the status of the own function result
- * @throws MDBCServiceException
- */
+ * @throws MDBCServiceException
+ */
OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID ownOpId) throws MDBCServiceException;
- /**
- * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation
- * @param partition information of the partition that is currently being owned
- * @throws MDBCServiceException
- */
+ /**
+ * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation
+ * @param partition information of the partition that is currently being owned
+ * @throws MDBCServiceException
+ */
void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException;
- /**
+ /**
* This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all
* those ranges.
* @param rangeId new id to be used in the new row
@@ -276,17 +290,17 @@ public interface MusicInterface {
//OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) throws MDBCServiceException;
/**
- * This functions relinquishes a range
- * @param ownerId id of the current ownerh
- * @param rangeId id of the range to be relinquished
- * @throws MusicLockingException
- */
+ * This functions relinquishes a range
+ * @param ownerId id of the current ownerh
+ * @param rangeId id of the range to be relinquished
+ * @throws MusicLockingException
+ */
void relinquish(String ownerId, String rangeId) throws MDBCServiceException;
- /**
- * This function return all the range indexes that are currently hold by any of the connections in the system
- * @return list of ids of rows in MRI
- */
+ /**
+ * This function return all the range indexes that are currently hold by any of the connections in the system
+ * @return list of ids of rows in MRI
+ */
List<UUID> getPartitionIndexes() throws MDBCServiceException;
/**
@@ -294,7 +308,7 @@ public interface MusicInterface {
* @param digest this contain all the changes that were perfomed in this digest
* @throws MDBCServiceException
*/
- void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException;
+ void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException;
/**
* This function is in charge of deleting old mri rows that are not longer contain
@@ -303,8 +317,11 @@ public interface MusicInterface {
*/
void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException;
- List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
+ List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
- OwnershipAndCheckpoint getOwnAndCheck();
+ OwnershipAndCheckpoint getOwnAndCheck();
+
+
+ ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index 068a64d..6eacb4f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -23,27 +23,45 @@ import java.io.IOException;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.sql.Types;
-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.function.BiFunction;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
-import org.onap.music.mdbc.*;
-import org.onap.music.mdbc.ownership.Dag;
-import org.onap.music.mdbc.ownership.DagNode;
-import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
-import org.onap.music.mdbc.tables.*;
import org.json.JSONObject;
+import org.onap.music.datastore.Condition;
import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.datastore.Condition;
+import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.main.MusicCore;
import org.onap.music.main.ResultType;
import org.onap.music.main.ReturnType;
-
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.DatabasePartition;
+import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.ownership.Dag;
+import org.onap.music.mdbc.ownership.DagNode;
+import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
+import org.onap.music.mdbc.tables.RangeDependency;
+import org.onap.music.mdbc.tables.StagingTable;
+import org.onap.music.mdbc.tables.TxCommitProgress;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
@@ -101,6 +119,7 @@ public class MusicMixin implements MusicInterface {
//\TODO Add logic to change the names when required and create the tables when necessary
private String musicTxDigestTableName = "musictxdigest";
+ private String musicEventualTxDigestTableName = "musicevetxdigest";
private String musicRangeInformationTableName = "musicrangeinformation";
private String musicRangeDependencyTableName = "musicrangedependency";
@@ -193,6 +212,7 @@ public class MusicMixin implements MusicInterface {
private OwnershipAndCheckpoint ownAndCheck;
public MusicMixin() {
+
//this.logger = null;
this.musicAddress = null;
this.music_ns = null;
@@ -272,7 +292,6 @@ public class MusicMixin implements MusicInterface {
Row row = rs.one();
return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
}
-
private String getAllHostIds() {
ResultSet results = null;
try {
@@ -297,7 +316,6 @@ public class MusicMixin implements MusicInterface {
public String getMixinName() {
return "cassandra";
}
-
/**
* Do what is needed to close down the MUSIC connection.
*/
@@ -316,6 +334,7 @@ public class MusicMixin implements MusicInterface {
createKeyspace();
try {
createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
+ createMusicEventualTxDigest();
createMusicRangeInformationTable();
createMusicRangeDependencyTable();
}
@@ -1254,6 +1273,8 @@ public class MusicMixin implements MusicInterface {
return newRanges;
}
+
+
protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
UUID mriIndex = partition.getMRIIndex();
String lockId;
@@ -1290,8 +1311,21 @@ public class MusicMixin implements MusicInterface {
}
}
- public void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest,
- String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+
+ /**
+ * Writes the transaction information to metric's txDigest and musicRangeInformation table
+ * This officially commits the transaction globally
+ */
+ @Override
+ public void commitLog(DatabasePartition partition, List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+
+ // first deal with commit for eventually consistent tables
+ filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
+
+ // if strong consistency tables are not present in the transaction then return
+ if(partition == null || partition.getMRIIndex() == null)
+ return;
+
UUID mriIndex = partition.getMRIIndex();
String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
//0. See if reference to lock was already created
@@ -1312,6 +1346,10 @@ public class MusicMixin implements MusicInterface {
//Add creation type of transaction digest
//1. Push new row to RRT and obtain its index
+ if(transactionDigest == null || transactionDigest.isEmpty()) {
+ return;
+ }
+
String serializedTransactionDigest;
try {
serializedTransactionDigest = MDBCUtils.toString(transactionDigest);
@@ -1328,6 +1366,66 @@ public class MusicMixin implements MusicInterface {
appendToRedoLog(partition,digestId);
}
+ private void filterAndAddEventualTxDigest(List<Range> eventualRanges,
+ HashMap<Range, StagingTable> transactionDigest, String txId,
+ TxCommitProgress progressKeeper) throws MDBCServiceException {
+
+ if(eventualRanges == null || eventualRanges.isEmpty()) {
+ return;
+ }
+
+ HashMap<Range,StagingTable> eventualTransactionDigest = new HashMap<Range,StagingTable>();
+
+ for(Range eventualRange: eventualRanges) {
+ transactionDigest.computeIfPresent(eventualRange, new BiFunction<Range,StagingTable,StagingTable>() {
+
+ @Override
+ public StagingTable apply(Range key, StagingTable value) {
+ eventualTransactionDigest.put(key, value);
+ //transactionDigest.remove(key);
+ return null;
+ }
+
+ });
+ }
+
+ UUID commitId = getCommitId(txId, progressKeeper);
+
+ //1. Push new row to RRT
+
+ String serializedTransactionDigest;
+ if(eventualTransactionDigest != null && !eventualTransactionDigest.isEmpty()) {
+
+ try {
+ serializedTransactionDigest = MDBCUtils.toString(eventualTransactionDigest);
+ } catch (IOException e) {
+ throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString(), e);
+ }
+ MusicTxDigestId digestId = new MusicTxDigestId(commitId,-1);
+ addEventualTxDigest(digestId, serializedTransactionDigest);
+
+ if(progressKeeper!= null) {
+ progressKeeper.setRecordId(txId,digestId);
+ }
+ }
+
+
+ }
+
+ private UUID getCommitId(String txId, TxCommitProgress progressKeeper)
+ throws MDBCServiceException {
+ UUID commitId;
+ //Generate a local commit id
+ if(progressKeeper.containsTx(txId)) {
+ commitId = progressKeeper.getCommitId(txId);
+ }
+ else{
+ logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress ");
+ throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress ");
+ }
+ return commitId;
+ }
+
/**
* @param tableName
* @param string
@@ -1452,10 +1550,10 @@ public class MusicMixin implements MusicInterface {
String cql = String.format("SELECT * FROM %s.%s WHERE range = ?;", music_ns, musicRangeDependencyTableName);
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(baseRange.table);
+ pQueryObject.addValue(baseRange.getTable());
Row newRow;
try {
- newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.table,null);
+ newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.getTable(),null);
} catch (MDBCServiceException e) {
logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
@@ -1515,12 +1613,12 @@ public class MusicMixin implements MusicInterface {
public void createMusicRangeDependency(RangeDependency rangeAndDependencies) throws MDBCServiceException {
StringBuilder insert = new StringBuilder("INSERT INTO ")
.append(this.music_ns)
- .append('.')
- .append(this.musicRangeDependencyTableName)
- .append(" (range,dependencies) VALUES ")
- .append("(")
- .append(rangeAndDependencies.getRange().table)
- .append(",{");
+ .append('.')
+ .append(this.musicRangeDependencyTableName)
+ .append(" (range,dependencies) VALUES ")
+ .append("(")
+ .append(rangeAndDependencies.getRange().getTable())
+ .append(",{");
boolean first=true;
for(Range r: rangeAndDependencies.dependentRanges()){
if(first){ first=false; }
@@ -1537,16 +1635,16 @@ public class MusicMixin implements MusicInterface {
private UUID createEmptyMriRow(List<Range> rangesCopy) {
- //TODO: THis should call one of the other createMRIRows
- UUID id = generateUniqueKey();
+ //TODO: THis should call one of the other createMRIRows
+ UUID id = generateUniqueKey();
StringBuilder insert = new StringBuilder("INSERT INTO ")
.append(this.music_ns)
- .append('.')
- .append(this.musicRangeInformationTableName)
- .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
- .append("(")
- .append(id)
- .append(",{");
+ .append('.')
+ .append(this.musicRangeInformationTableName)
+ .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
+ .append("(")
+ .append(id)
+ .append(",{");
boolean first=true;
for(Range r: rangesCopy){
if(first){ first=false; }
@@ -1565,7 +1663,7 @@ public class MusicMixin implements MusicInterface {
MusicCore.eventualPut(query);
return id;
}
-
+
/**
* Creates a new empty MRI row
@@ -1635,14 +1733,46 @@ public class MusicMixin implements MusicInterface {
public void createMusicTxDigest() throws MDBCServiceException {
createMusicTxDigest(-1);
}
+
+ public void createMusicEventualTxDigest() throws MDBCServiceException {
+ createMusicEventualTxDigest(-1);
+ }
/**
- * This function creates the MusicTxDigest table. It contain information related to each transaction committed
+ * This function creates the MusicEveTxDigest table. It contain information related to each eventual transaction committed
* * LeaseId: id associated with the lease, text
* * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
* * TransactionDigest: text that contains all the changes in the transaction
*/
+ private void createMusicEventualTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
+ String tableName = this.musicEventualTxDigestTableName;
+ if(musicTxDigestTableNumber >= 0) {
+ tableName = tableName +
+ "-" +
+ Integer.toString(musicTxDigestTableNumber);
+ }
+ String priKey = "txTimeId";
+ StringBuilder fields = new StringBuilder();
+ fields.append("txid uuid, ");
+ fields.append("transactiondigest text, ");
+ fields.append("txTimeId TIMEUUID ");//notice lack of ','
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
+ try {
+ executeMusicWriteQuery(this.music_ns,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create redo records table");
+ throw(e);
+ }
+ }
+
+
+ /**
+ * This function creates the MusicTxDigest table. It contain information related to each transaction committed
+ * * LeaseId: id associated with the lease, text
+ * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+ * * TransactionDigest: text that contains all the changes in the transaction
+ */
private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
String tableName = this.musicTxDigestTableName;
if(musicTxDigestTableNumber >= 0) {
@@ -1704,6 +1834,35 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e);
}
}
+
+ /**
+ * Writes the Eventual transaction history to the evetxDigest
+ */
+ @Override
+ public void addEventualTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException {
+ //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
+ PreparedQueryObject query = new PreparedQueryObject();
+ String cqlQuery = "INSERT INTO " +
+ this.music_ns +
+ '.' +
+ this.musicEventualTxDigestTableName +
+ " (txid,transactiondigest,txTimeId) " +
+ "VALUES (" +
+ newId.txId + ",'" +
+ transactionDigest +
+ "'," +
+ // "toTimestamp(now())" +
+ "now()" +
+ ");";
+ query.appendQueryString(cqlQuery);
+ //\TODO check if I am not shooting on my own foot
+ try {
+ MusicCore.nonKeyRelatedPut(query,"critical");
+ } catch (MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
+ throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e);
+ }
+ }
@Override
public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
@@ -1731,7 +1890,49 @@ public class MusicMixin implements MusicInterface {
}
return changes;
}
+
+
+ @Override
+ public ArrayList<HashMap<Range,StagingTable>> getEveTxDigest() throws MDBCServiceException {
+ HashMap<Range,StagingTable> changes;
+ ArrayList<HashMap<Range,StagingTable>> ecDigestList = new ArrayList<HashMap<Range,StagingTable>>();
+
+ //but was this timestamp is getting added as per post: https://dev.mysql.com/doc/refman/8.0/en/time-zone-leap-seconds.html
+ //Ex1: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest ORDER by ts;
+ //Ex2: SELECT * FROM ectxdigest ORDER by ts; or SELECT * FROM ectxdigest
+ //####### this will pull all records.. but REPLAY will be against specific records once the NODE it back ON-Line.
+ // I should get the last record timestamp so that I can put a where condition.
+ //EX3: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest where UNIX_TIMESTAMP(ts)>UNIX_TIMESTAMP(<<Date/Time value from others>>) ORDER by ts;
+ String cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName);
+ // Ex 1 & 2 might return millions of records!! things to consider outOfMemory issue, performance issue etc.. How to overcome??
+ // Ex 3: will return less records compare to Ex:1 and Ex:2.
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ // I need to get a ResultSet of all the records and give each row to the below HashMap.
+ ResultSet rs = executeMusicRead(pQueryObject.toString());
+ while (!rs.isExhausted()) {
+ Row row = rs.one();
+ String digest = row.getString("transactiondigest");
+
+ try {
+ changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
+ } catch (IOException e) {
+ logger.error("IOException when deserializing digest");
+ throw new MDBCServiceException("Deserializng digest failed with ioexception", e);
+ } catch (ClassNotFoundException e) {
+ logger.error("Deserializng digest failed with an invalid class");
+ throw new MDBCServiceException("Deserializng digest failed with an invalid class", e);
+ }
+
+ ecDigestList.add(changes);
+ }
+
+ return ecDigestList;
+ }
+
+
+
ResultSet getAllMriCassandraRows() throws MDBCServiceException {
StringBuilder cqlOperation = new StringBuilder();
cqlOperation.append("SELECT * FROM ")
@@ -1997,7 +2198,11 @@ public class MusicMixin implements MusicInterface {
@Override
public OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException {
- Map<UUID,LockResult> newLocks = new HashMap<>();
+
+ if(ranges == null || ranges.isEmpty())
+ return null;
+
+ Map<UUID,LockResult> newLocks = new HashMap<>();
//Init timeout clock
ownAndCheck.startOwnershipTimeoutClock(opId);
//Find
@@ -2285,7 +2490,7 @@ public class MusicMixin implements MusicInterface {
@Override
public void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException{
- throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented");
+ //throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented");
}
@Override
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index af935ef..25b3de0 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -17,7 +17,6 @@
* limitations under the License.
* ============LICENSE_END======================================================
*/
-
package org.onap.music.mdbc.mixins;
import java.sql.Connection;
@@ -844,7 +843,7 @@ NEW.field refers to the new value
jdbcConn.setAutoCommit(autocommit);
}
-
+
@Override
public void disableForeignKeyChecks() throws SQLException {
Statement disable = jdbcConn.createStatement();
@@ -896,7 +895,7 @@ NEW.field refers to the new value
switch (op.getOperationType()) {
case INSERT:
sql.append(op.getOperationType() + " INTO ");
- sql.append(r.table + " (") ;
+ sql.append(r.getTable() + " (") ;
sep = "";
for (String col: cols) {
sql.append(sep + col);
@@ -912,7 +911,7 @@ NEW.field refers to the new value
break;
case UPDATE:
sql.append(op.getOperationType() + " ");
- sql.append(r.table + " SET ");
+ sql.append(r.getTable() + " SET ");
sep="";
for (int i=0; i<cols.size(); i++) {
sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\"");
@@ -924,7 +923,7 @@ NEW.field refers to the new value
break;
case DELETE:
sql.append(op.getOperationType() + " FROM ");
- sql.append(r.table + " WHERE ");
+ sql.append(r.getTable() + " WHERE ");
sql.append(getPrimaryKeyConditional(op.getKey()));
sql.append(";");
break;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
index a1228d5..68d1f19 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
@@ -359,7 +359,7 @@ public class Dag {
Range range = rangeAndNodes.getKey();
Set<DagNode> nodes = rangeAndNodes.getValue();
if(nodes.size() > 2){
- logger.error("Range "+range.table+"has more than 2 active rows");
+ logger.error("Range "+range.getTable()+"has more than 2 active rows");
throw new MDBCServiceException("Range has more than 2 active rows");
}
else if(nodes.size()==2){
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
index fa15354..1da2d79 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -19,36 +19,22 @@
*/
package org.onap.music.mdbc.tables;
-import java.io.IOException;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-
-import org.json.JSONObject;
-import org.onap.music.datastore.PreparedQueryObject;
import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.DatabasePartition;
-import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.MdbcConnection;
-import org.onap.music.mdbc.MdbcServerLogic;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.StateManager;
-import org.onap.music.mdbc.configurations.NodeConfiguration;
-import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.mixins.DBInterface;
import org.onap.music.mdbc.mixins.MusicInterface;
-import com.datastax.driver.core.Row;
-
public class MusicTxDigest {
private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicTxDigest.class);
@@ -90,7 +76,8 @@ public class MusicTxDigest {
for (UUID partition : partitions) {
if (!partition.equals(myPartition.getMRIIndex())) {
try {
- replayDigestForPartition(mi, partition, dbi);
+ //replayDigestForPartition(mi, partition, dbi);
+ mi.getOwnAndCheck().warmup(mi, dbi, myPartition.getSnapshot());
} catch (MDBCServiceException e) {
logger.error("Unable to update for partition : " + partition + ". " + e.getMessage());
continue;
@@ -98,33 +85,71 @@ public class MusicTxDigest {
}
}
}
+
+ //Step 3: ReplayDigest() for E.C conditions
+ try {
+ replayDigest(mi,dbi);
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to perform Eventual Consistency operations" + e.getMessage());
+ continue;
+ }
+
}
}
-
+
/**
- * Replay the digest for a given partition
+ * Replay the digest for eventual consistency.
* @param mi music interface
* @param partitionId the partition to be replayed
* @param dbi interface to the database that will replay the operations
* @throws MDBCServiceException
*/
- public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException {
- List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
- for (MusicTxDigestId txId: partitionsRedoLogTxIds) {
- HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId);
+ public static void replayDigest(MusicInterface mi, DBInterface dbi) throws MDBCServiceException {
+ //List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
+ //From where should I fetch TransactionsIDs ??? from NEW TABLE ?? or EXISING TABLE ?? << what the new SITE_TABLE details??
+ // --> It is a new table called ECTxDigest
+ //I should sort/ call a method which gives all the entires of a table based on the time-stamp from Low to High
+
+ ArrayList<HashMap<Range, StagingTable>> ecTxDigest = mi.getEveTxDigest();
+
+ //for (MusicTxDigestId txId: partitionsRedoLogTxIds) { // partitionsRedoLogTxIds --> this comes from new table where timeStamp > currentTimeStamp ( THIS SHOULD BE lessthan.. which is ASC order)
+ //HashMap<Range, StagingTable> transaction = mi2.getEcTxDigest(); // Getting records from musictxdigest TABLE.
+ for (HashMap<Range, StagingTable> transaction: ecTxDigest) {
try {
- //\TODO do this two operations in parallel
- dbi.replayTransaction(transaction);
- mi.replayTransaction(transaction);
+ dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
} catch (SQLException e) {
- logger.error("Rolling back the entire digest replay. " + partitionId);
+ logger.error("EC:Rolling back the entire digest replay.");
return;
}
- logger.info("Successfully replayed transaction " + txId);
+ logger.info("EC: Successfully replayed transaction ");
}
- //todo, keep track of where I am in pointer
}
+
+ /**
+ * Replay the digest for a given partition
+ * @param mi music interface
+ * @param partitionId the partition to be replayed
+ * @param dbi interface to the database that will replay the operations
+ * @throws MDBCServiceException
+ */
+ public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException {
+ List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
+ for (MusicTxDigestId txId: partitionsRedoLogTxIds) {
+ HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId);
+ try {
+ //\TODO do this two operations in parallel
+ dbi.replayTransaction(transaction);
+ mi.replayTransaction(transaction);
+ } catch (SQLException e) {
+ logger.error("Rolling back the entire digest replay. " + partitionId);
+ return;
+ }
+ logger.info("Successfully replayed transaction " + txId);
+ }
+ //todo, keep track of where I am in pointer
+ }
+
/**
* Start the background daemon defined by this object
* Spawns a new thread and runs "backgroundDaemon"
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
index 5c50b6d..2d31939 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
@@ -20,23 +20,19 @@
package org.onap.music.mdbc;
-import static org.junit.Assert.assertEquals;
+import org.junit.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
import org.onap.music.mdbc.mixins.MySQLMixin;
import ch.vorburger.mariadb4j.DB;
public class MySQLMixinTest {
- public static final String DATABASE = "mdbcTest";
+ public static final String DATABASE = "mdbctest";
public static final String TABLE= "Persons";
public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" +
" PersonID int,\n" +
@@ -75,7 +71,7 @@ public class MySQLMixinTest {
@Test
public void testGetDataBaseName() throws SQLException {
- assertEquals(DATABASE, mysqlMixin.getDatabaseName());
+ Assert.assertEquals(DATABASE, mysqlMixin.getDatabaseName());
}
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
index 0e7b030..753c629 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
@@ -158,7 +158,7 @@ public class OwnershipAndCheckpointTest {
String id = MDBCUtils.generateUniqueKey().toString();
TxCommitProgress progressKeeper = new TxCommitProgress();
progressKeeper.createNewTransactionTracker(id ,this.conn);
- musicMixin.commitLog(partition, stagingTable, id, progressKeeper);
+ musicMixin.commitLog(partition, null, stagingTable, id, progressKeeper);
TestUtils.unlockRow(keyspace,mriTableName,partition);
}