diff options
31 files changed, 3228 insertions, 408 deletions
diff --git a/mdbc-server/pom.xml b/mdbc-server/pom.xml index 1a5a6db..8edb5ab 100755 --- a/mdbc-server/pom.xml +++ b/mdbc-server/pom.xml @@ -183,6 +183,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + <version>4.1.30.Final</version> + </dependency> </dependencies> <build> diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java index 9752dcb..ff8eb80 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java @@ -1,3 +1,4 @@ + /* * ============LICENSE_START==================================================== * org.onap.music.mdbc @@ -24,76 +25,87 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.util.*; -import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import com.google.gson.Gson; import com.google.gson.GsonBuilder; /** * A database range contain information about what ranges should be hosted in the current MDBC instance - * A database range with an empty map, is supposed to contain all the tables in Music. - * @author Enrique Saurez + * A database range with an empty map, is supposed to contain all the tables in Music. + * @author Enrique Saurez */ public class DatabasePartition { private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class); - private UUID mriIndex;//Index that can be obtained either from + private UUID musicRangeInformationIndex;//Index that can be obtained either from private String lockId; protected List<Range> ranges; - private List<UUID> oldMRIIds; + + private boolean ready; /** * Each range represents a partition of the database, a database partition is a union of this partitions. * The only requirement is that the ranges are not overlapping. */ + public DatabasePartition() { + this(new ArrayList<Range>(),null,""); + } + public DatabasePartition(UUID mriIndex) { this(new ArrayList<Range>(), mriIndex,""); } public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String lockId) { - this.ranges = knownRanges; + if(mriIndex==null){ + ready = false; + } + else{ + ready = true; + } + ranges = knownRanges; - this.mriIndex = mriIndex; - this.lockId = lockId; - this.oldMRIIds = new ArrayList<>(); + this.setMusicRangeInformationIndex(mriIndex); + this.setLockId(lockId); } - public DatabasePartition(UUID rangeId, String lockId, List<Range> ranges, List<UUID> oldIds) { - this.mriIndex = rangeId; - this.lockId = lockId; - this.ranges = ranges; - this.oldMRIIds = oldIds; - } - - /** + /** * This function is used to change the contents of this, with the contents of a different object * @param otherPartition partition that is used to substitute the local contents */ public void updateDatabasePartition(DatabasePartition otherPartition){ - mriIndex = otherPartition.mriIndex;//Index that can be obtained either from + musicRangeInformationIndex = otherPartition.musicRangeInformationIndex;//Index that can be obtained either from lockId = otherPartition.lockId; ranges = otherPartition.ranges; + ready = otherPartition.ready; } public String toString(){ - StringBuilder builder = new StringBuilder().append("Row: ["+mriIndex+"], lockId: ["+lockId +"], ranges: ["); - for(Range r: ranges){ - builder.append(r.toString()).append(","); - } - builder.append("]"); - return builder.toString(); + StringBuilder builder = new StringBuilder().append("Row: ["+musicRangeInformationIndex.toString()+"], lockId: ["+lockId +"], ranges: ["); + for(Range r: ranges){ + builder.append(r.toString()).append(","); + } + builder.append("]"); + return builder.toString(); } public boolean isLocked(){return lockId != null && !lockId.isEmpty(); } + public boolean isReady() { + return ready; + } + + public void setReady(boolean ready) { + this.ready = ready; + } + public UUID getMRIIndex() { - return mriIndex; + return musicRangeInformationIndex; } public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) { - this.mriIndex = musicRangeInformationIndex; + this.musicRangeInformationIndex = musicRangeInformationIndex; } /** @@ -130,7 +142,7 @@ public class DatabasePartition { public synchronized List<Range> getSnapshot() { List<Range> newRange = new ArrayList<>(); for(Range r : ranges){ - newRange.add(r.clone()); + newRange.add(r.clone()); } return newRange; } @@ -147,7 +159,7 @@ public class DatabasePartition { } /** - * Function to obtain the configuration + * Function to obtain the configuration * @param filepath path to the database range * @return a new object of type DatabaseRange * @throws FileNotFoundException @@ -175,27 +187,12 @@ public class DatabasePartition { this.lockId = lockId; } - /** - * This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained - * @param ranges ranges that should be contained in the partition - * @param partition currently own partition - * @return - * - */ - public boolean owns(List<Range> ranges) { - for (Range r: ranges) { - if (!this.ranges.contains(r)) { - return false; - } - } - return true; - } - - public List<UUID> getOldMRIIds() { - return oldMRIIds; - } - - public void setOldMRIIds(List<UUID> oldIds) { - this.oldMRIIds = oldIds; - } -} + public boolean isContained(Range range){ + for(Range r: ranges){ + if(r.overlaps(range)){ + return true; + } + } + return false; + } +}
\ No newline at end of file diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java index 3f45d98..8aca034 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java @@ -103,6 +103,12 @@ public class MDBCUtils { return UUIDs.random(); } + /** + * This function is used to generate time based cassandra uuid + * @return a timebased UUID that can be used for fields of type uuid in MUSIC/Cassandra + */ + public static UUID generateTimebasedUniqueKey() {return UUIDs.timeBased();} + public static Properties getMdbcProperties() { Properties prop = new Properties(); InputStream input = null; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index bd0862d..6c1163a 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -34,16 +34,10 @@ import java.sql.SQLXML; import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.Executor; +import org.apache.commons.lang3.NotImplementedException; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.QueryException; import org.onap.music.logging.EELFLoggerDelegate; @@ -51,10 +45,16 @@ import org.onap.music.logging.format.AppMessages; import org.onap.music.logging.format.ErrorSeverity; import org.onap.music.logging.format.ErrorTypes; import org.onap.music.mdbc.mixins.DBInterface; +import org.onap.music.mdbc.mixins.LockResult; import org.onap.music.mdbc.mixins.MixinFactory; import org.onap.music.mdbc.mixins.MusicInterface; +import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; +import org.onap.music.mdbc.ownership.Dag; +import org.onap.music.mdbc.ownership.DagNode; +import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.query.QueryProcessor; import org.onap.music.mdbc.tables.MusicTxDigest; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -70,8 +70,8 @@ import org.onap.music.mdbc.tables.TxCommitProgress; public class MdbcConnection implements Connection { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcConnection.class); - private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused - private final Connection jdbcConn; // the JDBC Connection to the actual underlying database + private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused + private final Connection jdbcConn; // the JDBC Connection to the actual underlying database private final MusicInterface mi; private final TxCommitProgress progressKeeper; private final DBInterface dbi; @@ -81,7 +81,7 @@ public class MdbcConnection implements Connection { private DatabasePartition partition; public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, - TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException { + TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException { this.id = id; this.table_set = Collections.synchronizedSet(new HashSet<String>()); this.transactionDigest = new HashMap<Range,StagingTable>(); @@ -162,7 +162,7 @@ public class MdbcConnection implements Connection { throw new SQLException("tx id is null"); } try { - mi.commitLog(partition, transactionDigest, id, progressKeeper); + mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); } catch (MDBCServiceException e) { // TODO Auto-generated catch block logger.error("Cannot commit log to music" + e.getStackTrace()); @@ -204,7 +204,7 @@ public class MdbcConnection implements Connection { try { logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); // transaction was committed -- add all the updates into the REDO-Log in MUSIC - mi.commitLog(partition, transactionDigest, id, progressKeeper); + mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); } catch (MDBCServiceException e) { //If the commit fail, then a new commitId should be used logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); @@ -227,7 +227,8 @@ public class MdbcConnection implements Connection { //\TODO try to execute outside of the critical path of commit try { - relinquishIfRequired(partition); + if(partition != null) + relinquishIfRequired(partition); } catch (MDBCServiceException e) { logger.warn("Error trying to relinquish: "+partition.toString()); } @@ -490,11 +491,20 @@ public class MdbcConnection implements Connection { //Parse tables from the sql query Map<String, List<String>> tableToInstruction = QueryProcessor.extractTableFromQuery(sql); //Check ownership of keys - this.partition = statemanager.own(this.id, MDBCUtils.getTables(tableToInstruction), dbi); + List<Range> queryTables = MDBCUtils.getTables(tableToInstruction); + // filter out ranges that fall under Eventually consistent + // category as these tables do not need ownership + List<Range> scQueryTables = filterEveTables( queryTables); + this.partition = own(scQueryTables); dbi.preStatementHook(sql); } + private List<Range> filterEveTables(List<Range> queryTables) { + queryTables.removeAll(statemanager.getEventualRanges()); + return queryTables; + } + /** * Code to be run within the DB driver after a SQL statement has been executed. This is where remote * statement actions can be copied back to Cassandra/MUSIC. @@ -510,7 +520,7 @@ public class MdbcConnection implements Connection { * in order to prevent multiple threads from running this code in parallel. */ public void synchronizeTables() throws QueryException { - Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database + Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); for (String tableName : set1) { // This map will be filled in if this table was previously discovered @@ -541,6 +551,27 @@ public class MdbcConnection implements Connection { return this.dbi; } + private DatabasePartition own(List<Range> ranges) throws MDBCServiceException { + DatabasePartition newPartition = null; + OwnershipAndCheckpoint ownAndCheck = mi.getOwnAndCheck(); + UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey(); + try { + final OwnershipReturn ownershipReturn = mi.own(ranges, partition, ownOpId); + Dag dag = ownershipReturn.getDag(); + DagNode node = dag.getNode(ownershipReturn.getRangeId()); + MusicRangeInformationRow row = node.getRow(); + Map<MusicRangeInformationRow, LockResult> lock = new HashMap<>(); + lock.put(row, new LockResult(row.getPartitionIndex(), ownershipReturn.getOwnerId(), true, ranges)); + ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, lock, ownershipReturn.getOwnershipId()); + newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(), + ownershipReturn.getOwnerId()); + } + finally{ + ownAndCheck.stopOwnershipTimeoutClock(ownOpId); + } + return newPartition; + } + public void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException { mi.relinquishIfRequired(partition); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java index e35c214..9fb36ae 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java @@ -58,6 +58,8 @@ public class MdbcServerLogic extends JdbcMeta{ public MdbcServerLogic(String Url, Properties info, NodeConfiguration config) throws SQLException, MDBCServiceException { super(Url,info); this.manager = new StateManager(Url,info,config.nodeName, config.sqlDatabaseName); //FIXME: db name should not be passed in ahead of time + manager.setEventualRanges(config.getEventual().getRanges()); + this.info = info; int concurrencyLevel = Integer.parseInt( info.getProperty(ConnectionCacheSettings.CONCURRENCY_LEVEL.key(), diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java index 4bccbba..efc89cd 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java @@ -33,10 +33,10 @@ public class Range implements Serializable, Cloneable{ private static final long serialVersionUID = 1610744496930800088L; - public String table; + private String table; public Range(String table) { - this.table = table; + this.table = table.toUpperCase(); } public String toString(){return table;} @@ -51,12 +51,12 @@ public class Range implements Serializable, Cloneable{ if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Range r = (Range) o; - return (table.equals(r.table)); + return (this.overlaps(r)) && (r.overlaps(this)); } @Override public int hashCode(){ - return Objects.hash(table); + return table.hashCode(); } @Override @@ -76,4 +76,8 @@ public class Range implements Serializable, Cloneable{ return table.equals(other.table); } + public String getTable() { + return table; + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 9735800..4c5a3ed 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -21,6 +21,9 @@ package org.onap.music.mdbc; import java.util.ArrayList; import java.util.List; +import java.util.Set; + +import org.apache.commons.lang3.NotImplementedException; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; @@ -29,6 +32,7 @@ import org.onap.music.logging.format.ErrorTypes; import org.onap.music.mdbc.mixins.DBInterface; import org.onap.music.mdbc.mixins.MixinFactory; import org.onap.music.mdbc.mixins.MusicInterface; +import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; import org.onap.music.mdbc.tables.MusicTxDigest; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -75,7 +79,7 @@ public class StateManager { /** Identifier for this server instance */ private String mdbcServerName; private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition - + private List<Range> eventualRanges; public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException { this.sqlDBName = sqlDBName; @@ -146,6 +150,14 @@ public class StateManager { } + public List<Range> getEventualRanges() { + return eventualRanges; + } + + public void setEventualRanges(List<Range> eventualRanges) { + this.eventualRanges = eventualRanges; + } + public void closeConnection(String connectionId){ //\TODO check if there is a race condition if(mdbcConnections.containsKey(connectionId)) { @@ -169,7 +181,6 @@ public class StateManager { /** * Opens a connection into database, setting up all necessary triggers, etc * @param id UUID of a connection - * @param information */ public Connection openConnection(String id) { Connection sqlConnection; @@ -241,20 +252,6 @@ public class StateManager { return openConnection(id); } - public DatabasePartition own(String mdbcConnectionId, List<Range> ranges, DBInterface dbi) throws MDBCServiceException { - DatabasePartition partition = musicInterface.own(ranges, connectionRanges.get(mdbcConnectionId)); - List<UUID> oldRangeIds = partition.getOldMRIIds(); - //\TODO: do in parallel for all range ids - for(UUID oldRange : oldRangeIds) { - MusicTxDigest.replayDigestForPartition(musicInterface, oldRange,dbi); - } - logger.info("Partition: " + partition.getMRIIndex() + " now owns " + ranges); - connectionRanges.put(mdbcConnectionId, partition); - return partition; - } - - - public void initializeSystem() { //\TODO Prefetch data to system using the data ranges as guide throw new UnsupportedOperationException("Function initialize system needs to be implemented id MdbcStateManager"); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java new file mode 100644 index 0000000..0021bcc --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.configurations; + +import java.util.List; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.Range; + +/** + * This class represents meta information of tables categorized as eventually consistent + */ +public class Eventual { + + private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Eventual.class); + + protected List<Range> ranges; + + public Eventual(List<Range> ranges) { + super(); + this.ranges = ranges; + } + + public List<Range> getRanges() { + return ranges; + } + + public void setRanges(List<Range> ranges) { + this.ranges = ranges; + } + + + +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java index 7a6aad7..96dc65f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java @@ -37,12 +37,14 @@ public class NodeConfiguration { private static transient final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(NodeConfiguration.class); public DatabasePartition partition; + public Eventual eventual; public String nodeName; public String sqlDatabaseName; public NodeConfiguration(String tables, UUID mriIndex, String sqlDatabaseName, String node){ // public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) { partition = new DatabasePartition(toRanges(tables), mriIndex, null) ; + eventual = new Eventual(toRanges(tables)); this.nodeName = node; this.sqlDatabaseName = sqlDatabaseName; } @@ -87,4 +89,12 @@ public class NodeConfiguration { NodeConfiguration config = gson.fromJson(br, NodeConfiguration.class); return config; } + + public Eventual getEventual() { + return eventual; + } + + public void setEventual(Eventual eventual) { + this.eventual = eventual; + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java index fac47c5..5beb6b7 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java @@ -88,7 +88,7 @@ public class TablesConfiguration { partitionId = partitionInfo.partitionId; } //2) Create a row in the transaction information table - UUID mriTableIndex = MDBCUtils.generateUniqueKey(); + UUID mriTableIndex = MDBCUtils.generateTimebasedUniqueKey(); //3) Add owner and tit information to partition info table RedoRow newRedoRow = new RedoRow(mriTableName,mriTableIndex); //DatabaseOperations.updateRedoRow(musicNamespace,pitName,partitionId,newRedoRow,partitionInfo.owner,null); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java new file mode 100644 index 0000000..338e697 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java @@ -0,0 +1,139 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.examples; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +public class MdbcEventualTestClient { + + + + public static void main(String[] args){ + try { + Class.forName("org.apache.calcite.avatica.remote.Driver"); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + System.exit(1); + } + Connection connection; + try { + connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf"); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + connection.setAutoCommit(false); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + + final String sql = "CREATE TABLE IF NOT EXISTS audit_log (\n" + + " id int,\n" + + " PersonID int,\n" + + " timeID bigint,\n" + + " PRIMARY KEY (id)" + + ");"; + Statement stmt; + try { + stmt = connection.createStatement(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + boolean execute = true; +// try { +// execute = stmt.execute(sql); +// } catch (SQLException e) { +// e.printStackTrace(); +// return; +// } + + if (execute) { + try { + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + try { + stmt.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + final String insertSQL = "INSERT INTO audit_log VALUES (1, 123, 123456789);"; + final String insertSQL1 = "DELETE FROM audit_log WHERE PersonID=1;"; + final String insertSQL2 = "INSERT INTO audit_log VALUES (1, 123, 123456789);"; + final String insertSQL3 = "UPDATE audit_log SET PersonID=124 where id=1;"; + final String insertSQL4 = "INSERT INTO audit_log VALUES (2, 234, 123456789);"; + + + Statement insertStmt; + try { + insertStmt = connection.createStatement(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + execute = insertStmt.execute(insertSQL); + execute = insertStmt.execute(insertSQL1); + execute = insertStmt.execute(insertSQL2); + execute = insertStmt.execute(insertSQL3); + execute = insertStmt.execute(insertSQL4); + + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + stmt.close(); + insertStmt.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + try { + connection.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index 383b4b3..01d346c 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -112,12 +112,16 @@ public interface DBInterface { String getPrimaryKey(String sql, String tableName); - String applyDigest(Map<Range,StagingTable> digest); - /** * Replay a given TxDigest into the local DB * @param digest * @throws SQLException if replay cannot occur correctly */ void replayTransaction(HashMap<Range,StagingTable> digest) throws SQLException; + + void disableForeignKeyChecks() throws SQLException; + + void enableForeignKeyChecks() throws SQLException; + + void applyTxDigest(HashMap<Range, StagingTable> txDigest) throws SQLException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java new file mode 100644 index 0000000..7dd92c4 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.mixins; + +import java.util.List; +import java.util.UUID; +import org.onap.music.mdbc.Range; + +public class LockResult{ + private final UUID musicRangeInformationIndex; + private final String ownerId; + private List<Range> ranges; + private final boolean newLock; + + public LockResult(UUID rowId, String ownerId, boolean newLock, List<Range> ranges){ + this.musicRangeInformationIndex = rowId; + this.ownerId=ownerId; + this.newLock=newLock; + this.ranges=ranges; + } + public String getOwnerId(){ + return ownerId; + } + public boolean isNewLock(){ + return newLock; + } + public UUID getIndex() {return musicRangeInformationIndex;} + public List<Range> getRanges() {return ranges;} + public void addRange(Range range){ranges.add(range);} +}
\ No newline at end of file diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 12fe873..35b6121 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -19,23 +19,26 @@ */ package org.onap.music.mdbc.mixins; +import com.datastax.driver.core.ResultSet; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; - import org.json.JSONObject; - import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.exceptions.MusicServiceException; import org.onap.music.exceptions.MusicLockingException; +import org.onap.music.exceptions.MusicServiceException; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; -import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.TxCommitProgress; +import org.onap.music.mdbc.ownership.Dag; +import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; +import org.onap.music.mdbc.tables.*; /** * This Interface defines the methods that MDBC needs for a class to provide access to the persistence layer of MUSIC. @@ -43,6 +46,29 @@ import org.onap.music.mdbc.tables.TxCommitProgress; * @author Robert P. Eby */ public interface MusicInterface { + class OwnershipReturn{ + private final UUID ownershipId; + private final String ownerId; + private final UUID rangeId; + private final List<Range> ranges; + private final Dag dag; + public OwnershipReturn(UUID ownershipId, String ownerId, UUID rangeId, List<Range> ranges, Dag dag){ + this.ownershipId=ownershipId; + this.ownerId=ownerId; + this.rangeId=rangeId; + this.ranges=ranges; + this.dag=dag; + } + public String getOwnerId(){ + return ownerId; + } + public UUID getRangeId(){ + return rangeId; + } + public List<Range> getRanges(){ return ranges; } + public Dag getDag(){return dag;} + public UUID getOwnershipId() { return ownershipId; } + } /** * Get the name of this MusicInterface mixin object. * @return the name @@ -159,13 +185,14 @@ public interface MusicInterface { * Commits the corresponding REDO-log into MUSIC * * @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx + * @param eventualRanges * @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to * be a HashMap, because it is required to be serializable * @param txId id associated with the log being send * @param progressKeeper data structure that is used to handle to detect failures, and know what to do * @throws MDBCServiceException */ - void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; + void commitLog(DatabasePartition partition, List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; /** @@ -177,6 +204,14 @@ public interface MusicInterface { MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException; /** + * This function is used to get the dependencies of a given range + * @param baseRange range for which we search the dependencies + * @return dependencies + * @throws MDBCServiceException + */ + RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException; + + /** * This function is used to create a new row in the MRI table * @param info the information used to create the row * @return the new partition object that contain the new information used to create the row @@ -185,6 +220,13 @@ public interface MusicInterface { DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException; /** + * This function is used to create all the required music dependencies + * @param rangeAndDependencies + * @throws MDBCServiceException + */ + void createMusicRangeDependency(RangeDependency rangeAndDependencies) throws MDBCServiceException; + + /** * This function is used to append an index to the redo log in a MRI row * @param partition information related to ownership of partitions, used to verify ownership * @param newRecord index of the new record to be appended to the redo log @@ -199,6 +241,16 @@ public interface MusicInterface { * @throws MDBCServiceException */ void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException; + + /** + * This functions adds the eventual tx digest to + * @param newId id used as index in the MTD table + * @param transactionDigest digest that contains all the changes performed in the transaction + * @throws MDBCServiceException + */ + + void addEventualTxDigest(MusicTxDigestId newId, String transactionDigest) + throws MDBCServiceException; /** * Function used to retrieve a given transaction digest and deserialize it @@ -212,10 +264,12 @@ public interface MusicInterface { * Use this functions to verify ownership, and own new ranges * @param ranges the ranges that should be own after calling this function * @param partition current information of the ownership in the system - * @return a partition indicating the status of the own function result + * @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is + * required + * @return an object indicating the status of the own function result * @throws MDBCServiceException */ - DatabasePartition own(List<Range> ranges, DatabasePartition partition) throws MDBCServiceException; + OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID ownOpId) throws MDBCServiceException; /** * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation @@ -225,6 +279,17 @@ public interface MusicInterface { void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException; /** + * This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all + * those ranges. + * @param rangeId new id to be used in the new row + * @param ranges ranges to be owned by the end of the function called + * @param partition current ownership status + * @return + * @throws MDBCServiceException + */ + //OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) throws MDBCServiceException; + + /** * This functions relinquishes a range * @param ownerId id of the current ownerh * @param rangeId id of the range to be relinquished @@ -238,6 +303,25 @@ public interface MusicInterface { */ List<UUID> getPartitionIndexes() throws MDBCServiceException; - void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException; + /** + * This function is in charge of applying the transaction digests to the MUSIC tables. + * @param digest this contain all the changes that were perfomed in this digest + * @throws MDBCServiceException + */ + void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException; + + /** + * This function is in charge of deleting old mri rows that are not longer contain + * @param oldRowsAndLocks is a map + * @throws MDBCServiceException + */ + void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException; + + List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException; + + OwnershipAndCheckpoint getOwnAndCheck(); + + + ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 400956e..6eacb4f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -27,36 +27,41 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Properties; import java.util.Random; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; - +import java.util.function.BiFunction; import org.apache.commons.lang.NotImplementedException; -import org.onap.music.mdbc.*; -import org.onap.music.mdbc.tables.MusicTxDigestId; -import org.onap.music.mdbc.tables.StagingTable; -import org.onap.music.mdbc.tables.MusicRangeInformationRow; -import org.onap.music.mdbc.tables.TxCommitProgress; -import org.onap.music.service.impl.MusicCassaCore; +import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; -import org.onap.music.lockingservice.cassandra.CassaLockStore; +import org.onap.music.datastore.Condition; import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; -import org.onap.music.datastore.Condition; +import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.main.MusicCore; import org.onap.music.main.ResultType; import org.onap.music.main.ReturnType; - -import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.MDBCUtils; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.TableInfo; +import org.onap.music.mdbc.ownership.Dag; +import org.onap.music.mdbc.ownership.DagNode; +import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; +import org.onap.music.mdbc.tables.RangeDependency; +import org.onap.music.mdbc.tables.StagingTable; +import org.onap.music.mdbc.tables.TxCommitProgress; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.DataType; @@ -96,12 +101,16 @@ public class MusicMixin implements MusicInterface { public static final String KEY_MUSIC_RFACTOR = "music_rfactor"; /** The property name to use to provide the replication factor for Cassandra. */ public static final String KEY_MUSIC_NAMESPACE = "music_namespace"; + /** The property name to use to provide a timeout to mdbc (ownership) */ + public static final String KEY_TIMEOUT = "mdbc_timeout"; /** Namespace for the tables in MUSIC (Cassandra) */ public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; /** The default property value to use for the Cassandra IP address. */ public static final String DEFAULT_MUSIC_ADDRESS = "localhost"; /** The default property value to use for the Cassandra replication factor. */ public static final int DEFAULT_MUSIC_RFACTOR = 1; + /** The default property value to use for the MDBC timeout */ + public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours /** The default primary string column, if none is provided. */ public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid"; /** Type of the primary key, if none is defined by the user */ @@ -110,26 +119,58 @@ public class MusicMixin implements MusicInterface { //\TODO Add logic to change the names when required and create the tables when necessary private String musicTxDigestTableName = "musictxdigest"; + private String musicEventualTxDigestTableName = "musicevetxdigest"; private String musicRangeInformationTableName = "musicrangeinformation"; + private String musicRangeDependencyTableName = "musicrangedependency"; private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class); - private class LockResult{ - private final UUID musicRangeInformationIndex; - private final String ownerId; - private final boolean newLock; - public LockResult(UUID rowId, String ownerId, boolean newLock){ - this.musicRangeInformationIndex = rowId; - this.ownerId=ownerId; - this.newLock=newLock; + + + private class RangeMriRow{ + private MusicRangeInformationRow currentRow; + private List<MusicRangeInformationRow> oldRows; + private final Range range; + public RangeMriRow(Range range) { + this.range = range; + oldRows = new ArrayList<>(); + } + Range getRange(){ + return range; + } + public MusicRangeInformationRow getCurrentRow(){ + return currentRow; } - public String getOwnerId(){ - return ownerId; + public void setCurrentRow(MusicRangeInformationRow row){ + currentRow=row; } - public boolean getNewLock(){ - return newLock; + public void addOldRow(MusicRangeInformationRow row){ + oldRows.add(row); + } + public List<MusicRangeInformationRow> getOldRows(){ + return oldRows; + } + } + + private class LockRequest{ + private final String table; + private final UUID id; + private final List<Range> toLockRanges; + public LockRequest(String table, UUID id, List<Range> toLockRanges){ + this.table=table; + this.id=id; + this.toLockRanges=toLockRanges; + } + public UUID getId() { + return id; + } + public List<Range> getToLockRanges() { + return toLockRanges; + } + + public String getTable() { + return table; } - public UUID getIndex() {return musicRangeInformationIndex;} } @@ -161,13 +202,17 @@ public class MusicMixin implements MusicInterface { protected final String[] allReplicaIds; private final String musicAddress; private final int music_rfactor; + protected long timeout; private MusicConnector mCon = null; private Session musicSession = null; private boolean keyspace_created = false; private Map<String, PreparedStatement> ps_cache = new HashMap<>(); private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>()); + private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied; + private OwnershipAndCheckpoint ownAndCheck; public MusicMixin() { + //this.logger = null; this.musicAddress = null; this.music_ns = null; @@ -195,6 +240,12 @@ public class MusicMixin implements MusicInterface { this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE); logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns); + String t = info.getProperty(KEY_TIMEOUT); + this.timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t); + + alreadyApplied = new HashMap<>(); + ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied,timeout); + initializeMetricTables(); } @@ -283,7 +334,9 @@ public class MusicMixin implements MusicInterface { createKeyspace(); try { createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number + createMusicEventualTxDigest(); createMusicRangeInformationTable(); + createMusicRangeDependencyTable(); } catch(MDBCServiceException e){ logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC"); @@ -1069,6 +1122,22 @@ public class MusicMixin implements MusicInterface { return query; } + private PreparedQueryObject createChangeIsLatestToMriQuery(String mriTable, UUID uuid, String table, boolean isLatest){ + PreparedQueryObject query = new PreparedQueryObject(); + StringBuilder appendBuilder = new StringBuilder(); + appendBuilder.append("UPDATE ") + .append(music_ns) + .append(".") + .append(mriTable) + .append(" SET islatest =") + .append(isLatest) + .append(" WHERE rangeid = ") + .append(uuid) + .append(";"); + query.appendQueryString(appendBuilder.toString()); + return query; + } + protected ReturnType acquireLock(String fullyQualifiedKey, String lockId) throws MDBCServiceException{ ReturnType lockReturn; //\TODO Handle better failures to acquire locks @@ -1087,36 +1156,132 @@ public class MusicMixin implements MusicInterface { return lockReturn; } - protected DatabasePartition waitForLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { + private void addRange(Map<UUID,List<Range>> container, UUID index, Range range){ + if(!container.containsKey(index)){ + container.put(index,new ArrayList<Range>()); + } + container.get(index).add(range); + } + + private void addRows(Map<UUID,List<Range>> container, RangeMriRow newRow, Range range){ + //First add current row + MusicRangeInformationRow currentRow = newRow.getCurrentRow(); + addRange(container,currentRow.getPartitionIndex(),range); + for(MusicRangeInformationRow row : newRow.getOldRows()){ + addRange(container,row.getPartitionIndex(),range); + } + } + + private NavigableMap<UUID, List<Range>> getPendingRows(Map<Range, RangeMriRow> rangeRows){ + NavigableMap<UUID,List<Range>> pendingRows = new TreeMap<>(); + rangeRows.forEach((key, value) -> { + addRows(pendingRows,value,key); + }); + return pendingRows; + } + + private List<Range> lockRow(LockRequest request,Map.Entry<UUID, List<Range>> pending,Map<UUID, String> currentLockRef, + String fullyQualifiedKey, String lockId, List<Range> pendingToLock, + Map<UUID, LockResult> alreadyHeldLocks) + throws MDBCServiceException{ + List<Range> newRanges = new ArrayList<>(); + String newFullyQualifiedKey = music_ns + "." + request.getTable() + "." + pending.getKey().toString(); + String newLockId; + boolean success; + if (currentLockRef.containsKey(pending.getKey())) { + newLockId = currentLockRef.get(pending.getKey()); + success = (MusicCore.whoseTurnIsIt(newFullyQualifiedKey) == newLockId); + } else { + newLockId = MusicCore.createLockReference(newFullyQualifiedKey); + ReturnType newLockReturn = acquireLock(fullyQualifiedKey, lockId); + success = newLockReturn.getResult().compareTo(ResultType.SUCCESS) == 0; + } + if (!success) { + pendingToLock.addAll(pending.getValue()); + currentLockRef.put(pending.getKey(), newLockId); + } else { + if(alreadyHeldLocks.containsKey(pending.getKey())){ + throw new MDBCServiceException("Adding key that already exist"); + } + alreadyHeldLocks.put(pending.getKey(),new LockResult(pending.getKey(), newLockId, true, + pending.getValue())); + newRanges.addAll(pending.getValue()); + } + return newRanges; + } + + private boolean isDifferent(NavigableMap<UUID, List<Range>> previous, NavigableMap<UUID, List<Range>> current){ + return previous.keySet().equals(current.keySet()); + } + + protected List<Range> waitForLock(LockRequest request, DatabasePartition partition, + Map<UUID, LockResult> rowLock) throws MDBCServiceException { + List<Range> newRanges = new ArrayList<>(); + if(partition.getMRIIndex()!=request.getId()){ + throw new MDBCServiceException("Invalid argument for wait for lock, range id in request and partition should match"); + } + String fullyQualifiedKey= music_ns+"."+ request.getTable()+"."+request.getId(); String lockId = MusicCore.createLockReference(fullyQualifiedKey); ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId); if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { //\TODO Improve the exponential backoff + List<Range> pendingToLock = request.getToLockRanges(); + Map<UUID, String> currentLockRef = new HashMap<>(); int n = 1; int low = 1; int high = 1000; Random r = new Random(); - while(MusicCore.whoseTurnIsIt(fullyQualifiedKey)!=lockId){ + Map<Range, RangeMriRow> rangeRows = findRangeRows(pendingToLock); + NavigableMap<UUID, List<Range>> rowsToLock = getPendingRows(rangeRows); + NavigableMap<UUID, List<Range>> prevRows = new TreeMap<>(); + while (!pendingToLock.isEmpty() && isDifferent(prevRows,rowsToLock) ) { + pendingToLock.clear(); try { Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000) + (r.nextInt(high - low) + low)); } catch (InterruptedException e) { continue; } - n++; - if(n==20){ - throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!") ; + n++; + if (n == 20) { + throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!"); + } + //\TODO do this in parallel + //\TODO there is a race condition here, from the time we get the find range rows, to the time we lock the row, + //\TODO this race condition can only be solved if require to obtain lock to all related rows in MRI + //\TODO before fully owning the range + //\TODO The rows need to be lock in increasing order of timestamp + //there could be a new row created + // Note: This loop needs to be perfomed in sorted order of timebased UUID + for (Map.Entry<UUID, List<Range>> pending : rowsToLock.entrySet()) { + List<Range> rs = lockRow(request, pending, currentLockRef, fullyQualifiedKey, lockId, pendingToLock, rowLock); + newRanges.addAll(rs); + } + if (n++ == 20) { + throw new MDBCServiceException( + "Lock was impossible to obtain, waited for 20 exponential backoffs!"); } + rangeRows = findRangeRows(pendingToLock); + prevRows = rowsToLock; + rowsToLock = getPendingRows(rangeRows); } } - partition.setLockId(lockId); - return partition; + else { + partition.setLockId(lockId); + rowLock.put(partition.getMRIIndex(),new LockResult(partition.getMRIIndex(), lockId, true, partition.getSnapshot())); + } + return newRanges; } + + protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { UUID mriIndex = partition.getMRIIndex(); String lockId; lockId = MusicCore.createLockReference(fullyQualifiedKey); + if(lockId==null) { + throw new MDBCServiceException("lock reference is null"); + } ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId); //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { @@ -1126,7 +1291,16 @@ public class MusicMixin implements MusicInterface { return lockId; } - + protected void changeIsLatestToMRI(MusicRangeInformationRow row, boolean isLatest, LockResult lock) throws MDBCServiceException{ + PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, row.getPartitionIndex(), + musicTxDigestTableName, isLatest); + ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(), + appendQuery, lock.getOwnerId(), null); + if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ + logger.error(EELFLoggerDelegate.errorLogger, "Error when executing change isLatest operation with return type: "+returnType.getMessage()); + throw new MDBCServiceException("Error when executing change isLatest operation with return type: "+returnType.getMessage()); + } + } protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId); @@ -1137,23 +1311,27 @@ public class MusicMixin implements MusicInterface { } } + /** * Writes the transaction information to metric's txDigest and musicRangeInformation table * This officially commits the transaction globally */ @Override - public void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ + public void commitLog(DatabasePartition partition, List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ + + // first deal with commit for eventually consistent tables + filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); + + // if strong consistency tables are not present in the transaction then return + if(partition == null || partition.getMRIIndex() == null) + return; + UUID mriIndex = partition.getMRIIndex(); - if(mriIndex==null) { - partition = own(partition.getSnapshot(),partition); - mriIndex = partition.getMRIIndex(); - System.err.println("MRIINDEX: " + mriIndex); - } String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex; //0. See if reference to lock was already created String lockId = partition.getLockId(); - if(lockId == null || lockId.isEmpty()) { - waitForLock(fullyQualifiedMriKey,partition); + if(mriIndex==null || lockId == null || lockId.isEmpty()) { + own(partition.getSnapshot(),partition, MDBCUtils.generateTimebasedUniqueKey()); } UUID commitId; @@ -1168,13 +1346,17 @@ public class MusicMixin implements MusicInterface { //Add creation type of transaction digest //1. Push new row to RRT and obtain its index + if(transactionDigest == null || transactionDigest.isEmpty()) { + return; + } + String serializedTransactionDigest; try { serializedTransactionDigest = MDBCUtils.toString(transactionDigest); } catch (IOException e) { throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString(), e); } - MusicTxDigestId digestId = new MusicTxDigestId(commitId); + MusicTxDigestId digestId = new MusicTxDigestId(commitId,-1); addTxDigest(digestId, serializedTransactionDigest); //2. Save RRT index to RQ if(progressKeeper!= null) { @@ -1184,6 +1366,66 @@ public class MusicMixin implements MusicInterface { appendToRedoLog(partition,digestId); } + private void filterAndAddEventualTxDigest(List<Range> eventualRanges, + HashMap<Range, StagingTable> transactionDigest, String txId, + TxCommitProgress progressKeeper) throws MDBCServiceException { + + if(eventualRanges == null || eventualRanges.isEmpty()) { + return; + } + + HashMap<Range,StagingTable> eventualTransactionDigest = new HashMap<Range,StagingTable>(); + + for(Range eventualRange: eventualRanges) { + transactionDigest.computeIfPresent(eventualRange, new BiFunction<Range,StagingTable,StagingTable>() { + + @Override + public StagingTable apply(Range key, StagingTable value) { + eventualTransactionDigest.put(key, value); + //transactionDigest.remove(key); + return null; + } + + }); + } + + UUID commitId = getCommitId(txId, progressKeeper); + + //1. Push new row to RRT + + String serializedTransactionDigest; + if(eventualTransactionDigest != null && !eventualTransactionDigest.isEmpty()) { + + try { + serializedTransactionDigest = MDBCUtils.toString(eventualTransactionDigest); + } catch (IOException e) { + throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString(), e); + } + MusicTxDigestId digestId = new MusicTxDigestId(commitId,-1); + addEventualTxDigest(digestId, serializedTransactionDigest); + + if(progressKeeper!= null) { + progressKeeper.setRecordId(txId,digestId); + } + } + + + } + + private UUID getCommitId(String txId, TxCommitProgress progressKeeper) + throws MDBCServiceException { + UUID commitId; + //Generate a local commit id + if(progressKeeper.containsTx(txId)) { + commitId = progressKeeper.getCommitId(txId); + } + else{ + logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress "); + throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress "); + } + return commitId; + } + /** * @param tableName * @param string @@ -1241,7 +1483,8 @@ public class MusicMixin implements MusicInterface { return partitions; } - List<Range> getRanges(Row newRow){ + + public List<Range> getRanges(Row newRow){ List<Range> partitions = new ArrayList<>(); Set<String> tables = newRow.getSet("keys",String.class); for (String table:tables){ @@ -1250,22 +1493,36 @@ public class MusicMixin implements MusicInterface { return partitions; } - MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){ + public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){ UUID partitionIndex = newRow.getUUID("rangeid"); List<TupleValue> log = newRow.getList("txredolog",TupleValue.class); List<MusicTxDigestId> digestIds = new ArrayList<>(); + int index=0; for(TupleValue t: log){ //final String tableName = t.getString(0); - final UUID index = t.getUUID(1); - digestIds.add(new MusicTxDigestId(index)); + final UUID id = t.getUUID(1); + digestIds.add(new MusicTxDigestId(id,index++)); } List<Range> partitions = new ArrayList<>(); Set<String> tables = newRow.getSet("keys",String.class); for (String table:tables){ partitions.add(new Range(table)); } - return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""), - digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid")); + return new MusicRangeInformationRow(partitionIndex, new DatabasePartition(partitions, partitionIndex, ""), + digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"), + newRow.getBool("islatest")); + } + + public RangeDependency getRangeDependenciesFromCassandraRow(Row newRow){ + if(newRow == null) return null; + String base = newRow.getString("range"); + Range baseRange = new Range(base); + Set<String> dependencies = newRow.getSet("dependencies", String.class); + List<Range> rangeDependencies = new ArrayList<>(); + for(String dependency: dependencies){ + rangeDependencies.add(new Range(dependency)); + } + return new RangeDependency(baseRange,rangeDependencies); } @Override @@ -1288,6 +1545,21 @@ public class MusicMixin implements MusicInterface { return getMRIRowFromCassandraRow(newRow); } + @Override + public RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException{ + String cql = String.format("SELECT * FROM %s.%s WHERE range = ?;", music_ns, musicRangeDependencyTableName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(baseRange.getTable()); + Row newRow; + try { + newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.getTable(),null); + } catch (MDBCServiceException e) { + logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e); + } + return getRangeDependenciesFromCassandraRow(newRow); + } /** * This function creates the TransactionInformation table. It contain information related @@ -1307,6 +1579,7 @@ public class MusicMixin implements MusicInterface { fields.append("rangeid uuid, "); fields.append("keys set<text>, "); fields.append("ownerid text, "); + fields.append("islatest boolean, "); fields.append("metricprocessid text, "); //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly fields.append("txredolog list<frozen<tuple<text,uuid>>> "); @@ -1330,22 +1603,48 @@ public class MusicMixin implements MusicInterface { if(lockId == null || lockId.isEmpty()){ throw new MDBCServiceException("Error initializing music range information, error creating a lock for a new row") ; } - createEmptyMriRow(newPartition.getMRIIndex(),info.getMetricProcessId(),lockId,newPartition.getSnapshot()); + createEmptyMriRow(newPartition.getMRIIndex(),info.getMetricProcessId(),lockId, + newPartition.getSnapshot(),info.getIsLatest()); + info.setOwnerId(lockId); return newPartition; } + @Override + public void createMusicRangeDependency(RangeDependency rangeAndDependencies) throws MDBCServiceException { + StringBuilder insert = new StringBuilder("INSERT INTO ") + .append(this.music_ns) + .append('.') + .append(this.musicRangeDependencyTableName) + .append(" (range,dependencies) VALUES ") + .append("(") + .append(rangeAndDependencies.getRange().getTable()) + .append(",{"); + boolean first=true; + for(Range r: rangeAndDependencies.dependentRanges()){ + if(first){ first=false; } + else { + insert.append(','); + } + insert.append("'").append(r.toString()).append("'"); + } + insert.append("};"); + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString(insert.toString()); + MusicCore.eventualPut(query); + } + private UUID createEmptyMriRow(List<Range> rangesCopy) { - //TODO: THis should call one of the other createMRIRows - UUID id = generateUniqueKey(); + //TODO: THis should call one of the other createMRIRows + UUID id = generateUniqueKey(); StringBuilder insert = new StringBuilder("INSERT INTO ") .append(this.music_ns) - .append('.') - .append(this.musicRangeInformationTableName) - .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") - .append("(") - .append(id) - .append(",{"); + .append('.') + .append(this.musicRangeInformationTableName) + .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") + .append("(") + .append(id) + .append(",{"); boolean first=true; for(Range r: rangesCopy){ if(first){ first=false; } @@ -1364,7 +1663,7 @@ public class MusicMixin implements MusicInterface { MusicCore.eventualPut(query); return id; } - + /** * Creates a new empty MRI row @@ -1373,8 +1672,9 @@ public class MusicMixin implements MusicInterface { */ private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges) throws MDBCServiceException { - UUID id = MDBCUtils.generateUniqueKey(); - return createEmptyMriRow(id,processId,lockId,ranges); + UUID id = MDBCUtils.generateTimebasedUniqueKey(); + return createEmptyMriRow(id,processId,lockId,ranges,true); + } /** @@ -1382,14 +1682,14 @@ public class MusicMixin implements MusicInterface { * @param processId id of the process that is going to own initially this. * @return uuid associated to the new row */ - private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges) + private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges, boolean isLatest) throws MDBCServiceException{ logger.info("Creating MRI " + id + " for ranges " + ranges); StringBuilder insert = new StringBuilder("INSERT INTO ") .append(this.music_ns) .append('.') .append(this.musicRangeInformationTableName) - .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") + .append(" (rangeid,keys,ownerid,islatest,metricprocessid,txredolog) VALUES ") .append("(") .append(id) .append(",{"); @@ -1403,7 +1703,9 @@ public class MusicMixin implements MusicInterface { } insert.append("},'") .append((lockId==null)?"":lockId) - .append("','") + .append("',") + .append(isLatest) + .append(",'") .append(processId) .append("',[]);"); PreparedQueryObject query = new PreparedQueryObject(); @@ -1431,14 +1733,46 @@ public class MusicMixin implements MusicInterface { public void createMusicTxDigest() throws MDBCServiceException { createMusicTxDigest(-1); } + + public void createMusicEventualTxDigest() throws MDBCServiceException { + createMusicEventualTxDigest(-1); + } /** - * This function creates the MusicTxDigest table. It contain information related to each transaction committed + * This function creates the MusicEveTxDigest table. It contain information related to each eventual transaction committed * * LeaseId: id associated with the lease, text * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later * * TransactionDigest: text that contains all the changes in the transaction */ + private void createMusicEventualTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { + String tableName = this.musicEventualTxDigestTableName; + if(musicTxDigestTableNumber >= 0) { + tableName = tableName + + "-" + + Integer.toString(musicTxDigestTableNumber); + } + String priKey = "txTimeId"; + StringBuilder fields = new StringBuilder(); + fields.append("txid uuid, "); + fields.append("transactiondigest text, "); + fields.append("txTimeId TIMEUUID ");//notice lack of ',' + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey); + try { + executeMusicWriteQuery(this.music_ns,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create redo records table"); + throw(e); + } + } + + + /** + * This function creates the MusicTxDigest table. It contain information related to each transaction committed + * * LeaseId: id associated with the lease, text + * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later + * * TransactionDigest: text that contains all the changes in the transaction + */ private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { String tableName = this.musicTxDigestTableName; if(musicTxDigestTableNumber >= 0) { @@ -1459,6 +1793,21 @@ public class MusicMixin implements MusicInterface { } } + private void createMusicRangeDependencyTable() throws MDBCServiceException { + String tableName = this.musicRangeDependencyTableName; + String priKey = "range"; + StringBuilder fields = new StringBuilder(); + fields.append("range text, "); + fields.append("dependencies set<text> ");//notice lack of ',' + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, + fields, priKey); + try { + executeMusicWriteQuery(this.music_ns,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create redo records table"); + throw(e); + } + } /** * Writes the transaction history to the txDigest @@ -1485,6 +1834,35 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e); } } + + /** + * Writes the Eventual transaction history to the evetxDigest + */ + @Override + public void addEventualTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException { + //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); + PreparedQueryObject query = new PreparedQueryObject(); + String cqlQuery = "INSERT INTO " + + this.music_ns + + '.' + + this.musicEventualTxDigestTableName + + " (txid,transactiondigest,txTimeId) " + + "VALUES (" + + newId.txId + ",'" + + transactionDigest + + "'," + + // "toTimestamp(now())" + + "now()" + + ");"; + query.appendQueryString(cqlQuery); + //\TODO check if I am not shooting on my own foot + try { + MusicCore.nonKeyRelatedPut(query,"critical"); + } catch (MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage()); + throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e); + } + } @Override public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException { @@ -1512,133 +1890,465 @@ public class MusicMixin implements MusicInterface { } return changes; } + + + @Override + public ArrayList<HashMap<Range,StagingTable>> getEveTxDigest() throws MDBCServiceException { + HashMap<Range,StagingTable> changes; + ArrayList<HashMap<Range,StagingTable>> ecDigestList = new ArrayList<HashMap<Range,StagingTable>>(); + + //but was this timestamp is getting added as per post: https://dev.mysql.com/doc/refman/8.0/en/time-zone-leap-seconds.html + //Ex1: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest ORDER by ts; + //Ex2: SELECT * FROM ectxdigest ORDER by ts; or SELECT * FROM ectxdigest + //####### this will pull all records.. but REPLAY will be against specific records once the NODE it back ON-Line. + // I should get the last record timestamp so that I can put a where condition. + //EX3: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest where UNIX_TIMESTAMP(ts)>UNIX_TIMESTAMP(<<Date/Time value from others>>) ORDER by ts; + String cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName); + // Ex 1 & 2 might return millions of records!! things to consider outOfMemory issue, performance issue etc.. How to overcome?? + // Ex 3: will return less records compare to Ex:1 and Ex:2. + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + + // I need to get a ResultSet of all the records and give each row to the below HashMap. + ResultSet rs = executeMusicRead(pQueryObject.toString()); + while (!rs.isExhausted()) { + Row row = rs.one(); + String digest = row.getString("transactiondigest"); + + try { + changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest); + } catch (IOException e) { + logger.error("IOException when deserializing digest"); + throw new MDBCServiceException("Deserializng digest failed with ioexception", e); + } catch (ClassNotFoundException e) { + logger.error("Deserializng digest failed with an invalid class"); + throw new MDBCServiceException("Deserializng digest failed with an invalid class", e); + } + + ecDigestList.add(changes); + } + + return ecDigestList; + } + + + + ResultSet getAllMriCassandraRows() throws MDBCServiceException { + StringBuilder cqlOperation = new StringBuilder(); + cqlOperation.append("SELECT * FROM ") + .append(music_ns) + .append(".") + .append(musicRangeInformationTableName); + return executeMusicRead(cqlOperation.toString()); + } + + @Override + public List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException{ + List<MusicRangeInformationRow> rows = new ArrayList<>(); + final ResultSet mriCassandraRows = getAllMriCassandraRows(); + while (!mriCassandraRows.isExhausted()) { + Row musicRow = mriCassandraRows.one(); + final MusicRangeInformationRow mriRow = getMRIRowFromCassandraRow(musicRow); + rows.add(mriRow); + } + return rows; + } + + private RangeMriRow findRangeRow(Range range) throws MDBCServiceException { + RangeMriRow row = null; + final ResultSet musicResults = getAllMriCassandraRows(); + while (!musicResults.isExhausted()) { + Row musicRow = musicResults.one(); + final MusicRangeInformationRow mriRow = getMRIRowFromCassandraRow(musicRow); + final List<Range> musicRanges = getRanges(musicRow); + //\TODO optimize this for loop to avoid redudant access + for(Range retrievedRange : musicRanges) { + if (retrievedRange.overlaps(range)) { + if(row==null){ + row = new RangeMriRow(range); + row.setCurrentRow(mriRow); + } + else if(row.getCurrentRow().getTimestamp() < mriRow.getTimestamp()){ + row.addOldRow(row.getCurrentRow()); + row.setCurrentRow(mriRow); + } + } + } + } + if(row==null){ + logger.error("Row in MRI doesn't exist for Range "+range.toString()); + throw new MDBCServiceException("Row in MRI doesn't exist for Range "+range.toString()); + } + return row; + } /** * This function is used to find all the related uuids associated with the required ranges * @param ranges ranges to be find - * @return a map that associated each MRI row to the corresponding ranges + * @return a map that associates each MRI row to the corresponding ranges */ - private Map<UUID,List<Range>> findRangeRows(List<Range> ranges) throws MDBCServiceException { - /* \TODO this function needs to be improved, by creating an additional index, or at least keeping a local cache + private Map<Range,RangeMriRow> findRangeRows(List<Range> ranges) throws MDBCServiceException { + /* \TODO this function needs to be improved, by creating an additional index, or at least keeping a local cache Additionally, we should at least used pagination and the token function, to avoid retrieving the whole table at once, this can become problematic if we have too many connections in the overall METRIC system */ - Map<UUID,List<Range>> result = new HashMap<>(); - List<Range> rangesCopy = new LinkedList<>(ranges); + Map<Range,RangeMriRow> result = new HashMap<>(); + for(Range r:ranges){ + result.put(r,null); + } int counter=0; - StringBuilder cqlOperation = new StringBuilder(); - cqlOperation.append("SELECT * FROM ") - .append(music_ns) - .append(".") - .append(musicRangeInformationTableName); - ResultSet musicResults = executeMusicRead(cqlOperation.toString()); + final ResultSet musicResults = getAllMriCassandraRows(); while (!musicResults.isExhausted()) { Row musicRow = musicResults.one(); - UUID mriIndex = musicRow.getUUID("rangeid"); + final MusicRangeInformationRow mriRow = getMRIRowFromCassandraRow(musicRow); final List<Range> musicRanges = getRanges(musicRow); + //\TODO optimize this for loop to avoid redudant access for(Range retrievedRange : musicRanges) { - for (Iterator<Range> iterator = rangesCopy.iterator(); iterator.hasNext(); ) { - Range range = iterator.next(); + for(Map.Entry<Range,RangeMriRow> e : result.entrySet()) { + Range range = e.getKey(); if (retrievedRange.overlaps(range)) { - // Remove the current element from the iterator and the list. - if(!result.containsKey(mriIndex)){ - result.put(mriIndex,new ArrayList<>()); + RangeMriRow r = e.getValue(); + if(r==null){ + counter++; + RangeMriRow newMriRow = new RangeMriRow(range); + newMriRow.setCurrentRow(mriRow); + result.replace(range,newMriRow); + } + else if(r.getCurrentRow().getTimestamp() < mriRow.getTimestamp()){ + r.addOldRow(r.getCurrentRow()); + r.setCurrentRow(mriRow); + } + else{ + r.addOldRow(mriRow); } - List<Range> foundRanges = result.get(mriIndex); - foundRanges.add(range); - iterator.remove(); } } } } - if(!rangesCopy.isEmpty()){ - StringBuilder tables = new StringBuilder(); - for(Range range: rangesCopy){ - tables.append(range.toString()).append(','); - } - logger.warn("Row in MRI doesn't exist for tables [ "+tables.toString()+"]"); - createEmptyMriRow(rangesCopy); + + if(ranges.size() != counter){ + logger.error("Row in MRI doesn't exist for "+Integer.toString(counter)+" ranges"); + throw new MDBCServiceException("MRI row doesn't exist for "+Integer.toString(counter)+" ranges"); } return result; } - private DatabasePartition lockRow(UUID rowId, List<Range> ranges, DatabasePartition partition) + private List<Range> lockRow(LockRequest request, DatabasePartition partition,Map<UUID, LockResult> rowLock) throws MDBCServiceException { - List<LockResult> result = new ArrayList<>(); - if(partition.getMRIIndex()==rowId){ - return partition; + if(partition.getMRIIndex()==request.id && partition.isLocked()){ + return new ArrayList<>(); } //\TODO: this function needs to be improved, to track possible changes in the owner of a set of ranges - String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+rowId.toString(); + String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+request.id.toString(); //return List<Range> knownRanges, UUID mriIndex, String lockId - DatabasePartition newPartition = new DatabasePartition(ranges,rowId,null); - return waitForLock(fullyQualifiedMriKey,newPartition); + DatabasePartition newPartition = new DatabasePartition(request.toLockRanges,request.id,null); + return waitForLock(request,newPartition,rowLock); + } + + private void unlockKeyInMusic(String table, String key, String lockref) { + String fullyQualifiedKey= music_ns+"."+ table+"."+lockref; + MusicCore.destroyLockRef(fullyQualifiedKey,lockref); + } + + private void releaseLocks(Map<UUID,LockResult> newLocks) throws MDBCServiceException{ + for(Map.Entry<UUID,LockResult> lock : newLocks.entrySet()) { + unlockKeyInMusic(musicRangeInformationTableName, lock.getKey().toString(), lock.getValue().getOwnerId()); + } + } + + private void releaseLocks(List<MusicRangeInformationRow> changed, Map<UUID,LockResult> newLocks) throws MDBCServiceException{ + for(MusicRangeInformationRow r : changed) { + LockResult lock = newLocks.get(r.getPartitionIndex()); + unlockKeyInMusic(musicRangeInformationTableName, r.getPartitionIndex().toString(), + lock.getOwnerId()); + newLocks.remove(r.getPartitionIndex()); + } + } + + private void releaseAllLocksExcept(UUID finalRow, Map<UUID,LockResult> newLocks) throws MDBCServiceException { + Set<UUID> toErase = new HashSet<>(); + for(Map.Entry<UUID,LockResult> lock : newLocks.entrySet()) { + UUID id = lock.getKey(); + if(id!=finalRow){ + unlockKeyInMusic(musicRangeInformationTableName, id.toString(), lock.getValue().getOwnerId()); + toErase.add(id); + } + } + for(UUID id:toErase){ + newLocks.remove(id); + } + } + + private List<Range> getExtendedRanges(List<Range> range) throws MDBCServiceException{ + Set<Range> extendedRange = new HashSet<>(); + for(Range r: range){ + extendedRange.add(r); + RangeDependency dependencies = getMusicRangeDependency(r); + if(dependencies!=null){ + extendedRange.addAll(dependencies.dependentRanges()); + } + } + return new ArrayList<>(extendedRange); + } + + private LockResult waitForLock(LockRequest request) throws MDBCServiceException{ + String fullyQualifiedKey= music_ns+"."+ request.getTable()+"."+request.getId(); + String lockId = MusicCore.createLockReference(fullyQualifiedKey); + ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId); + if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { + //\TODO Improve the exponential backoff + int n = 1; + int low = 1; + int high = 1000; + Random r = new Random(); + while(MusicCore.whoseTurnIsIt(fullyQualifiedKey) != lockId){ + try { + Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000) + + (r.nextInt(high - low) + low)); + } catch (InterruptedException e) { + continue; + } + n++; + if (n == 20) { + throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!"); + } + } + } + return new LockResult(request.id,lockId,true,null); + } + + private void recoverFromFailureAndUpdateDag(Dag latestDag,List<MusicRangeInformationRow> rows,List<Range> ranges, + Map<UUID,LockResult> locks) throws MDBCServiceException{ + Pair<List<Range>,Set<DagNode>> rangesAndDependents = latestDag.getIncompleteRangesAndDependents(); + if(rangesAndDependents.getKey()==null || rangesAndDependents.getKey().size()==0 || + rangesAndDependents.getValue()==null || rangesAndDependents.getValue().size() == 0){ + return; + } + MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey()); + locks.put(r.getPartitionIndex(),new LockResult(r.getPartitionIndex(),r.getOwnerId(),true,rangesAndDependents.getKey())); + latestDag.addNewNode(r,new ArrayList<>(rangesAndDependents.getValue())); + } + + private List<MusicRangeInformationRow> setReadOnlyAnyDoubleRow(Dag latestDag,List<MusicRangeInformationRow> rows, Map<UUID,LockResult> locks) + throws MDBCServiceException{ + List<MusicRangeInformationRow> returnInfo = new ArrayList<>(); + List<DagNode> toDisable = latestDag.getOldestDoubles(); + for(DagNode node : toDisable){ + changeIsLatestToMRI(node.getRow(),false,locks.get(node.getId())); + latestDag.setIsLatest(node.getId(),false); + returnInfo.add(node.getRow()); + } + return returnInfo; + } + + private MusicRangeInformationRow createAndAssignLock(List<Range> ranges) throws MDBCServiceException { + UUID newUUID = MDBCUtils.generateTimebasedUniqueKey(); + DatabasePartition newPartition = new DatabasePartition(ranges,newUUID,null); + MusicRangeInformationRow newRow = new MusicRangeInformationRow(newUUID,newPartition,new ArrayList<>(), + null,getMyHostId(),true); + createMusicRangeInformation(newRow); + return newRow; + } + + private OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, + Map<UUID,LockResult> locks, UUID ownershipId) throws MDBCServiceException{ + recoverFromFailureAndUpdateDag(extendedDag,latestRows,ranges,locks); + List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks); + releaseLocks(changed, locks); + MusicRangeInformationRow row = createAndAssignLock(ranges); + latestRows.add(row); + locks.put(row.getPartitionIndex(),new LockResult(row.getPartitionIndex(),row.getOwnerId(),true,ranges)); + extendedDag.addNewNodeWithSearch(row,ranges); + Pair<List<Range>, Set<DagNode>> missing = extendedDag.getIncompleteRangesAndDependents(); + if(missing.getKey().size()!=0 && missing.getValue().size()!=0) { + MusicRangeInformationRow newRow = createAndAssignLock(missing.getKey()); + latestRows.add(newRow); + locks.put(newRow.getPartitionIndex(), new LockResult(newRow.getPartitionIndex(), newRow.getOwnerId(), true, + missing.getKey())); + extendedDag.addNewNode(newRow, new ArrayList<>(missing.getValue())); + } + changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks); + releaseLocks(changed,locks); + releaseAllLocksExcept(row.getPartitionIndex(),locks); + LockResult ownRow = locks.get(row.getPartitionIndex()); + return new OwnershipReturn(ownershipId, ownRow.getOwnerId(), ownRow.getIndex(),ranges,extendedDag); + } + + // \TODO merge with dag code + private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException { + Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>(); + for(MusicRangeInformationRow row : rows){ + DatabasePartition dbPartition = row.getDBPartition(); + if (row.getIsLatest()) { + for(Range range : dbPartition.getSnapshot()){ + if(!rowsPerLatestRange.containsKey(range)){ + rowsPerLatestRange.put(range,new HashSet<>()); + } + DagNode node = dag.getNode(row.getPartitionIndex()); + if(node!=null) { + rowsPerLatestRange.get(range).add(node); + } + else{ + rowsPerLatestRange.get(range).add(new DagNode(row)); + } + } + } + } + return rowsPerLatestRange; } @Override - public DatabasePartition own(List<Range> ranges, DatabasePartition partition) throws MDBCServiceException { - if (partition.owns(ranges)) { - return partition; - } - return appendRange(ranges,partition); + public OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException { + + if(ranges == null || ranges.isEmpty()) + return null; + + Map<UUID,LockResult> newLocks = new HashMap<>(); + //Init timeout clock + ownAndCheck.startOwnershipTimeoutClock(opId); + //Find + List<Range> extendedRanges = getExtendedRanges(ranges); + List<MusicRangeInformationRow> allMriRows = getAllMriRows(); + List<MusicRangeInformationRow> rows = ownAndCheck.getRows(allMriRows,extendedRanges, false); + Dag dag = Dag.getDag(rows,extendedRanges); + Dag prev = new Dag(); + while( (dag.isDifferent(prev) || !prev.isOwned() ) && + !ownAndCheck.timeout(opId) + ){ + while(dag.hasNextToOwn()){ + DagNode node = dag.nextToOwn(); + MusicRangeInformationRow row = node.getRow(); + UUID uuid = row.getPartitionIndex(); + if(partition.isLocked()&&partition.getMRIIndex()==uuid|| + newLocks.containsKey(uuid) || + !row.getIsLatest()){ + dag.setOwn(node); + } + else{ + LockResult lockResult = null; + boolean owned = false; + while(!owned && !ownAndCheck.timeout(opId)){ + try { + LockRequest request = new LockRequest(musicRangeInformationTableName,uuid, + new ArrayList(node.getRangeSet())); + lockResult = waitForLock(request); + owned = true; + } + catch (MDBCServiceException e){ + logger.warn("Locking failed, retrying",e); + } + } + if(owned){ + dag.setOwn(node); + newLocks.put(uuid,lockResult); + } + else{ + break; + } + } + } + prev=dag; + //TODO instead of comparing dags, compare rows + allMriRows = getAllMriRows(); + rows = ownAndCheck.getRows(allMriRows,extendedRanges,false); + dag = Dag.getDag(rows,extendedRanges); + } + if(!prev.isOwned() || dag.isDifferent(prev)){ + releaseLocks(newLocks); + ownAndCheck.stopOwnershipTimeoutClock(opId); + logger.error("Error when owning a range: Timeout"); + throw new MDBCServiceException("Ownership timeout"); + } + Set<Range> allRanges = prev.getAllRanges(); + List<MusicRangeInformationRow> isLatestRows = ownAndCheck.getRows(allMriRows, new ArrayList<>(allRanges), true); + prev.setRowsPerLatestRange(getIsLatestPerRange(dag,isLatestRows)); + return mergeLatestRows(prev,rows,ranges,newLocks,opId); } /** - * Merge otherpartitions info into the partition - * @param newId - * @param otherPartitionsk - * @param partition - * @return list of old UUIDs merged - * @throws MDBCServiceException + * This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained + * @param ranges ranges that should be contained in the partition + * @param partition currently own partition + * @return */ - private DatabasePartition mergeMriRows(UUID newId, List<DatabasePartition> otherPartitions, DatabasePartition partition) + public boolean isAppendRequired(List<Range> ranges, DatabasePartition partition){ + for(Range r: ranges){ + if(!partition.isContained(r)){ + return true; + } + } + return false; + } + + private Map<UUID,String> mergeMriRows(String newId, Map<UUID,LockResult> lock, DatabasePartition partition) throws MDBCServiceException { - List<UUID> oldIds = new ArrayList<>(); + Map<UUID,String> oldIds = new HashMap<>(); List<Range> newRanges = new ArrayList<>(); - for (DatabasePartition dbPart : otherPartitions) { - oldIds.add(dbPart.getMRIIndex()); - newRanges.addAll(dbPart.getSnapshot()); - } - DatabasePartition newPartition = new DatabasePartition(newRanges,newId,null); + for (Map.Entry<UUID,LockResult> entry : lock.entrySet()) { + oldIds.put(entry.getKey(),entry.getValue().getOwnerId()); + //\TODO check if we need to do a locked get? Is that even required? + final MusicRangeInformationRow mriRow = getMusicRangeInformation(entry.getKey()); + final DatabasePartition dbPartition = mriRow.getDBPartition(); + newRanges.addAll(dbPartition.getSnapshot()); + } + DatabasePartition newPartition = new DatabasePartition(newRanges,UUID.fromString(newId),null); String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+newId; - newPartition = waitForLock(fullyQualifiedMriKey, newPartition); + UUID newUUID = UUID.fromString(newId); + LockRequest newRequest = new LockRequest(musicRangeInformationTableName,newUUID,newRanges); + waitForLock(newRequest, newPartition,lock); + if(!lock.containsKey(newUUID)||!lock.get(newUUID).isNewLock()){ + logger.error("When merging rows, lock returned an invalid error"); + throw new MDBCServiceException("When merging MRI rows, lock returned an invalid error"); + } + final LockResult lockResult = lock.get(newUUID); partition.updateDatabasePartition(newPartition); - createEmptyMriRow(partition.getMRIIndex(),myId,partition.getLockId(),partition.getSnapshot()); - return partition; + createEmptyMriRow(partition.getMRIIndex(),myId,lockResult.getOwnerId(),partition.getSnapshot(),true); + return oldIds; } - /** - * This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all - * those ranges. - * @param rangeId new id to be used in the new row - * @param ranges ranges to be owned by the end of the function called - * @param partition current ownership status - * @return - * @throws MDBCServiceException - */ - private DatabasePartition appendRange(List<Range> ranges, DatabasePartition partition) - throws MDBCServiceException { - UUID newMRIId = generateUniqueKey(); - Map<UUID,List<Range>> rows = findRangeRows(ranges); - List<DatabasePartition> rowLocks=new ArrayList<>(); + private void obtainAllLocks(NavigableMap<UUID, List<Range>> rowsToLock,DatabasePartition partition, + List<Range> newRanges,Map<UUID, LockResult> rowLock) throws MDBCServiceException { //\TODO: perform this operations in parallel - for(Map.Entry<UUID,List<Range>> row : rows.entrySet()){ - DatabasePartition dbPartition; + for(Map.Entry<UUID,List<Range>> row : rowsToLock.entrySet()){ + List<Range> additionalRanges; try { - dbPartition = lockRow(row.getKey(),row.getValue(), partition); + LockRequest newRequest = new LockRequest(musicRangeInformationTableName,row.getKey(),row.getValue()); + additionalRanges =lockRow(newRequest, partition, rowLock); } catch (MDBCServiceException e) { //TODO: Make a decision if retry or just fail? - logger.error("Error locking row"); + logger.error("Error locking row",e); throw e; } - rowLocks.add(dbPartition); + newRanges.addAll(additionalRanges); } + } + +/* @Override + public OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) + throws MDBCServiceException { + if(!isAppendRequired(ranges,partition)){ + return new OwnershipReturn(partition.getLockId(),UUID.fromString(rangeId),null,null); + } + Map<Range, RangeMriRow> rows = findRangeRows(ranges); + final NavigableMap<UUID, List<Range>> rowsToLock = getPendingRows(rows); + HashMap<UUID, LockResult> rowLock = new HashMap<>(); + List<Range> newRanges = new ArrayList<>(); + + obtainAllLocks(rowsToLock,partition,newRanges,rowLock); + String lockId; - List<UUID> oldIds = null; - if (rowLocks.size()==1) { - return rowLocks.get(0); + Map<UUID,String> oldIds = null; + if(rowLock.size()!=1){ + oldIds = mergeMriRows(rangeId, rowLock, partition); + lockId = partition.getLockId(); } - return mergeMriRows(newMRIId, rowLocks, partition); - } + else{ + List<LockResult> list = new ArrayList<>(rowLock.values()); + LockResult lockResult = list.get(0); + lockId = lockResult.getOwnerId(); + } + + return new OwnershipReturn(lockId,UUID.fromString(rangeId),oldIds,newRanges); + }*/ @Override public void relinquish(String ownerId, String rangeId) throws MDBCServiceException{ @@ -1663,19 +2373,13 @@ public class MusicMixin implements MusicInterface { if(!canTryRelinquishing() || !partition.isLocked()){ return; } - CassaLockStore lsHandle; - try { - lsHandle = MusicCassaCore.getLockingServiceHandle(); - } catch (MusicLockingException e) { - logger.error("Error obtaining the locking service handle when checking if relinquish was required"); - throw new MDBCServiceException("Error obtaining locking service"+e.getMessage(), e); - } long lockQueueSize; try { - lockQueueSize = lsHandle.getLockQueueSize(music_ns, this.musicRangeInformationTableName, partition.getMRIIndex().toString()); - } catch (MusicServiceException|MusicQueryException e) { + String fullyQualifiedKey= music_ns+"."+ this.musicRangeInformationTableName+"."+partition.getMRIIndex().toString(); + lockQueueSize = MusicCore.getLockQueueSize(fullyQualifiedKey); + } catch (MusicServiceException|MusicQueryException|MusicLockingException e) { logger.error("Error obtaining the lock queue size"); - throw new MDBCServiceException("Error obtaining lock queue size: "+e.getMessage(), e); + throw new MDBCServiceException("Error obtaining lock queue size: " + e.getMessage(), e); } if(lockQueueSize> 1){ //If there is any other node waiting, we just relinquish ownership @@ -1713,15 +2417,24 @@ public class MusicMixin implements MusicInterface { String lock) throws MDBCServiceException{ ResultSet result; - try { - result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock); - } catch(MusicServiceException e){ - //\TODO: handle better, at least transform into an MDBCServiceException - e.printStackTrace(); - throw new MDBCServiceException("Error executing critical get", e); + if(lock != null && !lock.isEmpty()) { + try { + result = MusicCore.criticalGet(keyspace, table, primaryKey, cqlObject, lock); + } catch (MusicServiceException e) { + e.printStackTrace(); + throw new MDBCServiceException("Error executing critical get", e); + } + } + else{ + try { + result = MusicCore.atomicGet(keyspace,table,primaryKey,cqlObject); + } catch (MusicServiceException|MusicLockingException|MusicQueryException e) { + e.printStackTrace(); + throw new MDBCServiceException("Error executing atomic get", e); + } } if(result.isExhausted()){ - throw new MDBCServiceException("There is not a row that matches the id "+primaryKey); + return null; } return result.one(); } @@ -1729,7 +2442,6 @@ public class MusicMixin implements MusicInterface { private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject) throws MDBCServiceException{ ResultSet result = MusicCore.quorumGet(cqlObject); - //\TODO: handle better, at least transform into an MDBCServiceException if(result.isExhausted()){ throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]"); } @@ -1762,9 +2474,37 @@ public class MusicMixin implements MusicInterface { } } + private void executeMusicLockedDelete(String namespace, String tableName, String primaryKeyValue, String lockId + ) throws MDBCServiceException{ + StringBuilder delete = new StringBuilder("DELETE FROM ") + .append(namespace) + .append('.') + .append(tableName) + .append(" WHERE rangeid= ") + .append(primaryKeyValue) + .append(";"); + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString(delete.toString()); + executeMusicLockedPut(namespace,tableName,primaryKeyValue,query,lockId,null); + } @Override public void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException{ - throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented"); + //throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented"); + } + + @Override + public void deleteOldMriRows(Map<UUID, String> oldRowsAndLocks) throws MDBCServiceException { + //\TODO Do this operations in parallel or combine in only query to cassandra + for(Map.Entry<UUID,String> rows : oldRowsAndLocks.entrySet()){ + //\TODO handle music delete correctly so we can delete the other rows + executeMusicLockedDelete(music_ns,musicRangeInformationTableName,rows.getKey().toString(),rows.getValue()); + } } + + @Override + public OwnershipAndCheckpoint getOwnAndCheck(){ + return ownAndCheck; + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 386865a..25b3de0 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -844,6 +844,25 @@ NEW.field refers to the new value jdbcConn.setAutoCommit(autocommit); } + @Override + public void disableForeignKeyChecks() throws SQLException { + Statement disable = jdbcConn.createStatement(); + disable.execute("SET FOREIGN_KEY_CHECKS=0"); + disable.closeOnCompletion(); + } + + @Override + public void enableForeignKeyChecks() throws SQLException { + Statement enable = jdbcConn.createStatement(); + enable.execute("SET FOREIGN_KEY_CHECKS=1"); + enable.closeOnCompletion(); + } + + @Override + public void applyTxDigest(HashMap<Range, StagingTable> txDigest) throws SQLException { + replayTransaction(txDigest); + } + /** * Replays operation into database, usually from txDigest * @param jdbcStmt @@ -876,7 +895,7 @@ NEW.field refers to the new value switch (op.getOperationType()) { case INSERT: sql.append(op.getOperationType() + " INTO "); - sql.append(r.table + " (") ; + sql.append(r.getTable() + " (") ; sep = ""; for (String col: cols) { sql.append(sep + col); @@ -892,7 +911,7 @@ NEW.field refers to the new value break; case UPDATE: sql.append(op.getOperationType() + " "); - sql.append(r.table + " SET "); + sql.append(r.getTable() + " SET "); sep=""; for (int i=0; i<cols.size(); i++) { sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\""); @@ -904,7 +923,7 @@ NEW.field refers to the new value break; case DELETE: sql.append(op.getOperationType() + " FROM "); - sql.append(r.table + " WHERE "); + sql.append(r.getTable() + " WHERE "); sql.append(getPrimaryKeyConditional(op.getKey())); sql.append(";"); break; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java new file mode 100644 index 0000000..68d1f19 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java @@ -0,0 +1,402 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.ownership; + +import java.util.*; +import java.util.stream.Collectors; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.tables.MriRowComparator; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; + +public class Dag { + + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Dag.class); + + private boolean valid; + private boolean ownInit; + private boolean readyInit; + private Map<UUID,DagNode> nodes; + private Queue<DagNode> readyNodes; + private Queue<DagNode> toApplyNodes; + private Map<Range,Set<DagNode>> rowsPerLatestRange; + private List<Range> ranges; + + public Dag(){ + this(false); + } + + + public Dag(boolean isValid){ + valid=isValid; + ranges=null; + readyNodes = new LinkedList<>(); + toApplyNodes = new LinkedList<>(); + nodes = new HashMap<>(); + ownInit = false; + readyInit = false; + rowsPerLatestRange = null; + } + + private void createDag(List<MusicRangeInformationRow> rows, List<Range> ranges){ + this.ranges = new ArrayList<>(ranges); + Map<Range,DagNode> latestRow = new HashMap<>(); + Collections.sort(rows, new MriRowComparator()); + for(MusicRangeInformationRow row : rows){ + if(!nodes.containsKey(row.getPartitionIndex())){ + DagNode node = new DagNode(row); + nodes.put(row.getPartitionIndex(),node); + for(Range range : ranges){ + List<Range> nodeRanges = row.getDBPartition().getSnapshot(); + for(Range nRange : nodeRanges){ + if(nRange.overlaps(range)){ + if(latestRow.containsKey(range)){ + final DagNode dagNode = latestRow.get(range); + dagNode.addOutgoingEdge(node); + node.addIncomingEdge(dagNode); + } + latestRow.put(range,node); + } + } + } + } + } + } + + public static Dag getDag(List<MusicRangeInformationRow> rows, List<Range> ranges){ + Dag newDag = new Dag(true); + newDag.createDag(rows,ranges); + return newDag; + } + + public void setRowsPerLatestRange(Map<Range, Set<DagNode>> rowsPerLatestRange) { + this.rowsPerLatestRange = rowsPerLatestRange; + } + + private void initApplyDatastructures(){ + readyInit=true; + nodes.forEach((id, node) -> { + if(node.hasNotIncomingEdges()) { + toApplyNodes.add(node); + } + }); + } + + private void initOwnDatastructures(){ + ownInit = true; + nodes.forEach((id, node) -> { + if(node.hasNotIncomingEdges()) { + readyNodes.add(node); + } + }); + } + + public DagNode getNode(UUID rowId) throws MDBCServiceException { + if(!nodes.containsKey(rowId)){ + return null; + } + return nodes.get(rowId); + } + + public synchronized boolean hasNextToOwn(){ + if(!ownInit){ + initOwnDatastructures(); + } + return !readyNodes.isEmpty(); + } + + public synchronized DagNode nextToOwn() throws MDBCServiceException { + if(!ownInit){ + initOwnDatastructures(); + } + DagNode nextNode = readyNodes.poll(); + if(nextNode == null){ + throw new MDBCServiceException("Next To Own was call without checking has next to own"); + } + return nextNode; + } + + public synchronized DagNode nextToApply(List<Range> ranges){ + if(!readyInit){ + initApplyDatastructures(); + } + Set<Range> rangesSet = new HashSet<>(ranges); + while(!toApplyNodes.isEmpty()){ + DagNode nextNode = toApplyNodes.poll(); + List<DagNode> outgoing = nextNode.getOutgoingEdges(); + for(DagNode out : outgoing){ + out.setApplyDependencyReady(nextNode); + if(out.areApplyDependenciesReady()){ + toApplyNodes.add(out); + } + } + if(!nextNode.wasApplied(rangesSet)){ + return nextNode; + } + } + return null; + } + + public synchronized boolean isDifferent(Dag other){ + Set<DagNode> thisSet = new HashSet<>(nodes.values()); + Set<DagNode> otherSet = new HashSet<>(other.nodes.values()); + return !(thisSet.size()==otherSet.size() && + thisSet.containsAll(otherSet)); + } + + public synchronized boolean isOwned(){ + if(!valid){ + return false; + } + else if(nodes.isEmpty()){ + return true; + } + for(Map.Entry<UUID,DagNode> pair : nodes.entrySet()){ + if(!pair.getValue().isOwned()){ + return false; + } + } + return true; + } + + public void setOwn(DagNode node) throws MDBCServiceException { + if(node == null){ + throw new MDBCServiceException("Set Own was call with a null node"); + } + final DagNode dagNode = nodes.get(node.getId()); + if(dagNode == null){ + throw new MDBCServiceException("Set Own was call with a node that is not in the DAG"); + } + dagNode.setOwned(); + for(DagNode next: dagNode.getOutgoingEdges()){ + next.setOwnDependencyReady(dagNode); + if (next.areOwnDependenciesReady()) { + readyNodes.add(next); + } + } + } + + public void setReady(DagNode node, Range range) throws MDBCServiceException { + if(node == null){ + throw new MDBCServiceException("Set Ready was call with a null node"); + } + final DagNode dagNode = nodes.get(node.getId()); + if(dagNode == null){ + throw new MDBCServiceException("Set Ready was call with a node that is not in the DAG"); + } + dagNode.addReady(range); + } + + public void setPartiallyReady(DagNode node, Range range, int index) throws MDBCServiceException { + if(node == null){ + throw new MDBCServiceException("Set Ready was call with a null node"); + } + final DagNode dagNode = nodes.get(node.getId()); + if(dagNode == null){ + throw new MDBCServiceException("Set Ready was call with a node that is not in the DAG"); + } + dagNode.addPartiallyReady(range,index); + } + + public synchronized boolean applied(){ + if(!valid) { + return false; + } + if(!readyInit){ + initApplyDatastructures(); + } + return toApplyNodes.isEmpty(); + } + + public void setAlreadyApplied(Map<Range, Pair<MusicRangeInformationRow,Integer>> alreadyApplied, Set<Range> ranges) + throws MDBCServiceException { + for(Map.Entry<UUID,DagNode> node : nodes.entrySet()){ + Set<Range> intersection = new HashSet<>(ranges); + intersection.retainAll(node.getValue().getRangeSet()); + for(Range r : intersection){ + if(alreadyApplied.containsKey(r)){ + final Pair<MusicRangeInformationRow, Integer> appliedPair = alreadyApplied.get(r); + final MusicRangeInformationRow appliedRow = appliedPair.getKey(); + final int index = appliedPair.getValue(); + final long appliedTimestamp = appliedRow.getTimestamp(); + final long nodeTimestamp = node.getValue().getTimestamp(); + if(appliedTimestamp > nodeTimestamp){ + setReady(node.getValue(),r); + } + else if(appliedTimestamp == nodeTimestamp){ + setPartiallyReady(node.getValue(),r,index); + } + } + } + } + } + + public void addNewNode(MusicRangeInformationRow row, List<DagNode> dependencies) throws MDBCServiceException { + boolean found=false; + if (ranges != null) { + DatabasePartition dbPartition = row.getDBPartition(); + for(Range range : dbPartition.getSnapshot()){ + for(Range dagRange : ranges){ + if(dagRange.overlaps(range)){ + found = true; + break; + } + } + if(found) break; + } + if(!found) { + return; + } + } + + DagNode newNode = new DagNode(row); + nodes.put(row.getPartitionIndex(),newNode); + for(DagNode dependency : dependencies) { + newNode.addIncomingEdge(dependency); + DagNode localNode = getNode(dependency.getId()); + localNode.addOutgoingEdge(newNode); + } + } + + public void addNewNodeWithSearch(MusicRangeInformationRow row, List<Range> ranges) throws MDBCServiceException { + Map<Range,DagNode> newestNode = new HashMap<>(); + for(DagNode node : nodes.values()){ + for(Range range : ranges) { + if (node.getRangeSet().contains(range)){ + if(!newestNode.containsKey(range)){ + newestNode.put(range,node); + } + else{ + DagNode current = newestNode.get(range); + if(node.getTimestamp() > current.getTimestamp()){ + newestNode.put(range,node); + } + } + } + } + } + List<DagNode> dependencies = newestNode.values().stream().distinct().collect(Collectors.toList()); + addNewNode(row,dependencies); + } + + public Set<Range> getAllRanges(){ + Set<Range> ranges = new HashSet<>(); + for(DagNode node : nodes.values()){ + ranges.addAll(node.getRangeSet()); + } + return ranges; + } + + public void setIsLatest(UUID id, boolean isLatest){ + DagNode dagNode = nodes.get(id); + dagNode.setIsLatest(isLatest); + if(isLatest) { + MusicRangeInformationRow row = dagNode.getRow(); + DatabasePartition dbPartition = row.getDBPartition(); + for (Range range : dbPartition.getSnapshot()) { + if (!rowsPerLatestRange.containsKey(range)) { + rowsPerLatestRange.put(range, new HashSet<>()); + } + rowsPerLatestRange.get(range).add(dagNode); + } + } + else{ + MusicRangeInformationRow row = dagNode.getRow(); + DatabasePartition dbPartition = row.getDBPartition(); + for (Range range : dbPartition.getSnapshot()) { + if (rowsPerLatestRange.containsKey(range)) { + rowsPerLatestRange.get(range).remove(dagNode); + } + } + } + } + + private Map<Range,Set<DagNode>> getIsLatestPerRange(){ + if(rowsPerLatestRange == null){ + rowsPerLatestRange = new HashMap<>(); + } + for(DagNode node : nodes.values()){ + MusicRangeInformationRow row = node.getRow(); + DatabasePartition dbPartition = row.getDBPartition(); + if (row.getIsLatest()) { + for(Range range : dbPartition.getSnapshot()){ + if(!rowsPerLatestRange.containsKey(range)){ + rowsPerLatestRange.put(range,new HashSet<>()); + } + rowsPerLatestRange.get(range).add(node); + } + } + } + return new HashMap<>(rowsPerLatestRange); + } + + private List<DagNode> getOldestDoubleRows(Map<Range,Set<DagNode>> rowPerLatestRange) throws MDBCServiceException { + Set<DagNode> oldest = new HashSet<>(); + for(Map.Entry<Range,Set<DagNode>> rangeAndNodes : rowPerLatestRange.entrySet()){ + Range range = rangeAndNodes.getKey(); + Set<DagNode> nodes = rangeAndNodes.getValue(); + if(nodes.size() > 2){ + logger.error("Range "+range.getTable()+"has more than 2 active rows"); + throw new MDBCServiceException("Range has more than 2 active rows"); + } + else if(nodes.size()==2){ + DagNode older = null; + long olderTimestamp = Long.MAX_VALUE; + for(DagNode node : nodes){ + if(olderTimestamp > node.getTimestamp()){ + older = node; + olderTimestamp=node.getTimestamp(); + } + } + oldest.add(older); + } + } + return new ArrayList<>(oldest); + } + + public List<DagNode> getOldestDoubles() throws MDBCServiceException{ + Map<Range,Set<DagNode>> rowsPerLatestRange = getIsLatestPerRange(); + List<DagNode> toDisable = getOldestDoubleRows(rowsPerLatestRange); + return toDisable; + } + + public Pair<List<Range>,Set<DagNode>> getIncompleteRangesAndDependents() throws MDBCServiceException { + List<Range> incomplete = new ArrayList<>(); + Set<DagNode> dependents = new HashSet<>(); + Map<Range,Set<DagNode>> rowsPerLatestRange = getIsLatestPerRange(); + List<DagNode> toDisable = getOldestDoubleRows(rowsPerLatestRange); + for(DagNode node : toDisable) { + for (Range range : node.getRangeSet()) { + rowsPerLatestRange.get(range).remove(node); + if (rowsPerLatestRange.get(range).size() == 0) { + incomplete.add(range); + dependents.add(node); + } + } + } + return Pair.of(incomplete,dependents); + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java new file mode 100644 index 0000000..e737b26 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java @@ -0,0 +1,202 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.ownership; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; + +public class DagNode { + + private boolean owned; + private boolean applyInit; + private final MusicRangeInformationRow row; + private int currentIndex; + private Set<DagNode> dependencies; + private Set<DagNode> outgoingEdges; + private Set<DagNode> readyOwnDependencies; + private Set<DagNode> readyAppliedDependencies; + private List<Range> alreadyApplied; + private Map<Range,Integer> partiallyApplied; + private Map<Range,Integer> startIndex; + + public DagNode(MusicRangeInformationRow row){ + this.row = row; + owned = false; + applyInit = false; + currentIndex = 0; + dependencies = new HashSet<>(); + outgoingEdges = new HashSet<>(); + readyOwnDependencies = new HashSet<>(); + readyAppliedDependencies = new HashSet<>(); + alreadyApplied = new ArrayList<>(); + partiallyApplied = new HashMap<>(); + startIndex = new HashMap<>(); + } + + public MusicRangeInformationRow getRow() { return row;} + + public synchronized void setOwned(){ + owned = true; + } + + public synchronized boolean isOwned(){ + return owned; + } + + public UUID getId(){ + return row.getPartitionIndex(); + } + + public synchronized void addIncomingEdge(DagNode sourceNode){ + dependencies.add(sourceNode); + } + + public synchronized void addOutgoingEdge(DagNode destinationNode){ + outgoingEdges.add(destinationNode); + } + + public synchronized boolean hasNotIncomingEdges(){ + return dependencies.isEmpty(); + } + + public synchronized List<DagNode> getOutgoingEdges(){ + return new ArrayList<>(outgoingEdges); + } + + public synchronized void addReady(Range r) throws MDBCServiceException { + if(!row.getDBPartition().isContained(r)){ + throw new MDBCServiceException("Range was set ready to a node that doesn't own it"); + } + alreadyApplied.add(r); + } + + public synchronized void addPartiallyReady(Range r, int index){ + partiallyApplied.put(r,index); + } + + public synchronized void setOwnDependencyReady(DagNode other){ + readyOwnDependencies.add(other); + } + + public synchronized boolean areOwnDependenciesReady(){ + final int dSize = dependencies.size(); + final int oSize = readyOwnDependencies.size(); + return (dSize == oSize) && dependencies.containsAll(readyOwnDependencies); + } + + public synchronized void setApplyDependencyReady(DagNode other){ + readyAppliedDependencies.add(other); + } + + public synchronized boolean areApplyDependenciesReady(){ + final int dSize = dependencies.size(); + final int oSize = readyAppliedDependencies.size(); + return (dSize == oSize) && dependencies.containsAll(readyAppliedDependencies); + } + + private void initializeApply(Set<Range> ranges){ + applyInit = true; + int redoSize = row.getRedoLog().size(); + // No need to apply + for(Range r: alreadyApplied){ + startIndex.put(r,redoSize); + } + // Only apply the required subsection + partiallyApplied.forEach((r, index) -> { + startIndex.put(r,index); + }); + // All other ranges need to be applied completely + Set<Range> alreadySet = new HashSet<>(alreadyApplied); + Set<Range> partialSet = partiallyApplied.keySet(); + Set<Range> pending = new HashSet<>(ranges); + pending.removeAll(alreadySet); + pending.removeAll(partialSet); + for(Range r: pending){ + startIndex.put(r,-1); + } + //Get the index of the redo log to begin with + currentIndex = startIndex.values().stream().mapToInt(v->v).min().orElse(0); + currentIndex = currentIndex+1; + } + + public synchronized Pair<MusicTxDigestId, List<Range>> nextNotAppliedTransaction(Set<Range> ranges){ + if(row.getRedoLog().isEmpty()) return null; + if(!applyInit){ + initializeApply(ranges); + } + final List<MusicTxDigestId> redoLog = row.getRedoLog(); + if(currentIndex < redoLog.size()){ + List<Range> responseRanges= new ArrayList<>(); + startIndex.forEach((r, index) -> { + if(index < currentIndex){ + responseRanges.add(r); + } + }); + return Pair.of(redoLog.get(currentIndex++),responseRanges); + } + return null; + } + + public void setIsLatest(boolean isLatest){ + row.setIsLatest(isLatest); + } + + public synchronized Set<Range> getRangeSet(){ + return new HashSet<>(row.getDBPartition().getSnapshot()); + } + + public synchronized boolean wasApplied(Set<Range> ranges){ + if(row.getRedoLog().isEmpty()) return true; + if(!applyInit){ + initializeApply(ranges); + } + return currentIndex >= row.getRedoLog().size(); + } + + public long getTimestamp(){ + return row.getTimestamp(); + } + + + @Override + public boolean equals(Object o){ + if (this == o) return true; + if(o == null) return false; + if(!(o instanceof DagNode)) return false; + DagNode other = (DagNode) o; + return other.row.getPartitionIndex().equals(this.row.getPartitionIndex()); + } + + @Override + public int hashCode(){ + return row.getPartitionIndex().hashCode(); + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java new file mode 100644 index 0000000..4ccd21d --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -0,0 +1,225 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.ownership; + +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.mixins.DBInterface; +import org.onap.music.mdbc.mixins.LockResult; +import org.onap.music.mdbc.mixins.MusicInterface; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; +import org.onap.music.mdbc.tables.StagingTable; + +public class OwnershipAndCheckpoint{ + + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class); + private Lock checkpointLock; + private AtomicBoolean change; + private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied; + private Map<UUID,Long> ownershipBeginTime; + private long timeoutInMs; + + public OwnershipAndCheckpoint(){ + this(new HashMap<>(),Long.MAX_VALUE); + } + + public OwnershipAndCheckpoint(Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied, long timeoutInMs){ + change = new AtomicBoolean(true); + checkpointLock = new ReentrantLock(); + this.alreadyApplied = alreadyApplied; + ownershipBeginTime = new HashMap<>(); + this.timeoutInMs = timeoutInMs; + } + + public void startOwnershipTimeoutClock(UUID id){ + ownershipBeginTime.put(id,System.currentTimeMillis()); + } + + public void stopOwnershipTimeoutClock(UUID id){ + if(ownershipBeginTime.containsKey(id)) { + ownershipBeginTime.remove(id); + } + else{ + logger.warn("clock was deleted with an invalid/stale id "+id); + } + } + + public boolean timeout(UUID id) throws MDBCServiceException { + long current = System.currentTimeMillis(); + if(!ownershipBeginTime.containsKey(id)){ + throw new MDBCServiceException("timeout was call with an invalid id"); + } + Long beginTime = ownershipBeginTime.get(id); + if(current-beginTime > timeoutInMs){ + return true; + } + return false; + } + + public List<MusicRangeInformationRow> getRows(List<MusicRangeInformationRow> allMriRows, List<Range> ranges, + boolean onlyIsLatest){ + List<MusicRangeInformationRow> rows = new ArrayList<>(); + for(MusicRangeInformationRow row : allMriRows){ + if(onlyIsLatest && !row.getIsLatest()){ + continue; + } + final List<Range> rowRanges = row.getDBPartition().getSnapshot(); + boolean found = false; + for(Range sRange : ranges){ + for(Range rRange: rowRanges) { + if(sRange.overlaps(rRange)){ + rows.add(row); + found=true; + break; + } + } + if(found) break; + } + } + return rows; + } + + private List<MusicRangeInformationRow> getRows(MusicInterface music, List<Range> ranges, boolean onlyIsLatest) + throws MDBCServiceException { + final List<MusicRangeInformationRow> allMriRows = music.getAllMriRows(); + return getRows(allMriRows,ranges,onlyIsLatest); + } + + public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, List<Range> ranges, + Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException { + try { + checkpointLock.lock(); + change.set(true); + Set<Range> rangesSet = new HashSet<>(ranges); + extendedDag.setAlreadyApplied(alreadyApplied, rangesSet); + applyRequiredChanges(mi, di, extendedDag, ranges, ownOpId); + } + catch(MDBCServiceException e){ + stopOwnershipTimeoutClock(ownOpId); + throw e; + } + finally { + checkpointLock.unlock(); + } + } + + private void enableForeignKeys(DBInterface di) throws MDBCServiceException { + try { + di.enableForeignKeyChecks(); + } catch (SQLException e) { + throw new MDBCServiceException("Error enabling foreign keys checks",e); + } + } + + private void disableForeignKeys(DBInterface di) throws MDBCServiceException { + try { + di.disableForeignKeyChecks(); + } catch (SQLException e) { + throw new MDBCServiceException("Error disable foreign keys checks",e); + } + } + + private void applyTxDigest(DBInterface di, HashMap<Range, StagingTable> txDigest) + throws MDBCServiceException { + try { + di.applyTxDigest(txDigest); + } catch (SQLException e) { + throw new MDBCServiceException("Error applying tx digest in local SQL",e); + } + } + + public void warmup(MusicInterface mi, DBInterface di, List<Range> ranges) throws MDBCServiceException { + boolean ready = false; + change.set(true); + Set<Range> rangeSet = new HashSet<Range>(ranges); + Dag dag = new Dag(false); + while(!ready){ + if(change.get()){ + change.set(false); + final List<MusicRangeInformationRow> rows = getRows(mi, ranges,false); + dag = Dag.getDag(rows,ranges); + } + else if(!dag.applied()){ + DagNode node = dag.nextToApply(ranges); + if(node!=null) { + Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet); + while (pair != null) { + disableForeignKeys(di); + checkpointLock.lock(); + if (change.get()) { + enableForeignKeys(di); + checkpointLock.unlock(); + break; + } else { + final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey()); + applyTxDigest(di, txDigest); + for (Range r : pair.getValue()) { + alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index)); + } + } + pair = node.nextNotAppliedTransaction(rangeSet); + enableForeignKeys(di); + checkpointLock.unlock(); + } + } + } + else{ + ready = true; + } + } + } + + private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, List<Range> ranges, UUID ownOpId) + throws MDBCServiceException { + Set<Range> rangeSet = new HashSet<Range>(ranges); + disableForeignKeys(db); + while(!extendedDag.applied()){ + DagNode node = extendedDag.nextToApply(ranges); + if(node!=null) { + Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet); + while (pair != null) { + final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey()); + applyTxDigest(db, txDigest); + for (Range r : pair.getValue()) { + alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index)); + } + pair = node.nextNotAppliedTransaction(rangeSet); + if (timeout(ownOpId)) { + enableForeignKeys(db); + throw new MDBCServiceException("Timeout apply changes to local dbi"); + } + } + } + } + enableForeignKeys(db); + + } + +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java new file mode 100644 index 0000000..281d763 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.tables; + +import java.util.Comparator; + +public class MriRowComparator implements Comparator<MusicRangeInformationRow> { + + @Override + public int compare(MusicRangeInformationRow o1, MusicRangeInformationRow o2) { + return Long.compare(o1.getTimestamp(),o2.getTimestamp()); + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java index 94011d7..2c5af2c 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,30 +19,38 @@ */ package org.onap.music.mdbc.tables; +import java.sql.Timestamp; import java.util.List; import java.util.UUID; import org.onap.music.mdbc.DatabasePartition; -public final class MusicRangeInformationRow { +public final class MusicRangeInformationRow implements Comparable<MusicRangeInformationRow>{ private final DatabasePartition dbPartition; - //private final UUID partitionIndex; + private final UUID partitionIndex; private final List<MusicTxDigestId> redoLog; - private final String ownerId; + private String ownerId; private final String metricProcessId; + private boolean isLatest; - public MusicRangeInformationRow (DatabasePartition dbPartition, List<MusicTxDigestId> redoLog, - String ownerId, String metricProcessId) { + public MusicRangeInformationRow (UUID partitionIndex, DatabasePartition dbPartition, List<MusicTxDigestId> redoLog, + String ownerId, String metricProcessId, boolean isLatest) { + this.partitionIndex=partitionIndex; this.dbPartition = dbPartition; this.redoLog = redoLog; this.ownerId = ownerId; this.metricProcessId = metricProcessId; + this.isLatest = isLatest; } - /*public UUID getPartitionIndex() { - return dbPartition.getMusicRangeInformationIndex(); - } */ - + public UUID getPartitionIndex() { + return dbPartition.getMRIIndex(); + } + + public boolean getIsLatest(){ return isLatest; } + + public void setIsLatest(boolean isLatest){ this.isLatest = isLatest; } + public DatabasePartition getDBPartition() { return this.dbPartition; } @@ -58,5 +66,35 @@ public final class MusicRangeInformationRow { public String getMetricProcessId() { return metricProcessId; } - + + public long getTimestamp(){ + return partitionIndex.timestamp(); + } + + public void setOwnerId(String newOwnerId){ + this.ownerId=newOwnerId; + } + + @Override + public int compareTo(MusicRangeInformationRow o) { + long thisTimestamp = this.getTimestamp(); + long oTimestamp = o.getTimestamp(); + //descending order + int returnVal = (thisTimestamp>oTimestamp)?-1:(thisTimestamp==oTimestamp)?0:1; + return returnVal; + } + + @Override + public boolean equals(Object o){ + if (this == o) return true; + if(o == null) return false; + if(!(o instanceof MusicRangeInformationRow)) return false; + MusicRangeInformationRow other = (MusicRangeInformationRow) o; + return other.getPartitionIndex().equals(this.getPartitionIndex()); + } + + @Override + public int hashCode(){ + return partitionIndex.hashCode(); + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java index b7c37ba..1da2d79 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -19,36 +19,22 @@ */ package org.onap.music.mdbc.tables; -import java.io.IOException; import java.sql.SQLException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Properties; import java.util.UUID; import java.util.concurrent.TimeUnit; - -import org.json.JSONObject; -import org.onap.music.datastore.PreparedQueryObject; import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.exceptions.MusicServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.DatabasePartition; -import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.MdbcConnection; -import org.onap.music.mdbc.MdbcServerLogic; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.StateManager; -import org.onap.music.mdbc.configurations.NodeConfiguration; -import org.onap.music.mdbc.mixins.MusicMixin; import org.onap.music.mdbc.mixins.DBInterface; import org.onap.music.mdbc.mixins.MusicInterface; -import com.datastax.driver.core.Row; - public class MusicTxDigest { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicTxDigest.class); @@ -90,7 +76,8 @@ public class MusicTxDigest { for (UUID partition : partitions) { if (!partition.equals(myPartition.getMRIIndex())) { try { - replayDigestForPartition(mi, partition, dbi); + //replayDigestForPartition(mi, partition, dbi); + mi.getOwnAndCheck().warmup(mi, dbi, myPartition.getSnapshot()); } catch (MDBCServiceException e) { logger.error("Unable to update for partition : " + partition + ". " + e.getMessage()); continue; @@ -98,31 +85,71 @@ public class MusicTxDigest { } } } + + //Step 3: ReplayDigest() for E.C conditions + try { + replayDigest(mi,dbi); + } catch (MDBCServiceException e) { + logger.error("Unable to perform Eventual Consistency operations" + e.getMessage()); + continue; + } + } } - + /** - * Replay the digest for a given partition + * Replay the digest for eventual consistency. * @param mi music interface * @param partitionId the partition to be replayed * @param dbi interface to the database that will replay the operations * @throws MDBCServiceException */ - public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException { - List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog(); - for (MusicTxDigestId txId: partitionsRedoLogTxIds) { - HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId); + public static void replayDigest(MusicInterface mi, DBInterface dbi) throws MDBCServiceException { + //List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog(); + //From where should I fetch TransactionsIDs ??? from NEW TABLE ?? or EXISING TABLE ?? << what the new SITE_TABLE details?? + // --> It is a new table called ECTxDigest + //I should sort/ call a method which gives all the entires of a table based on the time-stamp from Low to High + + ArrayList<HashMap<Range, StagingTable>> ecTxDigest = mi.getEveTxDigest(); + + //for (MusicTxDigestId txId: partitionsRedoLogTxIds) { // partitionsRedoLogTxIds --> this comes from new table where timeStamp > currentTimeStamp ( THIS SHOULD BE lessthan.. which is ASC order) + //HashMap<Range, StagingTable> transaction = mi2.getEcTxDigest(); // Getting records from musictxdigest TABLE. + for (HashMap<Range, StagingTable> transaction: ecTxDigest) { try { - dbi.replayTransaction(transaction); + dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??) } catch (SQLException e) { - logger.error("Rolling back the entire digest replay. " + partitionId); + logger.error("EC:Rolling back the entire digest replay."); return; } - logger.info("Successfully replayed transaction " + txId); + logger.info("EC: Successfully replayed transaction "); } - //todo, keep track of where I am in pointer } + + /** + * Replay the digest for a given partition + * @param mi music interface + * @param partitionId the partition to be replayed + * @param dbi interface to the database that will replay the operations + * @throws MDBCServiceException + */ + public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException { + List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog(); + for (MusicTxDigestId txId: partitionsRedoLogTxIds) { + HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId); + try { + //\TODO do this two operations in parallel + dbi.replayTransaction(transaction); + mi.replayTransaction(transaction); + } catch (SQLException e) { + logger.error("Rolling back the entire digest replay. " + partitionId); + return; + } + logger.info("Successfully replayed transaction " + txId); + } + //todo, keep track of where I am in pointer + } + /** * Start the background daemon defined by this object * Spawns a new thread and runs "backgroundDaemon" diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java index fda34e2..1c37db0 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java @@ -23,12 +23,28 @@ import java.util.UUID; public final class MusicTxDigestId { public final UUID txId; + public final int index; - public MusicTxDigestId(UUID primaryKey) { + public MusicTxDigestId(UUID primaryKey, int index) { this.txId= primaryKey; + this.index=index; } public boolean isEmpty() { return (this.txId==null); } + + @Override + public boolean equals(Object o){ + if (this == o) return true; + if(o == null) return false; + if(!(o instanceof MusicTxDigestId)) return false; + MusicTxDigestId other = (MusicTxDigestId) o; + return other.txId.equals(this.txId); + } + + @Override + public int hashCode(){ + return txId.hashCode(); + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/RangeDependency.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/RangeDependency.java new file mode 100644 index 0000000..cc8c875 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/RangeDependency.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.tables; + +import org.onap.music.mdbc.Range; + +import java.util.List; + +public class RangeDependency { + final Range baseRange; + final List<Range> dependentRanges; + + public RangeDependency(Range baseRange, List<Range> dependentRanges){ + this.baseRange=baseRange; + this.dependentRanges=dependentRanges; + } + public Range getRange(){ + return baseRange; + } + public List<Range> dependentRanges(){ + return dependentRanges; + } +} diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcConnectionTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcConnectionTest.java index 2250b90..6842ed5 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcConnectionTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcConnectionTest.java @@ -20,8 +20,6 @@ package org.onap.music.mdbc; -import static org.junit.Assert.*; - import org.junit.Test; public class MdbcConnectionTest { diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java index 458f70f..2d31939 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java @@ -17,25 +17,22 @@ * limitations under the License. * ============LICENSE_END====================================================== */ + package org.onap.music.mdbc; -import static org.junit.Assert.*; +import org.junit.*; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; import org.onap.music.mdbc.mixins.MySQLMixin; import ch.vorburger.mariadb4j.DB; public class MySQLMixinTest { - public static final String DATABASE = "mdbcTest"; + public static final String DATABASE = "mdbctest"; public static final String TABLE= "Persons"; public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" + " PersonID int,\n" + @@ -74,7 +71,7 @@ public class MySQLMixinTest { @Test public void testGetDataBaseName() throws SQLException { - assertEquals(DATABASE, mysqlMixin.getDatabaseName()); + Assert.assertEquals(DATABASE, mysqlMixin.getDatabaseName()); } } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java index 85220d4..8c45c2e 100755 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java @@ -22,8 +22,12 @@ package org.onap.music.mdbc; import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.QueryExecutionException; import com.datastax.driver.core.exceptions.SyntaxError; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicLockingException; import org.onap.music.lockingservice.cassandra.CassaLockStore; +import org.onap.music.lockingservice.cassandra.MusicLockState; import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.main.MusicCore; import org.onap.music.main.MusicUtil; import java.io.FileInputStream; @@ -31,6 +35,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.*; +import org.onap.music.mdbc.mixins.MusicInterface; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; import static junit.framework.TestCase.assertNotNull; import static junit.framework.TestCase.fail; @@ -41,6 +47,31 @@ public class TestUtils { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TestUtils.class); + public static DatabasePartition createBasicRow(Range range, MusicInterface mixin, String mdbcServerName){ + final UUID uuid = MDBCUtils.generateTimebasedUniqueKey(); + List<Range> ranges = new ArrayList<>(); + ranges.add(range); + DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); + MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "", + mdbcServerName, true); + DatabasePartition partition=null; + try { + partition = mixin.createMusicRangeInformation(newRow); + } catch (MDBCServiceException e) { + fail("failure when creating new row"); + } + return partition; + } + + public static void unlockRow(String keyspace, String mriTableName, DatabasePartition partition){ + String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString(); + try { + MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); + } catch (MusicLockingException e) { + fail("failure when releasing lock"); + } + } + public static void createKeyspace(String keyspace, Session session) { String queryOp = "CREATE KEYSPACE " + keyspace + diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java index e4facc7..0b2bb57 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java @@ -20,18 +20,19 @@ package org.onap.music.mdbc.mixins; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.*; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Session; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.UUID; + +import java.util.*; + import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Ignore; @@ -47,7 +48,11 @@ import org.onap.music.lockingservice.cassandra.CassaLockStore; import org.onap.music.lockingservice.cassandra.MusicLockState; import org.onap.music.main.MusicCore; import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.TestUtils; +import org.onap.music.mdbc.ownership.Dag; +import org.onap.music.mdbc.ownership.DagNode; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.service.impl.MusicCassaCore; @@ -82,6 +87,18 @@ public class MusicMixinTest { MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session); CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle); assertNotNull("Invalid configuration for music", store); + } + + @AfterClass + public static void close() throws MusicServiceException, MusicQueryException { + //TODO: shutdown cassandra + session.close(); + cluster.close(); + } + + @Before + public void initTest(){ + session.execute("DROP KEYSPACE IF EXISTS "+keyspace); try { Properties properties = new Properties(); properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace); @@ -90,127 +107,121 @@ public class MusicMixinTest { } catch (MDBCServiceException e) { fail("error creating music mixin"); } - } - @AfterClass - public static void close() throws MusicServiceException, MusicQueryException { - //TODO: shutdown cassandra - session.close(); - cluster.close(); } @Test(timeout=1000) public void own() { - final UUID uuid = mixin.generateUniqueKey(); + Range range = new Range("table1"); List<Range> ranges = new ArrayList<>(); - ranges.add(new Range("table1")); - DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); - MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), "", mdbcServerName); - DatabasePartition partition=null; - try { - partition = mixin.createMusicRangeInformation(newRow); - } catch (MDBCServiceException e) { - fail("failure when creating new row"); - } - String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString(); - try { - MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); - } catch (MusicLockingException e) { - fail("failure when releasing lock"); - } - DatabasePartition newPartition = new DatabasePartition(mixin.generateUniqueKey()); + ranges.add(range); + final DatabasePartition partition = TestUtils.createBasicRow(range, mixin, mdbcServerName); + TestUtils.unlockRow(keyspace,mriTableName,partition); + + DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey()); try { - mixin.own(ranges,newPartition); + mixin.own(ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey()); } catch (MDBCServiceException e) { fail("failure when running own function"); } } - @Test(timeout=1000) - @Ignore //TODO: Fix this. it is breaking because of previous test^ - public void own2() { - final UUID uuid = mixin.generateUniqueKey(); - final UUID uuid2 = mixin.generateUniqueKey(); - List<Range> ranges = new ArrayList<>(); - List<Range> ranges2 = new ArrayList<>(); - ranges.add(new Range("table2")); - ranges2.add(new Range("table3")); + private DatabasePartition addRow(List<Range> ranges,boolean isLatest){ + final UUID uuid = MDBCUtils.generateTimebasedUniqueKey(); DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); - DatabasePartition dbPartition2 = new DatabasePartition(ranges2,uuid2,null); - MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), "", mdbcServerName); - MusicRangeInformationRow newRow2 = new MusicRangeInformationRow(dbPartition2, new ArrayList<>(), "", mdbcServerName); + MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "", + mdbcServerName, isLatest); DatabasePartition partition=null; - DatabasePartition partition2=null; try { partition = mixin.createMusicRangeInformation(newRow); - partition2 = mixin.createMusicRangeInformation(newRow2); } catch (MDBCServiceException e) { fail("failure when creating new row"); } String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString(); - String fullyQualifiedMriKey2 = keyspace+"."+ mriTableName+"."+partition2.getMRIIndex().toString(); try { MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); - MusicLockState musicLockState2 = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey2, partition2.getLockId()); } catch (MusicLockingException e) { fail("failure when releasing lock"); } - DatabasePartition blankPartition = new DatabasePartition(mixin.generateUniqueKey()); - DatabasePartition newPartition=null; + return partition; + } + + @Test(timeout=1000) + public void own2() throws InterruptedException, MDBCServiceException { + List<Range> range12 = new ArrayList<>( Arrays.asList( + new Range("range1"), + new Range("range2") + )); + List<Range> range34 = new ArrayList<>( Arrays.asList( + new Range("range3"), + new Range("range4") + )); + List<Range> range24 = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range4") + )); + List<Range> range123 = new ArrayList<>( Arrays.asList( + new Range("range1"), + new Range("range2"), + new Range("range3") + )); + DatabasePartition db1 = addRow(range12, false); + DatabasePartition db2 = addRow(range34, false); + MILLISECONDS.sleep(10); + DatabasePartition db3 = addRow(range12, true); + DatabasePartition db4 = addRow(range34, true); + MILLISECONDS.sleep(10); + DatabasePartition db5 = addRow(range24, true); + DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey()); + MusicInterface.OwnershipReturn own = null; try { - List<Range> ownRanges = new ArrayList<>(); - ownRanges.add(new Range("table2")); - ownRanges.add(new Range("table3")); - newPartition = mixin.own(ownRanges, blankPartition); + own = mixin.own(range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey()); } catch (MDBCServiceException e) { fail("failure when running own function"); } - assertEquals(2,newPartition.getOldMRIIds().size()); - assertEquals(newPartition.getLockId(),blankPartition.getLockId()); - assertTrue(newPartition.getOldMRIIds().get(0).equals(partition.getMRIIndex())|| - newPartition.getOldMRIIds().get(1).equals(partition.getMRIIndex())); - assertTrue(newPartition.getOldMRIIds().get(0).equals(partition2.getMRIIndex())|| - newPartition.getOldMRIIds().get(1).equals(partition2.getMRIIndex())); - String finalfullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+blankPartition.getMRIIndex().toString(); - try { - List<String> lockQueue = MusicCassaCore.getLockingServiceHandle().getLockQueue(keyspace, mriTableName, - blankPartition.getMRIIndex().toString()); - assertEquals(1,lockQueue.size()); - assertEquals(lockQueue.get(0),blankPartition.getLockId()); - } catch (MusicServiceException|MusicQueryException|MusicLockingException e) { - fail("failure on getting queue"); - } - MusicRangeInformationRow musicRangeInformation=null; - try { - musicRangeInformation= mixin.getMusicRangeInformation(blankPartition.getMRIIndex()); - } catch (MDBCServiceException e) { - fail("fail to retrieve row"); - } - assertEquals(2,musicRangeInformation.getDBPartition().getSnapshot().size()); - assertEquals(0,musicRangeInformation.getRedoLog().size()); - assertEquals(blankPartition.getLockId(),musicRangeInformation.getOwnerId()); - assertEquals(mdbcServerName,musicRangeInformation.getMetricProcessId()); - List<Range> snapshot = musicRangeInformation.getDBPartition().getSnapshot(); - boolean containsTable1=false; - Range table1Range = new Range("table2"); - for(Range r:snapshot){ - if(r.overlaps(table1Range)){ - containsTable1=true; - break; - } - } - assertTrue(containsTable1); - boolean containsTable2=false; - Range table2Range = new Range("table3"); - for(Range r:snapshot){ - if(r.overlaps(table2Range)){ - containsTable2=true; - break; - } - } - assertTrue(containsTable2); + Dag dag = own.getDag(); + + DagNode node4 = dag.getNode(db4.getMRIIndex()); + assertFalse(node4.hasNotIncomingEdges()); + List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges()); + assertEquals(1,outgoingEdges.size()); + + DagNode missing = outgoingEdges.get(0); + Set<Range> missingRanges = missing.getRangeSet(); + assertEquals(2,missingRanges.size()); + assertTrue(missingRanges.contains(new Range("range1"))); + assertTrue(missingRanges.contains(new Range("range3"))); + List<DagNode> outgoingEdges1 = missing.getOutgoingEdges(); + assertEquals(1,outgoingEdges1.size()); + + DagNode finalNode = outgoingEdges1.get(0); + assertFalse(finalNode.hasNotIncomingEdges()); + Set<Range> finalSet = finalNode.getRangeSet(); + assertEquals(3,finalSet.size()); + assertTrue(finalSet.contains(new Range("range1"))); + assertTrue(finalSet.contains(new Range("range2"))); + assertTrue(finalSet.contains(new Range("range3"))); + + DagNode node5 = dag.getNode(db5.getMRIIndex()); + List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges(); + assertEquals(1,toRemoveOutEdges.size()); + toRemoveOutEdges.remove(finalNode); + assertEquals(0,toRemoveOutEdges.size()); + + MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId()); + assertTrue(row.getIsLatest()); + DatabasePartition dbPartition = row.getDBPartition(); + List<Range> snapshot = dbPartition.getSnapshot(); + assertEquals(3,snapshot.size()); + MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId()); + assertFalse(node5row.getIsLatest()); + MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex()); + assertFalse(node4Row.getIsLatest()); + MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex()); + assertFalse(node3Row.getIsLatest()); } + @Test public void relinquish() { } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java new file mode 100644 index 0000000..85e31cd --- /dev/null +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java @@ -0,0 +1,423 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.ownership; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Test; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.MDBCUtils; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; + +import static java.util.concurrent.TimeUnit.*; +import static org.junit.Assert.*; + +public class DagTest { + + private MusicRangeInformationRow createNewRow(List<Range> ranges, String lockid, boolean isLatest){ + List<MusicTxDigestId> redoLog = new ArrayList<>(); + return createNewRow(ranges,lockid,isLatest,redoLog); + } + + private MusicRangeInformationRow createNewRow(List<Range> ranges, String lockid, boolean isLatest, + List<MusicTxDigestId> redoLog) { + UUID id = MDBCUtils.generateTimebasedUniqueKey(); + DatabasePartition dbPartition = new DatabasePartition(ranges, id, lockid); + return new MusicRangeInformationRow(id, dbPartition, redoLog, lockid, "id", isLatest); + } + + @Test + public void getDag() throws InterruptedException, MDBCServiceException { + List<MusicRangeInformationRow> rows = new ArrayList<>(); + List<Range> ranges = new ArrayList<>( Arrays.asList( + new Range("range1") + )); + rows.add(createNewRow(new ArrayList<>(ranges),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(ranges),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(ranges),"",true)); + Dag dag = Dag.getDag(rows, ranges); + DagNode node1 = dag.getNode(rows.get(0).getPartitionIndex()); + DagNode node2 = dag.getNode(rows.get(1).getPartitionIndex()); + DagNode node3 = dag.getNode(rows.get(2).getPartitionIndex()); + List<DagNode> outgoingEdges1 = node1.getOutgoingEdges(); + assertTrue(node1.hasNotIncomingEdges()); + assertEquals(outgoingEdges1.size(),1); + assertEquals(outgoingEdges1.get(0),node2); + List<DagNode> outgoingEdges2 = node2.getOutgoingEdges(); + assertEquals(outgoingEdges2.size(),1); + assertEquals(outgoingEdges2.get(0),node3); + assertFalse(node2.hasNotIncomingEdges()); + List<DagNode> outgoingEdges3 = node3.getOutgoingEdges(); + assertEquals(outgoingEdges3.size(),0); + assertFalse(node3.hasNotIncomingEdges()); + } + + @Test + public void getDag2() throws InterruptedException, MDBCServiceException { + List<MusicRangeInformationRow> rows = new ArrayList<>(); + List<Range> range1 = new ArrayList<>( Arrays.asList( + new Range("range1") + )); + List<Range> range2 = new ArrayList<>( Arrays.asList( + new Range("range2") + )); + List<Range> ranges = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range1") + )); + rows.add(createNewRow(new ArrayList<>(range1),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range2),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(ranges),"",true)); + Dag dag = Dag.getDag(rows, ranges); + DagNode node1 = dag.getNode(rows.get(0).getPartitionIndex()); + DagNode node2 = dag.getNode(rows.get(1).getPartitionIndex()); + DagNode node3 = dag.getNode(rows.get(2).getPartitionIndex()); + List<DagNode> outgoingEdges1 = node1.getOutgoingEdges(); + assertTrue(node1.hasNotIncomingEdges()); + assertEquals(outgoingEdges1.size(),1); + assertEquals(outgoingEdges1.get(0),node3); + List<DagNode> outgoingEdges2 = node2.getOutgoingEdges(); + assertEquals(outgoingEdges2.size(),1); + assertEquals(outgoingEdges2.get(0),node3); + assertTrue(node2.hasNotIncomingEdges()); + List<DagNode> outgoingEdges3 = node3.getOutgoingEdges(); + assertEquals(outgoingEdges3.size(),0); + assertFalse(node3.hasNotIncomingEdges()); + } + + + @Test + public void nextToOwn() throws InterruptedException, MDBCServiceException { + List<MusicRangeInformationRow> rows = new ArrayList<>(); + List<Range> ranges = new ArrayList<>( Arrays.asList( + new Range("range1") + )); + rows.add(createNewRow(new ArrayList<>(ranges),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(ranges),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(ranges),"",true)); + Dag dag = Dag.getDag(rows, ranges); + int counter = 0; + while(dag.hasNextToOwn()){ + DagNode node = dag.nextToOwn(); + MusicRangeInformationRow row = node.getRow(); + UUID uuid = row.getPartitionIndex(); + assertEquals(rows.get(counter).getPartitionIndex(),uuid); + dag.setOwn(node); + counter++; + assertNotEquals(4,counter); + } + assertEquals(3,counter); + assertTrue(dag.isOwned()); + } + + @Test + public void nextToApply() throws InterruptedException { + List<MusicRangeInformationRow> rows = new ArrayList<>(); + List<Range> ranges = new ArrayList<>( Arrays.asList( + new Range("range1") + )); + List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList( + new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0) + )); + rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1)); + MILLISECONDS.sleep(10); + List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList( + new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0) + )); + rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo2)); + MILLISECONDS.sleep(10); + List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList( + new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0) + )); + rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3)); + Dag dag = Dag.getDag(rows, ranges); + int nodeCounter = 0; + HashSet<Range> rangesSet = new HashSet<>(ranges); + while(!dag.applied()){ + DagNode node = dag.nextToApply(ranges); + Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangesSet); + int transactionCounter = 0; + while(pair!=null) { + assertNotEquals(1,transactionCounter); + MusicRangeInformationRow row = rows.get(nodeCounter); + MusicTxDigestId id = row.getRedoLog().get(transactionCounter); + assertEquals(id,pair.getKey()); + assertEquals(0,pair.getKey().index); + List<Range> value = pair.getValue(); + assertEquals(1,value.size()); + assertEquals(new Range("range1"),value.get(0)); + pair = node.nextNotAppliedTransaction(rangesSet); + transactionCounter++; + } + assertEquals(1,transactionCounter); + nodeCounter++; + } + assertEquals(3,nodeCounter); + } + + @Test + public void nextToApply2() throws InterruptedException, MDBCServiceException { + Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied = new HashMap<>(); + List<MusicRangeInformationRow> rows = new ArrayList<>(); + List<Range> ranges = new ArrayList<>( Arrays.asList( + new Range("range1") + )); + List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList( + new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0) + )); + rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1)); + MILLISECONDS.sleep(10); + List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList( + new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0), + new MusicTxDigestId(MDBCUtils.generateUniqueKey(),1) + )); + MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2); + alreadyApplied.put(new Range("range1"),Pair.of(newRow, 0)); + rows.add(newRow); + MILLISECONDS.sleep(10); + List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList( + new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0) + )); + rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3)); + Dag dag = Dag.getDag(rows, ranges); + HashSet<Range> rangesSet = new HashSet<>(ranges); + dag.setAlreadyApplied(alreadyApplied, rangesSet); + int nodeCounter = 1; + while(!dag.applied()){ + DagNode node = dag.nextToApply(ranges); + Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangesSet); + int transactionCounter = 0; + while(pair!=null) { + assertNotEquals(1,transactionCounter); + MusicRangeInformationRow row = rows.get(nodeCounter); + MusicTxDigestId id = row.getRedoLog().get(2-nodeCounter); + assertEquals(id,pair.getKey()); + assertEquals(2-nodeCounter,pair.getKey().index); + List<Range> value = pair.getValue(); + assertEquals(1,value.size()); + assertEquals(new Range("range1"),value.get(0)); + pair = node.nextNotAppliedTransaction(rangesSet); + transactionCounter++; + } + assertEquals(1,transactionCounter); + nodeCounter++; + } + assertEquals(3,nodeCounter); + } + + @Test + public void isDifferent() throws InterruptedException { + List<MusicRangeInformationRow> rows = new ArrayList<>(); + List<Range> range1 = new ArrayList<>( Arrays.asList( + new Range("range1") + )); + List<Range> range2 = new ArrayList<>( Arrays.asList( + new Range("range2") + )); + List<Range> ranges = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range1") + )); + rows.add(createNewRow(new ArrayList<>(range1),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range2),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(ranges),"",true)); + List<MusicRangeInformationRow> rows2 = new ArrayList<>(rows); + List<MusicRangeInformationRow> rows3 = new ArrayList<>(rows); + MILLISECONDS.sleep(10); + rows3.add(createNewRow(new ArrayList<>(ranges),"",true)); + Dag dag = Dag.getDag(rows, ranges); + Dag dag2 = Dag.getDag(rows2, new ArrayList<>(ranges)); + Dag dag3 = Dag.getDag(rows3, new ArrayList<>(ranges)); + assertFalse(dag.isDifferent(dag2)); + assertFalse(dag2.isDifferent(dag)); + assertTrue(dag.isDifferent(dag3)); + assertTrue(dag3.isDifferent(dag)); + assertTrue(dag2.isDifferent(dag3)); + assertTrue(dag3.isDifferent(dag2)); + } + + @Test + public void getOldestDoubles() throws InterruptedException, MDBCServiceException { + List<MusicRangeInformationRow> rows = new ArrayList<>(); + List<Range> range1 = new ArrayList<>( Arrays.asList( + new Range("range1") + )); + List<Range> range2 = new ArrayList<>( Arrays.asList( + new Range("range2") + )); + List<Range> ranges = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range1") + )); + rows.add(createNewRow(new ArrayList<>(range1),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range2),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range1),"",true)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range2),"",true)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(ranges),"",true)); + Dag dag = Dag.getDag(rows, ranges); + List<DagNode> oldestDoubles = dag.getOldestDoubles(); + assertTrue(oldestDoubles.contains(dag.getNode(rows.get(2).getPartitionIndex()))); + assertTrue(oldestDoubles.contains(dag.getNode(rows.get(3).getPartitionIndex()))); + assertEquals(2,oldestDoubles.size()); + } + + @Test + public void getIncompleteRangesAndDependents() throws InterruptedException, MDBCServiceException { + List<MusicRangeInformationRow> rows = new ArrayList<>(); + List<Range> range1 = new ArrayList<>( Arrays.asList( + new Range("range1") + )); + List<Range> range2 = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range3") + )); + List<Range> ranges = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range1") + )); + rows.add(createNewRow(new ArrayList<>(range1),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range2),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range1),"",true)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range2),"",true)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(ranges),"",true)); + Dag dag = Dag.getDag(rows, ranges); + Pair<List<Range>, Set<DagNode>> incompleteRangesAndDependents = dag.getIncompleteRangesAndDependents(); + List<Range> incomplete = incompleteRangesAndDependents.getKey(); + Set<DagNode> dependents = incompleteRangesAndDependents.getValue(); + assertEquals(1,incomplete.size()); + assertTrue(incomplete.contains(new Range("range3"))); + assertEquals(1,dependents.size()); + assertTrue(dependents.contains(dag.getNode(rows.get(3).getPartitionIndex()))); + } + + @Test + public void getIncompleteRangesAndDependents2() throws InterruptedException, MDBCServiceException { + List<MusicRangeInformationRow> rows = new ArrayList<>(); + List<Range> range1 = new ArrayList<>( Arrays.asList( + new Range("range1"), + new Range("range4") + )); + List<Range> range2 = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range3") + )); + List<Range> ranges = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range1") + )); + rows.add(createNewRow(new ArrayList<>(range1),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range2),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range1),"",true)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range2),"",true)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(ranges),"",true)); + Dag dag = Dag.getDag(rows, ranges); + Pair<List<Range>, Set<DagNode>> incompleteRangesAndDependents = dag.getIncompleteRangesAndDependents(); + List<Range> incomplete = incompleteRangesAndDependents.getKey(); + Set<DagNode> dependents = incompleteRangesAndDependents.getValue(); + assertEquals(2,incomplete.size()); + assertTrue(incomplete.contains(new Range("range3"))); + assertTrue(incomplete.contains(new Range("range4"))); + assertEquals(2,dependents.size()); + assertTrue(dependents.contains(dag.getNode(rows.get(3).getPartitionIndex()))); + assertTrue(dependents.contains(dag.getNode(rows.get(2).getPartitionIndex()))); + } + + @Test + public void addNewNodeWithSearch() throws InterruptedException, MDBCServiceException { + List<MusicRangeInformationRow> rows = new ArrayList<>(); + List<Range> range1 = new ArrayList<>( Arrays.asList( + new Range("range1") + )); + List<Range> range2 = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range3") + )); + List<Range> ranges = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range1") + )); + List<Range> allRanges = new ArrayList<>( Arrays.asList( + new Range("range2"), + new Range("range3"), + new Range("range1") + )); + rows.add(createNewRow(new ArrayList<>(range1),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range2),"",false)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range1),"",true)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(range2),"",true)); + MILLISECONDS.sleep(10); + rows.add(createNewRow(new ArrayList<>(ranges),"",true)); + Dag dag = Dag.getDag(rows, ranges); + MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(allRanges), "", true); + dag.addNewNodeWithSearch(newRow,allRanges); + DagNode newNode = dag.getNode(newRow.getPartitionIndex()); + DagNode node = dag.getNode(rows.get(4).getPartitionIndex()); + List<DagNode> outgoingEdges = node.getOutgoingEdges(); + assertEquals(1,outgoingEdges.size()); + assertEquals(newNode,outgoingEdges.get(0)); + DagNode oNode = dag.getNode(rows.get(3).getPartitionIndex()); + outgoingEdges = node.getOutgoingEdges(); + assertEquals(1,outgoingEdges.size()); + assertEquals(newNode,outgoingEdges.get(0)); + DagNode node0 = dag.getNode(rows.get(0).getPartitionIndex()); + outgoingEdges = node0.getOutgoingEdges(); + assertEquals(1,outgoingEdges.size()); + DagNode node1 = dag.getNode(rows.get(1).getPartitionIndex()); + outgoingEdges = node1.getOutgoingEdges(); + assertEquals(1,outgoingEdges.size()); + DagNode node2 = dag.getNode(rows.get(2).getPartitionIndex()); + outgoingEdges = node1.getOutgoingEdges(); + assertEquals(1,outgoingEdges.size()); + } +}
\ No newline at end of file diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java new file mode 100644 index 0000000..753c629 --- /dev/null +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java @@ -0,0 +1,237 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.ownership; + +import static org.junit.Assert.*; + +import ch.vorburger.exec.ManagedProcessException; +import ch.vorburger.mariadb4j.DB; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; + +import java.sql.*; +import java.util.*; + +import org.cassandraunit.utils.EmbeddedCassandraServerHelper; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.onap.music.datastore.MusicDataStore; +import org.onap.music.datastore.MusicDataStoreHandle; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicQueryException; +import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.lockingservice.cassandra.CassaLockStore; +import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.MDBCUtils; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.TestUtils; +import org.onap.music.mdbc.mixins.LockResult; +import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; +import org.onap.music.mdbc.mixins.MusicMixin; +import org.onap.music.mdbc.mixins.MySQLMixin; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.StagingTable; +import org.onap.music.mdbc.tables.TxCommitProgress; + +public class OwnershipAndCheckpointTest { + final private static int sqlPort = 13350; + final private static String keyspace="metricmusictest"; + final private static String mriTableName = "musicrangeinformation"; + final private static String mtdTableName = "musictxdigest"; + final private static String mdbcServerName = "name"; + public static final String DATABASE = "mdbcTest"; + public static final String TABLE= "Persons"; + public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" + + " PersonID int,\n" + + " LastName varchar(255),\n" + + " FirstName varchar(255),\n" + + " Address varchar(255),\n" + + " City varchar(255),\n" + + " PRIMARY KEY (PersonID,LastName)" + + ");"; + public static final String DROP_TABLE = "DROP TABLE IF EXISTS " + TABLE + ";"; + //Properties used to connect to music + private static Cluster cluster; + private static Session session; + private static String cassaHost = "localhost"; + private static MusicMixin musicMixin = null; + private static DB db; + Connection conn; + MySQLMixin mysqlMixin; + + @BeforeClass + public static void init() throws MusicServiceException, ClassNotFoundException, ManagedProcessException { + try { + EmbeddedCassandraServerHelper.startEmbeddedCassandra(); + } catch (Exception e) { + fail(e.getMessage()); + } + cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build(); + cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000); + assertNotNull("Invalid configuration for cassandra", cluster); + session = cluster.connect(); + assertNotNull("Invalid configuration for cassandra", session); + Class.forName("org.mariadb.jdbc.Driver"); + MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session); + CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle); + assertNotNull("Invalid configuration for music", store); + //start embedded mariadb + db = DB.newEmbeddedDB(sqlPort); + db.start(); + db.createDB(DATABASE); + } + + @AfterClass + public static void close() throws MusicServiceException, MusicQueryException, ManagedProcessException { + //TODO: shutdown cassandra + session.close(); + cluster.close(); + db.stop(); + } + + private void dropTable() throws SQLException { + final Statement dropStatement = this.conn.createStatement(); + dropStatement.execute(DROP_TABLE); + dropStatement.close(); + } + + private void createTable() throws SQLException { + final Statement createStatement = this.conn.createStatement(); + createStatement.execute(CREATE_TABLE); + createStatement.close(); + } + + private void dropAndCreateTable() throws SQLException { + mysqlMixin.dropSQLTriggers(TABLE); + dropTable(); + createTable(); + mysqlMixin.createSQLTriggers(TABLE); + } + + @Before + public void initTest() throws SQLException { + session.execute("DROP KEYSPACE IF EXISTS "+keyspace); + try { + Properties properties = new Properties(); + properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName); + properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace); + musicMixin =new MusicMixin(mdbcServerName,properties); + } catch (MDBCServiceException e) { + fail("error creating music musicMixin"); + } + this.conn = DriverManager.getConnection("jdbc:mariadb://localhost:"+sqlPort+"/"+DATABASE, "root", ""); + this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+sqlPort+"/"+DATABASE, conn, null); + dropAndCreateTable(); + } + + private void initDatabase(Range range) throws MDBCServiceException, SQLException { + final DatabasePartition partition = TestUtils.createBasicRow(range, musicMixin, mdbcServerName); + String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+ + "(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');"; + HashMap<Range, StagingTable> stagingTable = new HashMap<>(); + final Statement executeStatement = this.conn.createStatement(); + executeStatement.execute(sqlOperation); + this.conn.commit(); + mysqlMixin.postStatementHook(sqlOperation,stagingTable); + executeStatement.close(); + String id = MDBCUtils.generateUniqueKey().toString(); + TxCommitProgress progressKeeper = new TxCommitProgress(); + progressKeeper.createNewTransactionTracker(id ,this.conn); + musicMixin.commitLog(partition, null, stagingTable, id, progressKeeper); + TestUtils.unlockRow(keyspace,mriTableName,partition); + } + + private OwnershipReturn cleanAndOwnPartition(List<Range> ranges, UUID ownOpId) throws SQLException { + dropAndCreateTable(); + DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey()); + + OwnershipReturn own=null; + try { + own = musicMixin.own(ranges, currentPartition, ownOpId); + } catch (MDBCServiceException e) { + fail("failure when running own function"); + } + return own; + } + + public void checkData() throws SQLException { + Statement statement = this.conn.createStatement(); + ResultSet rs = statement.executeQuery("SELECT * FROM " + TABLE + ";"); + int counter = 0; + while (rs.next()) { + int personId = rs.getInt("PersonID"); + assertEquals(1,personId); + String lastname = rs.getString("LastName"); + assertEquals("SAUREZ",lastname); + String firstname = rs.getString("FirstName"); + assertEquals("ENRIQUE",firstname); + String address = rs.getString("Address"); + assertEquals("GATECH",address); + String city = rs.getString("City"); + assertEquals("ATLANTA",city); + counter++; + } + assertEquals(1,counter); + } + + @Test + @Ignore + public void checkpoint() throws MDBCServiceException, SQLException { + Range range = new Range(TABLE); + OwnershipAndCheckpoint ownAndCheck = musicMixin.getOwnAndCheck(); + initDatabase(range); + + List<Range> ranges = new ArrayList<>(); + ranges.add(range); + UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey(); + OwnershipReturn own = cleanAndOwnPartition(ranges,ownOpId); + + Map<MusicRangeInformationRow, LockResult> locks = new HashMap<>(); + locks.put(own.getDag().getNode(own.getRangeId()).getRow(),new LockResult(own.getRangeId(),own.getOwnerId(),true, + ranges)); + ownAndCheck.checkpoint(musicMixin,mysqlMixin,own.getDag(),ranges,locks, ownOpId); + + checkData(); + } + + @Test + @Ignore + public void warmup() throws MDBCServiceException, SQLException { + Range range = new Range(TABLE); + OwnershipAndCheckpoint ownAndCheck = musicMixin.getOwnAndCheck(); + initDatabase(range); + + List<Range> ranges = new ArrayList<>(); + ranges.add(range); + UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey(); + OwnershipReturn own = cleanAndOwnPartition(ranges,ownOpId); + + Map<MusicRangeInformationRow, LockResult> locks = new HashMap<>(); + locks.put(own.getDag().getNode(own.getRangeId()).getRow(),new LockResult(own.getRangeId(),own.getOwnerId(),true, + ranges)); + ownAndCheck.warmup(musicMixin,mysqlMixin,ranges); + + checkData(); + } +}
\ No newline at end of file @@ -184,7 +184,7 @@ <dependency> <groupId>ch.vorburger.mariaDB4j</groupId> <artifactId>mariaDB4j</artifactId> - <version>2.2.3</version> + <version>2.3.0</version> <scope>test</scope> </dependency> <dependency> @@ -243,6 +243,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + <version>4.1.30.Final</version> + </dependency> </dependencies> <build> |