diff options
author | 2019-01-21 07:21:46 -0500 | |
---|---|---|
committer | 2019-01-22 15:57:08 -0500 | |
commit | cfb34c1b4f6701555df3bf7b9bdbf8caace966bc (patch) | |
tree | 92f0d8bed956213306d05177e5a54a6c2bf04b53 /mdbc-server/src/main/java/org | |
parent | 71f938d4b604fbe8d16c1fefceb29587c427377e (diff) |
Add Eventual consistency logic
Issue-ID: MUSIC-276
Change-Id: Ie6b2508c57f0a7b677f48f87c991adcd613147cc
Signed-off-by: st782s <statta@research.att.com>
Diffstat (limited to 'mdbc-server/src/main/java/org')
12 files changed, 620 insertions, 145 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 629380d..6c1163a 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -53,6 +53,7 @@ import org.onap.music.mdbc.ownership.Dag; import org.onap.music.mdbc.ownership.DagNode; import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.query.QueryProcessor; +import org.onap.music.mdbc.tables.MusicTxDigest; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -69,8 +70,8 @@ import org.onap.music.mdbc.tables.TxCommitProgress; public class MdbcConnection implements Connection { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcConnection.class); - private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused - private final Connection jdbcConn; // the JDBC Connection to the actual underlying database + private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused + private final Connection jdbcConn; // the JDBC Connection to the actual underlying database private final MusicInterface mi; private final TxCommitProgress progressKeeper; private final DBInterface dbi; @@ -80,7 +81,7 @@ public class MdbcConnection implements Connection { private DatabasePartition partition; public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, - TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException { + TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException { this.id = id; this.table_set = Collections.synchronizedSet(new HashSet<String>()); this.transactionDigest = new HashMap<Range,StagingTable>(); @@ -161,7 +162,7 @@ public class MdbcConnection implements Connection { throw new SQLException("tx id is null"); } try { - mi.commitLog(partition, transactionDigest, id, progressKeeper); + mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); } catch (MDBCServiceException e) { // TODO Auto-generated catch block logger.error("Cannot commit log to music" + e.getStackTrace()); @@ -203,7 +204,7 @@ public class MdbcConnection implements Connection { try { logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); // transaction was committed -- add all the updates into the REDO-Log in MUSIC - mi.commitLog(partition, transactionDigest, id, progressKeeper); + mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); } catch (MDBCServiceException e) { //If the commit fail, then a new commitId should be used logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); @@ -226,7 +227,8 @@ public class MdbcConnection implements Connection { //\TODO try to execute outside of the critical path of commit try { - relinquishIfRequired(partition); + if(partition != null) + relinquishIfRequired(partition); } catch (MDBCServiceException e) { logger.warn("Error trying to relinquish: "+partition.toString()); } @@ -489,12 +491,20 @@ public class MdbcConnection implements Connection { //Parse tables from the sql query Map<String, List<String>> tableToInstruction = QueryProcessor.extractTableFromQuery(sql); //Check ownership of keys - List<Range> ranges = MDBCUtils.getTables(tableToInstruction); - this.partition = own(ranges); + List<Range> queryTables = MDBCUtils.getTables(tableToInstruction); + // filter out ranges that fall under Eventually consistent + // category as these tables do not need ownership + List<Range> scQueryTables = filterEveTables( queryTables); + this.partition = own(scQueryTables); dbi.preStatementHook(sql); } + private List<Range> filterEveTables(List<Range> queryTables) { + queryTables.removeAll(statemanager.getEventualRanges()); + return queryTables; + } + /** * Code to be run within the DB driver after a SQL statement has been executed. This is where remote * statement actions can be copied back to Cassandra/MUSIC. @@ -510,7 +520,7 @@ public class MdbcConnection implements Connection { * in order to prevent multiple threads from running this code in parallel. */ public void synchronizeTables() throws QueryException { - Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database + Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); for (String tableName : set1) { // This map will be filled in if this table was previously discovered diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java index cb0cea9..9fb36ae 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java @@ -58,6 +58,8 @@ public class MdbcServerLogic extends JdbcMeta{ public MdbcServerLogic(String Url, Properties info, NodeConfiguration config) throws SQLException, MDBCServiceException { super(Url,info); this.manager = new StateManager(Url,info,config.nodeName, config.sqlDatabaseName); //FIXME: db name should not be passed in ahead of time + manager.setEventualRanges(config.getEventual().getRanges()); + this.info = info; int concurrencyLevel = Integer.parseInt( info.getProperty(ConnectionCacheSettings.CONCURRENCY_LEVEL.key(), @@ -293,6 +295,12 @@ public class MdbcServerLogic extends JdbcMeta{ } } + + + + + + @Override public void rollback(ConnectionHandle ch) { try { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java index 214678a..efc89cd 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java @@ -33,10 +33,10 @@ public class Range implements Serializable, Cloneable{ private static final long serialVersionUID = 1610744496930800088L; - public String table; + private String table; public Range(String table) { - this.table = table; + this.table = table.toUpperCase(); } public String toString(){return table;} @@ -51,12 +51,12 @@ public class Range implements Serializable, Cloneable{ if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Range r = (Range) o; - return (this.overlaps(r)) && (r.overlaps(this)); + return (this.overlaps(r)) && (r.overlaps(this)); } @Override public int hashCode(){ - return table.hashCode(); + return table.hashCode(); } @Override @@ -76,4 +76,8 @@ public class Range implements Serializable, Cloneable{ return table.equals(other.table); } + public String getTable() { + return table; + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 1359a0d..4c5a3ed 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -79,7 +79,7 @@ public class StateManager { /** Identifier for this server instance */ private String mdbcServerName; private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition - + private List<Range> eventualRanges; public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException { this.sqlDBName = sqlDBName; @@ -150,6 +150,14 @@ public class StateManager { } + public List<Range> getEventualRanges() { + return eventualRanges; + } + + public void setEventualRanges(List<Range> eventualRanges) { + this.eventualRanges = eventualRanges; + } + public void closeConnection(String connectionId){ //\TODO check if there is a race condition if(mdbcConnections.containsKey(connectionId)) { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java new file mode 100644 index 0000000..0021bcc --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.configurations; + +import java.util.List; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.Range; + +/** + * This class represents meta information of tables categorized as eventually consistent + */ +public class Eventual { + + private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Eventual.class); + + protected List<Range> ranges; + + public Eventual(List<Range> ranges) { + super(); + this.ranges = ranges; + } + + public List<Range> getRanges() { + return ranges; + } + + public void setRanges(List<Range> ranges) { + this.ranges = ranges; + } + + + +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java index 7a6aad7..96dc65f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java @@ -37,12 +37,14 @@ public class NodeConfiguration { private static transient final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(NodeConfiguration.class); public DatabasePartition partition; + public Eventual eventual; public String nodeName; public String sqlDatabaseName; public NodeConfiguration(String tables, UUID mriIndex, String sqlDatabaseName, String node){ // public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) { partition = new DatabasePartition(toRanges(tables), mriIndex, null) ; + eventual = new Eventual(toRanges(tables)); this.nodeName = node; this.sqlDatabaseName = sqlDatabaseName; } @@ -87,4 +89,12 @@ public class NodeConfiguration { NodeConfiguration config = gson.fromJson(br, NodeConfiguration.class); return config; } + + public Eventual getEventual() { + return eventual; + } + + public void setEventual(Eventual eventual) { + this.eventual = eventual; + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java new file mode 100644 index 0000000..338e697 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java @@ -0,0 +1,139 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.examples; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +public class MdbcEventualTestClient { + + + + public static void main(String[] args){ + try { + Class.forName("org.apache.calcite.avatica.remote.Driver"); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + System.exit(1); + } + Connection connection; + try { + connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf"); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + connection.setAutoCommit(false); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + + final String sql = "CREATE TABLE IF NOT EXISTS audit_log (\n" + + " id int,\n" + + " PersonID int,\n" + + " timeID bigint,\n" + + " PRIMARY KEY (id)" + + ");"; + Statement stmt; + try { + stmt = connection.createStatement(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + boolean execute = true; +// try { +// execute = stmt.execute(sql); +// } catch (SQLException e) { +// e.printStackTrace(); +// return; +// } + + if (execute) { + try { + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + try { + stmt.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + final String insertSQL = "INSERT INTO audit_log VALUES (1, 123, 123456789);"; + final String insertSQL1 = "DELETE FROM audit_log WHERE PersonID=1;"; + final String insertSQL2 = "INSERT INTO audit_log VALUES (1, 123, 123456789);"; + final String insertSQL3 = "UPDATE audit_log SET PersonID=124 where id=1;"; + final String insertSQL4 = "INSERT INTO audit_log VALUES (2, 234, 123456789);"; + + + Statement insertStmt; + try { + insertStmt = connection.createStatement(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + execute = insertStmt.execute(insertSQL); + execute = insertStmt.execute(insertSQL1); + execute = insertStmt.execute(insertSQL2); + execute = insertStmt.execute(insertSQL3); + execute = insertStmt.execute(insertSQL4); + + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + stmt.close(); + insertStmt.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + try { + connection.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 8abfba1..35b6121 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,19 +20,22 @@ package org.onap.music.mdbc.mixins; import com.datastax.driver.core.ResultSet; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; - import org.json.JSONObject; - import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.exceptions.MusicServiceException; import org.onap.music.exceptions.MusicLockingException; +import org.onap.music.exceptions.MusicServiceException; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; +import org.onap.music.mdbc.tables.StagingTable; +import org.onap.music.mdbc.tables.TxCommitProgress; import org.onap.music.mdbc.ownership.Dag; import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.tables.*; @@ -100,7 +103,7 @@ public interface MusicInterface { /** * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. * The keyspace name comes from the initialization properties passed to the JDBC driver. - * @throws MusicServiceException + * @throws MusicServiceException */ void createKeyspace() throws MDBCServiceException; /** @@ -167,7 +170,7 @@ public interface MusicInterface { * @param changedRow This is information about the row that has changed */ void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow); - + Object[] getObjects(TableInfo ti, String tableName, JSONObject row); /** * Returns the primary key associated with the given row @@ -182,21 +185,22 @@ public interface MusicInterface { * Commits the corresponding REDO-log into MUSIC * * @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx + * @param eventualRanges * @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to - * be a HashMap, because it is required to be serializable + * be a HashMap, because it is required to be serializable * @param txId id associated with the log being send * @param progressKeeper data structure that is used to handle to detect failures, and know what to do * @throws MDBCServiceException */ - void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; - + void commitLog(DatabasePartition partition, List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; + - /** - * This function is used to obtain the information related to a specific row in the MRI table - * @param partitionIndex index of the row that is going to be retrieved - * @return all the information related to the table - * @throws MDBCServiceException - */ + /** + * This function is used to obtain the information related to a specific row in the MRI table + * @param partitionIndex index of the row that is going to be retrieved + * @return all the information related to the table + * @throws MDBCServiceException + */ MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException; /** @@ -208,11 +212,11 @@ public interface MusicInterface { RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException; /** - * This function is used to create a new row in the MRI table - * @param info the information used to create the row - * @return the new partition object that contain the new information used to create the row - * @throws MDBCServiceException - */ + * This function is used to create a new row in the MRI table + * @param info the information used to create the row + * @return the new partition object that contain the new information used to create the row + * @throws MDBCServiceException + */ DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException; /** @@ -223,48 +227,58 @@ public interface MusicInterface { void createMusicRangeDependency(RangeDependency rangeAndDependencies) throws MDBCServiceException; /** - * This function is used to append an index to the redo log in a MRI row - * @param partition information related to ownership of partitions, used to verify ownership - * @param newRecord index of the new record to be appended to the redo log - * @throws MDBCServiceException - */ - void appendToRedoLog(DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException; + * This function is used to append an index to the redo log in a MRI row + * @param partition information related to ownership of partitions, used to verify ownership + * @param newRecord index of the new record to be appended to the redo log + * @throws MDBCServiceException + */ + void appendToRedoLog( DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException; - /** - * This functions adds the tx digest to - * @param newId id used as index in the MTD table - * @param transactionDigest digest that contains all the changes performed in the transaction - * @throws MDBCServiceException - */ + /** + * This functions adds the tx digest to + * @param newId id used as index in the MTD table + * @param transactionDigest digest that contains all the changes performed in the transaction + * @throws MDBCServiceException + */ void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException; - + /** - * Function used to retrieve a given transaction digest and deserialize it - * @param id of the transaction digest to be retrieved - * @return the deserialize transaction digest that can be applied to the local SQL database - * @throws MDBCServiceException - */ + * This functions adds the eventual tx digest to + * @param newId id used as index in the MTD table + * @param transactionDigest digest that contains all the changes performed in the transaction + * @throws MDBCServiceException + */ + + void addEventualTxDigest(MusicTxDigestId newId, String transactionDigest) + throws MDBCServiceException; + + /** + * Function used to retrieve a given transaction digest and deserialize it + * @param id of the transaction digest to be retrieved + * @return the deserialize transaction digest that can be applied to the local SQL database + * @throws MDBCServiceException + */ HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException; - /** - * Use this functions to verify ownership, and own new ranges - * @param ranges the ranges that should be own after calling this function - * @param partition current information of the ownership in the system + /** + * Use this functions to verify ownership, and own new ranges + * @param ranges the ranges that should be own after calling this function + * @param partition current information of the ownership in the system * @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is * required * @return an object indicating the status of the own function result - * @throws MDBCServiceException - */ + * @throws MDBCServiceException + */ OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID ownOpId) throws MDBCServiceException; - /** - * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation - * @param partition information of the partition that is currently being owned - * @throws MDBCServiceException - */ + /** + * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation + * @param partition information of the partition that is currently being owned + * @throws MDBCServiceException + */ void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException; - /** + /** * This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all * those ranges. * @param rangeId new id to be used in the new row @@ -276,17 +290,17 @@ public interface MusicInterface { //OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) throws MDBCServiceException; /** - * This functions relinquishes a range - * @param ownerId id of the current ownerh - * @param rangeId id of the range to be relinquished - * @throws MusicLockingException - */ + * This functions relinquishes a range + * @param ownerId id of the current ownerh + * @param rangeId id of the range to be relinquished + * @throws MusicLockingException + */ void relinquish(String ownerId, String rangeId) throws MDBCServiceException; - /** - * This function return all the range indexes that are currently hold by any of the connections in the system - * @return list of ids of rows in MRI - */ + /** + * This function return all the range indexes that are currently hold by any of the connections in the system + * @return list of ids of rows in MRI + */ List<UUID> getPartitionIndexes() throws MDBCServiceException; /** @@ -294,7 +308,7 @@ public interface MusicInterface { * @param digest this contain all the changes that were perfomed in this digest * @throws MDBCServiceException */ - void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException; + void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException; /** * This function is in charge of deleting old mri rows that are not longer contain @@ -303,8 +317,11 @@ public interface MusicInterface { */ void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException; - List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException; + List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException; - OwnershipAndCheckpoint getOwnAndCheck(); + OwnershipAndCheckpoint getOwnAndCheck(); + + + ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 068a64d..6eacb4f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -23,27 +23,45 @@ import java.io.IOException; import java.io.Reader; import java.nio.ByteBuffer; import java.sql.Types; -import java.util.*; - +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; +import java.util.function.BiFunction; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; -import org.onap.music.mdbc.*; -import org.onap.music.mdbc.ownership.Dag; -import org.onap.music.mdbc.ownership.DagNode; -import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; -import org.onap.music.mdbc.tables.*; import org.json.JSONObject; +import org.onap.music.datastore.Condition; import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; -import org.onap.music.datastore.Condition; +import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.main.MusicCore; import org.onap.music.main.ResultType; import org.onap.music.main.ReturnType; - -import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.MDBCUtils; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.TableInfo; +import org.onap.music.mdbc.ownership.Dag; +import org.onap.music.mdbc.ownership.DagNode; +import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; +import org.onap.music.mdbc.tables.RangeDependency; +import org.onap.music.mdbc.tables.StagingTable; +import org.onap.music.mdbc.tables.TxCommitProgress; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.DataType; @@ -101,6 +119,7 @@ public class MusicMixin implements MusicInterface { //\TODO Add logic to change the names when required and create the tables when necessary private String musicTxDigestTableName = "musictxdigest"; + private String musicEventualTxDigestTableName = "musicevetxdigest"; private String musicRangeInformationTableName = "musicrangeinformation"; private String musicRangeDependencyTableName = "musicrangedependency"; @@ -193,6 +212,7 @@ public class MusicMixin implements MusicInterface { private OwnershipAndCheckpoint ownAndCheck; public MusicMixin() { + //this.logger = null; this.musicAddress = null; this.music_ns = null; @@ -272,7 +292,6 @@ public class MusicMixin implements MusicInterface { Row row = rs.one(); return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString(); } - private String getAllHostIds() { ResultSet results = null; try { @@ -297,7 +316,6 @@ public class MusicMixin implements MusicInterface { public String getMixinName() { return "cassandra"; } - /** * Do what is needed to close down the MUSIC connection. */ @@ -316,6 +334,7 @@ public class MusicMixin implements MusicInterface { createKeyspace(); try { createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number + createMusicEventualTxDigest(); createMusicRangeInformationTable(); createMusicRangeDependencyTable(); } @@ -1254,6 +1273,8 @@ public class MusicMixin implements MusicInterface { return newRanges; } + + protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { UUID mriIndex = partition.getMRIIndex(); String lockId; @@ -1290,8 +1311,21 @@ public class MusicMixin implements MusicInterface { } } - public void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, - String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ + + /** + * Writes the transaction information to metric's txDigest and musicRangeInformation table + * This officially commits the transaction globally + */ + @Override + public void commitLog(DatabasePartition partition, List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ + + // first deal with commit for eventually consistent tables + filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); + + // if strong consistency tables are not present in the transaction then return + if(partition == null || partition.getMRIIndex() == null) + return; + UUID mriIndex = partition.getMRIIndex(); String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex; //0. See if reference to lock was already created @@ -1312,6 +1346,10 @@ public class MusicMixin implements MusicInterface { //Add creation type of transaction digest //1. Push new row to RRT and obtain its index + if(transactionDigest == null || transactionDigest.isEmpty()) { + return; + } + String serializedTransactionDigest; try { serializedTransactionDigest = MDBCUtils.toString(transactionDigest); @@ -1328,6 +1366,66 @@ public class MusicMixin implements MusicInterface { appendToRedoLog(partition,digestId); } + private void filterAndAddEventualTxDigest(List<Range> eventualRanges, + HashMap<Range, StagingTable> transactionDigest, String txId, + TxCommitProgress progressKeeper) throws MDBCServiceException { + + if(eventualRanges == null || eventualRanges.isEmpty()) { + return; + } + + HashMap<Range,StagingTable> eventualTransactionDigest = new HashMap<Range,StagingTable>(); + + for(Range eventualRange: eventualRanges) { + transactionDigest.computeIfPresent(eventualRange, new BiFunction<Range,StagingTable,StagingTable>() { + + @Override + public StagingTable apply(Range key, StagingTable value) { + eventualTransactionDigest.put(key, value); + //transactionDigest.remove(key); + return null; + } + + }); + } + + UUID commitId = getCommitId(txId, progressKeeper); + + //1. Push new row to RRT + + String serializedTransactionDigest; + if(eventualTransactionDigest != null && !eventualTransactionDigest.isEmpty()) { + + try { + serializedTransactionDigest = MDBCUtils.toString(eventualTransactionDigest); + } catch (IOException e) { + throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString(), e); + } + MusicTxDigestId digestId = new MusicTxDigestId(commitId,-1); + addEventualTxDigest(digestId, serializedTransactionDigest); + + if(progressKeeper!= null) { + progressKeeper.setRecordId(txId,digestId); + } + } + + + } + + private UUID getCommitId(String txId, TxCommitProgress progressKeeper) + throws MDBCServiceException { + UUID commitId; + //Generate a local commit id + if(progressKeeper.containsTx(txId)) { + commitId = progressKeeper.getCommitId(txId); + } + else{ + logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress "); + throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress "); + } + return commitId; + } + /** * @param tableName * @param string @@ -1452,10 +1550,10 @@ public class MusicMixin implements MusicInterface { String cql = String.format("SELECT * FROM %s.%s WHERE range = ?;", music_ns, musicRangeDependencyTableName); PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); - pQueryObject.addValue(baseRange.table); + pQueryObject.addValue(baseRange.getTable()); Row newRow; try { - newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.table,null); + newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.getTable(),null); } catch (MDBCServiceException e) { logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e); @@ -1515,12 +1613,12 @@ public class MusicMixin implements MusicInterface { public void createMusicRangeDependency(RangeDependency rangeAndDependencies) throws MDBCServiceException { StringBuilder insert = new StringBuilder("INSERT INTO ") .append(this.music_ns) - .append('.') - .append(this.musicRangeDependencyTableName) - .append(" (range,dependencies) VALUES ") - .append("(") - .append(rangeAndDependencies.getRange().table) - .append(",{"); + .append('.') + .append(this.musicRangeDependencyTableName) + .append(" (range,dependencies) VALUES ") + .append("(") + .append(rangeAndDependencies.getRange().getTable()) + .append(",{"); boolean first=true; for(Range r: rangeAndDependencies.dependentRanges()){ if(first){ first=false; } @@ -1537,16 +1635,16 @@ public class MusicMixin implements MusicInterface { private UUID createEmptyMriRow(List<Range> rangesCopy) { - //TODO: THis should call one of the other createMRIRows - UUID id = generateUniqueKey(); + //TODO: THis should call one of the other createMRIRows + UUID id = generateUniqueKey(); StringBuilder insert = new StringBuilder("INSERT INTO ") .append(this.music_ns) - .append('.') - .append(this.musicRangeInformationTableName) - .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") - .append("(") - .append(id) - .append(",{"); + .append('.') + .append(this.musicRangeInformationTableName) + .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") + .append("(") + .append(id) + .append(",{"); boolean first=true; for(Range r: rangesCopy){ if(first){ first=false; } @@ -1565,7 +1663,7 @@ public class MusicMixin implements MusicInterface { MusicCore.eventualPut(query); return id; } - + /** * Creates a new empty MRI row @@ -1635,14 +1733,46 @@ public class MusicMixin implements MusicInterface { public void createMusicTxDigest() throws MDBCServiceException { createMusicTxDigest(-1); } + + public void createMusicEventualTxDigest() throws MDBCServiceException { + createMusicEventualTxDigest(-1); + } /** - * This function creates the MusicTxDigest table. It contain information related to each transaction committed + * This function creates the MusicEveTxDigest table. It contain information related to each eventual transaction committed * * LeaseId: id associated with the lease, text * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later * * TransactionDigest: text that contains all the changes in the transaction */ + private void createMusicEventualTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { + String tableName = this.musicEventualTxDigestTableName; + if(musicTxDigestTableNumber >= 0) { + tableName = tableName + + "-" + + Integer.toString(musicTxDigestTableNumber); + } + String priKey = "txTimeId"; + StringBuilder fields = new StringBuilder(); + fields.append("txid uuid, "); + fields.append("transactiondigest text, "); + fields.append("txTimeId TIMEUUID ");//notice lack of ',' + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey); + try { + executeMusicWriteQuery(this.music_ns,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create redo records table"); + throw(e); + } + } + + + /** + * This function creates the MusicTxDigest table. It contain information related to each transaction committed + * * LeaseId: id associated with the lease, text + * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later + * * TransactionDigest: text that contains all the changes in the transaction + */ private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { String tableName = this.musicTxDigestTableName; if(musicTxDigestTableNumber >= 0) { @@ -1704,6 +1834,35 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e); } } + + /** + * Writes the Eventual transaction history to the evetxDigest + */ + @Override + public void addEventualTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException { + //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); + PreparedQueryObject query = new PreparedQueryObject(); + String cqlQuery = "INSERT INTO " + + this.music_ns + + '.' + + this.musicEventualTxDigestTableName + + " (txid,transactiondigest,txTimeId) " + + "VALUES (" + + newId.txId + ",'" + + transactionDigest + + "'," + + // "toTimestamp(now())" + + "now()" + + ");"; + query.appendQueryString(cqlQuery); + //\TODO check if I am not shooting on my own foot + try { + MusicCore.nonKeyRelatedPut(query,"critical"); + } catch (MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage()); + throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e); + } + } @Override public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException { @@ -1731,7 +1890,49 @@ public class MusicMixin implements MusicInterface { } return changes; } + + + @Override + public ArrayList<HashMap<Range,StagingTable>> getEveTxDigest() throws MDBCServiceException { + HashMap<Range,StagingTable> changes; + ArrayList<HashMap<Range,StagingTable>> ecDigestList = new ArrayList<HashMap<Range,StagingTable>>(); + + //but was this timestamp is getting added as per post: https://dev.mysql.com/doc/refman/8.0/en/time-zone-leap-seconds.html + //Ex1: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest ORDER by ts; + //Ex2: SELECT * FROM ectxdigest ORDER by ts; or SELECT * FROM ectxdigest + //####### this will pull all records.. but REPLAY will be against specific records once the NODE it back ON-Line. + // I should get the last record timestamp so that I can put a where condition. + //EX3: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest where UNIX_TIMESTAMP(ts)>UNIX_TIMESTAMP(<<Date/Time value from others>>) ORDER by ts; + String cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName); + // Ex 1 & 2 might return millions of records!! things to consider outOfMemory issue, performance issue etc.. How to overcome?? + // Ex 3: will return less records compare to Ex:1 and Ex:2. + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + // I need to get a ResultSet of all the records and give each row to the below HashMap. + ResultSet rs = executeMusicRead(pQueryObject.toString()); + while (!rs.isExhausted()) { + Row row = rs.one(); + String digest = row.getString("transactiondigest"); + + try { + changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest); + } catch (IOException e) { + logger.error("IOException when deserializing digest"); + throw new MDBCServiceException("Deserializng digest failed with ioexception", e); + } catch (ClassNotFoundException e) { + logger.error("Deserializng digest failed with an invalid class"); + throw new MDBCServiceException("Deserializng digest failed with an invalid class", e); + } + + ecDigestList.add(changes); + } + + return ecDigestList; + } + + + ResultSet getAllMriCassandraRows() throws MDBCServiceException { StringBuilder cqlOperation = new StringBuilder(); cqlOperation.append("SELECT * FROM ") @@ -1997,7 +2198,11 @@ public class MusicMixin implements MusicInterface { @Override public OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException { - Map<UUID,LockResult> newLocks = new HashMap<>(); + + if(ranges == null || ranges.isEmpty()) + return null; + + Map<UUID,LockResult> newLocks = new HashMap<>(); //Init timeout clock ownAndCheck.startOwnershipTimeoutClock(opId); //Find @@ -2285,7 +2490,7 @@ public class MusicMixin implements MusicInterface { @Override public void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException{ - throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented"); + //throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented"); } @Override diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index af935ef..25b3de0 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END====================================================== */ - package org.onap.music.mdbc.mixins; import java.sql.Connection; @@ -844,7 +843,7 @@ NEW.field refers to the new value jdbcConn.setAutoCommit(autocommit); } - + @Override public void disableForeignKeyChecks() throws SQLException { Statement disable = jdbcConn.createStatement(); @@ -896,7 +895,7 @@ NEW.field refers to the new value switch (op.getOperationType()) { case INSERT: sql.append(op.getOperationType() + " INTO "); - sql.append(r.table + " (") ; + sql.append(r.getTable() + " (") ; sep = ""; for (String col: cols) { sql.append(sep + col); @@ -912,7 +911,7 @@ NEW.field refers to the new value break; case UPDATE: sql.append(op.getOperationType() + " "); - sql.append(r.table + " SET "); + sql.append(r.getTable() + " SET "); sep=""; for (int i=0; i<cols.size(); i++) { sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\""); @@ -924,7 +923,7 @@ NEW.field refers to the new value break; case DELETE: sql.append(op.getOperationType() + " FROM "); - sql.append(r.table + " WHERE "); + sql.append(r.getTable() + " WHERE "); sql.append(getPrimaryKeyConditional(op.getKey())); sql.append(";"); break; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java index a1228d5..68d1f19 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java @@ -359,7 +359,7 @@ public class Dag { Range range = rangeAndNodes.getKey(); Set<DagNode> nodes = rangeAndNodes.getValue(); if(nodes.size() > 2){ - logger.error("Range "+range.table+"has more than 2 active rows"); + logger.error("Range "+range.getTable()+"has more than 2 active rows"); throw new MDBCServiceException("Range has more than 2 active rows"); } else if(nodes.size()==2){ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java index fa15354..1da2d79 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -19,36 +19,22 @@ */ package org.onap.music.mdbc.tables; -import java.io.IOException; import java.sql.SQLException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Properties; import java.util.UUID; import java.util.concurrent.TimeUnit; - -import org.json.JSONObject; -import org.onap.music.datastore.PreparedQueryObject; import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.exceptions.MusicServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.DatabasePartition; -import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.MdbcConnection; -import org.onap.music.mdbc.MdbcServerLogic; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.StateManager; -import org.onap.music.mdbc.configurations.NodeConfiguration; -import org.onap.music.mdbc.mixins.MusicMixin; import org.onap.music.mdbc.mixins.DBInterface; import org.onap.music.mdbc.mixins.MusicInterface; -import com.datastax.driver.core.Row; - public class MusicTxDigest { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicTxDigest.class); @@ -90,7 +76,8 @@ public class MusicTxDigest { for (UUID partition : partitions) { if (!partition.equals(myPartition.getMRIIndex())) { try { - replayDigestForPartition(mi, partition, dbi); + //replayDigestForPartition(mi, partition, dbi); + mi.getOwnAndCheck().warmup(mi, dbi, myPartition.getSnapshot()); } catch (MDBCServiceException e) { logger.error("Unable to update for partition : " + partition + ". " + e.getMessage()); continue; @@ -98,33 +85,71 @@ public class MusicTxDigest { } } } + + //Step 3: ReplayDigest() for E.C conditions + try { + replayDigest(mi,dbi); + } catch (MDBCServiceException e) { + logger.error("Unable to perform Eventual Consistency operations" + e.getMessage()); + continue; + } + } } - + /** - * Replay the digest for a given partition + * Replay the digest for eventual consistency. * @param mi music interface * @param partitionId the partition to be replayed * @param dbi interface to the database that will replay the operations * @throws MDBCServiceException */ - public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException { - List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog(); - for (MusicTxDigestId txId: partitionsRedoLogTxIds) { - HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId); + public static void replayDigest(MusicInterface mi, DBInterface dbi) throws MDBCServiceException { + //List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog(); + //From where should I fetch TransactionsIDs ??? from NEW TABLE ?? or EXISING TABLE ?? << what the new SITE_TABLE details?? + // --> It is a new table called ECTxDigest + //I should sort/ call a method which gives all the entires of a table based on the time-stamp from Low to High + + ArrayList<HashMap<Range, StagingTable>> ecTxDigest = mi.getEveTxDigest(); + + //for (MusicTxDigestId txId: partitionsRedoLogTxIds) { // partitionsRedoLogTxIds --> this comes from new table where timeStamp > currentTimeStamp ( THIS SHOULD BE lessthan.. which is ASC order) + //HashMap<Range, StagingTable> transaction = mi2.getEcTxDigest(); // Getting records from musictxdigest TABLE. + for (HashMap<Range, StagingTable> transaction: ecTxDigest) { try { - //\TODO do this two operations in parallel - dbi.replayTransaction(transaction); - mi.replayTransaction(transaction); + dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??) } catch (SQLException e) { - logger.error("Rolling back the entire digest replay. " + partitionId); + logger.error("EC:Rolling back the entire digest replay."); return; } - logger.info("Successfully replayed transaction " + txId); + logger.info("EC: Successfully replayed transaction "); } - //todo, keep track of where I am in pointer } + + /** + * Replay the digest for a given partition + * @param mi music interface + * @param partitionId the partition to be replayed + * @param dbi interface to the database that will replay the operations + * @throws MDBCServiceException + */ + public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException { + List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog(); + for (MusicTxDigestId txId: partitionsRedoLogTxIds) { + HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId); + try { + //\TODO do this two operations in parallel + dbi.replayTransaction(transaction); + mi.replayTransaction(transaction); + } catch (SQLException e) { + logger.error("Rolling back the entire digest replay. " + partitionId); + return; + } + logger.info("Successfully replayed transaction " + txId); + } + //todo, keep track of where I am in pointer + } + /** * Start the background daemon defined by this object * Spawns a new thread and runs "backgroundDaemon" |