aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main/java
diff options
context:
space:
mode:
authorEnrique Saurez <enrique.saurez@gmail.com>2018-11-30 19:02:00 -0500
committerEnrique Saurez <enrique.saurez@gmail.com>2019-01-15 18:03:58 -0500
commit111c1795a31f3dc619242c1f13fc6f7812779118 (patch)
tree94053ee484bf4a1f67bb8b3b840c177d05a1fd52 /mdbc-server/src/main/java
parentbde93d748fd26d3f0447a434b92009aa9f24ba7e (diff)
Dag, Ownership and Checkpoint (ignore some tests)
Change-Id: Ia720ba45b4f9c8687f5eac5b0d64fd2be19fedaa Issue-ID: MUSIC-269 Signed-off-by: Enrique Saurez <enrique.saurez@gmail.com>
Diffstat (limited to 'mdbc-server/src/main/java')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java101
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java6
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java41
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java6
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Range.java4
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java19
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java8
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java48
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java181
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java861
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java22
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java402
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java202
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java225
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java31
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java62
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java18
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/RangeDependency.java41
20 files changed, 1960 insertions, 322 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
index 9752dcb..ff8eb80 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
@@ -1,3 +1,4 @@
+
/*
* ============LICENSE_START====================================================
* org.onap.music.mdbc
@@ -24,76 +25,87 @@ import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.*;
-import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
/**
* A database range contain information about what ranges should be hosted in the current MDBC instance
- * A database range with an empty map, is supposed to contain all the tables in Music.
- * @author Enrique Saurez
+ * A database range with an empty map, is supposed to contain all the tables in Music.
+ * @author Enrique Saurez
*/
public class DatabasePartition {
private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class);
- private UUID mriIndex;//Index that can be obtained either from
+ private UUID musicRangeInformationIndex;//Index that can be obtained either from
private String lockId;
protected List<Range> ranges;
- private List<UUID> oldMRIIds;
+
+ private boolean ready;
/**
* Each range represents a partition of the database, a database partition is a union of this partitions.
* The only requirement is that the ranges are not overlapping.
*/
+ public DatabasePartition() {
+ this(new ArrayList<Range>(),null,"");
+ }
+
public DatabasePartition(UUID mriIndex) {
this(new ArrayList<Range>(), mriIndex,"");
}
public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String lockId) {
- this.ranges = knownRanges;
+ if(mriIndex==null){
+ ready = false;
+ }
+ else{
+ ready = true;
+ }
+ ranges = knownRanges;
- this.mriIndex = mriIndex;
- this.lockId = lockId;
- this.oldMRIIds = new ArrayList<>();
+ this.setMusicRangeInformationIndex(mriIndex);
+ this.setLockId(lockId);
}
- public DatabasePartition(UUID rangeId, String lockId, List<Range> ranges, List<UUID> oldIds) {
- this.mriIndex = rangeId;
- this.lockId = lockId;
- this.ranges = ranges;
- this.oldMRIIds = oldIds;
- }
-
- /**
+ /**
* This function is used to change the contents of this, with the contents of a different object
* @param otherPartition partition that is used to substitute the local contents
*/
public void updateDatabasePartition(DatabasePartition otherPartition){
- mriIndex = otherPartition.mriIndex;//Index that can be obtained either from
+ musicRangeInformationIndex = otherPartition.musicRangeInformationIndex;//Index that can be obtained either from
lockId = otherPartition.lockId;
ranges = otherPartition.ranges;
+ ready = otherPartition.ready;
}
public String toString(){
- StringBuilder builder = new StringBuilder().append("Row: ["+mriIndex+"], lockId: ["+lockId +"], ranges: [");
- for(Range r: ranges){
- builder.append(r.toString()).append(",");
- }
- builder.append("]");
- return builder.toString();
+ StringBuilder builder = new StringBuilder().append("Row: ["+musicRangeInformationIndex.toString()+"], lockId: ["+lockId +"], ranges: [");
+ for(Range r: ranges){
+ builder.append(r.toString()).append(",");
+ }
+ builder.append("]");
+ return builder.toString();
}
public boolean isLocked(){return lockId != null && !lockId.isEmpty(); }
+ public boolean isReady() {
+ return ready;
+ }
+
+ public void setReady(boolean ready) {
+ this.ready = ready;
+ }
+
public UUID getMRIIndex() {
- return mriIndex;
+ return musicRangeInformationIndex;
}
public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) {
- this.mriIndex = musicRangeInformationIndex;
+ this.musicRangeInformationIndex = musicRangeInformationIndex;
}
/**
@@ -130,7 +142,7 @@ public class DatabasePartition {
public synchronized List<Range> getSnapshot() {
List<Range> newRange = new ArrayList<>();
for(Range r : ranges){
- newRange.add(r.clone());
+ newRange.add(r.clone());
}
return newRange;
}
@@ -147,7 +159,7 @@ public class DatabasePartition {
}
/**
- * Function to obtain the configuration
+ * Function to obtain the configuration
* @param filepath path to the database range
* @return a new object of type DatabaseRange
* @throws FileNotFoundException
@@ -175,27 +187,12 @@ public class DatabasePartition {
this.lockId = lockId;
}
- /**
- * This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained
- * @param ranges ranges that should be contained in the partition
- * @param partition currently own partition
- * @return
- *
- */
- public boolean owns(List<Range> ranges) {
- for (Range r: ranges) {
- if (!this.ranges.contains(r)) {
- return false;
- }
- }
- return true;
- }
-
- public List<UUID> getOldMRIIds() {
- return oldMRIIds;
- }
-
- public void setOldMRIIds(List<UUID> oldIds) {
- this.oldMRIIds = oldIds;
- }
-}
+ public boolean isContained(Range range){
+ for(Range r: ranges){
+ if(r.overlaps(range)){
+ return true;
+ }
+ }
+ return false;
+ }
+} \ No newline at end of file
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
index 3f45d98..8aca034 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
@@ -103,6 +103,12 @@ public class MDBCUtils {
return UUIDs.random();
}
+ /**
+ * This function is used to generate time based cassandra uuid
+ * @return a timebased UUID that can be used for fields of type uuid in MUSIC/Cassandra
+ */
+ public static UUID generateTimebasedUniqueKey() {return UUIDs.timeBased();}
+
public static Properties getMdbcProperties() {
Properties prop = new Properties();
InputStream input = null;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index bd0862d..629380d 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -34,16 +34,10 @@ import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.Executor;
+import org.apache.commons.lang3.NotImplementedException;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.QueryException;
import org.onap.music.logging.EELFLoggerDelegate;
@@ -51,10 +45,15 @@ import org.onap.music.logging.format.AppMessages;
import org.onap.music.logging.format.ErrorSeverity;
import org.onap.music.logging.format.ErrorTypes;
import org.onap.music.mdbc.mixins.DBInterface;
+import org.onap.music.mdbc.mixins.LockResult;
import org.onap.music.mdbc.mixins.MixinFactory;
import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
+import org.onap.music.mdbc.ownership.Dag;
+import org.onap.music.mdbc.ownership.DagNode;
+import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.query.QueryProcessor;
-import org.onap.music.mdbc.tables.MusicTxDigest;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -490,7 +489,8 @@ public class MdbcConnection implements Connection {
//Parse tables from the sql query
Map<String, List<String>> tableToInstruction = QueryProcessor.extractTableFromQuery(sql);
//Check ownership of keys
- this.partition = statemanager.own(this.id, MDBCUtils.getTables(tableToInstruction), dbi);
+ List<Range> ranges = MDBCUtils.getTables(tableToInstruction);
+ this.partition = own(ranges);
dbi.preStatementHook(sql);
}
@@ -541,6 +541,27 @@ public class MdbcConnection implements Connection {
return this.dbi;
}
+ private DatabasePartition own(List<Range> ranges) throws MDBCServiceException {
+ DatabasePartition newPartition = null;
+ OwnershipAndCheckpoint ownAndCheck = mi.getOwnAndCheck();
+ UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
+ try {
+ final OwnershipReturn ownershipReturn = mi.own(ranges, partition, ownOpId);
+ Dag dag = ownershipReturn.getDag();
+ DagNode node = dag.getNode(ownershipReturn.getRangeId());
+ MusicRangeInformationRow row = node.getRow();
+ Map<MusicRangeInformationRow, LockResult> lock = new HashMap<>();
+ lock.put(row, new LockResult(row.getPartitionIndex(), ownershipReturn.getOwnerId(), true, ranges));
+ ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, lock, ownershipReturn.getOwnershipId());
+ newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(),
+ ownershipReturn.getOwnerId());
+ }
+ finally{
+ ownAndCheck.stopOwnershipTimeoutClock(ownOpId);
+ }
+ return newPartition;
+ }
+
public void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException {
mi.relinquishIfRequired(partition);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
index e35c214..cb0cea9 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
@@ -293,12 +293,6 @@ public class MdbcServerLogic extends JdbcMeta{
}
}
-
-
-
-
-
-
@Override
public void rollback(ConnectionHandle ch) {
try {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
index 4bccbba..214678a 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
@@ -51,12 +51,12 @@ public class Range implements Serializable, Cloneable{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Range r = (Range) o;
- return (table.equals(r.table));
+ return (this.overlaps(r)) && (r.overlaps(this));
}
@Override
public int hashCode(){
- return Objects.hash(table);
+ return table.hashCode();
}
@Override
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 9735800..1359a0d 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -21,6 +21,9 @@ package org.onap.music.mdbc;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.NotImplementedException;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
@@ -29,6 +32,7 @@ import org.onap.music.logging.format.ErrorTypes;
import org.onap.music.mdbc.mixins.DBInterface;
import org.onap.music.mdbc.mixins.MixinFactory;
import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -169,7 +173,6 @@ public class StateManager {
/**
* Opens a connection into database, setting up all necessary triggers, etc
* @param id UUID of a connection
- * @param information
*/
public Connection openConnection(String id) {
Connection sqlConnection;
@@ -241,20 +244,6 @@ public class StateManager {
return openConnection(id);
}
- public DatabasePartition own(String mdbcConnectionId, List<Range> ranges, DBInterface dbi) throws MDBCServiceException {
- DatabasePartition partition = musicInterface.own(ranges, connectionRanges.get(mdbcConnectionId));
- List<UUID> oldRangeIds = partition.getOldMRIIds();
- //\TODO: do in parallel for all range ids
- for(UUID oldRange : oldRangeIds) {
- MusicTxDigest.replayDigestForPartition(musicInterface, oldRange,dbi);
- }
- logger.info("Partition: " + partition.getMRIIndex() + " now owns " + ranges);
- connectionRanges.put(mdbcConnectionId, partition);
- return partition;
- }
-
-
-
public void initializeSystem() {
//\TODO Prefetch data to system using the data ranges as guide
throw new UnsupportedOperationException("Function initialize system needs to be implemented id MdbcStateManager");
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
index fac47c5..5beb6b7 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
@@ -88,7 +88,7 @@ public class TablesConfiguration {
partitionId = partitionInfo.partitionId;
}
//2) Create a row in the transaction information table
- UUID mriTableIndex = MDBCUtils.generateUniqueKey();
+ UUID mriTableIndex = MDBCUtils.generateTimebasedUniqueKey();
//3) Add owner and tit information to partition info table
RedoRow newRedoRow = new RedoRow(mriTableName,mriTableIndex);
//DatabaseOperations.updateRedoRow(musicNamespace,pitName,partitionId,newRedoRow,partitionInfo.owner,null);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index 383b4b3..01d346c 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -112,12 +112,16 @@ public interface DBInterface {
String getPrimaryKey(String sql, String tableName);
- String applyDigest(Map<Range,StagingTable> digest);
-
/**
* Replay a given TxDigest into the local DB
* @param digest
* @throws SQLException if replay cannot occur correctly
*/
void replayTransaction(HashMap<Range,StagingTable> digest) throws SQLException;
+
+ void disableForeignKeyChecks() throws SQLException;
+
+ void enableForeignKeyChecks() throws SQLException;
+
+ void applyTxDigest(HashMap<Range, StagingTable> txDigest) throws SQLException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java
new file mode 100644
index 0000000..7dd92c4
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.mixins;
+
+import java.util.List;
+import java.util.UUID;
+import org.onap.music.mdbc.Range;
+
+public class LockResult{
+ private final UUID musicRangeInformationIndex;
+ private final String ownerId;
+ private List<Range> ranges;
+ private final boolean newLock;
+
+ public LockResult(UUID rowId, String ownerId, boolean newLock, List<Range> ranges){
+ this.musicRangeInformationIndex = rowId;
+ this.ownerId=ownerId;
+ this.newLock=newLock;
+ this.ranges=ranges;
+ }
+ public String getOwnerId(){
+ return ownerId;
+ }
+ public boolean isNewLock(){
+ return newLock;
+ }
+ public UUID getIndex() {return musicRangeInformationIndex;}
+ public List<Range> getRanges() {return ranges;}
+ public void addRange(Range range){ranges.add(range);}
+} \ No newline at end of file
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 12fe873..8abfba1 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,6 +19,7 @@
*/
package org.onap.music.mdbc.mixins;
+import com.datastax.driver.core.ResultSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,10 +33,9 @@ import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
-import org.onap.music.mdbc.tables.MusicTxDigestId;
-import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.MusicRangeInformationRow;
-import org.onap.music.mdbc.tables.TxCommitProgress;
+import org.onap.music.mdbc.ownership.Dag;
+import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
+import org.onap.music.mdbc.tables.*;
/**
* This Interface defines the methods that MDBC needs for a class to provide access to the persistence layer of MUSIC.
@@ -43,6 +43,29 @@ import org.onap.music.mdbc.tables.TxCommitProgress;
* @author Robert P. Eby
*/
public interface MusicInterface {
+ class OwnershipReturn{
+ private final UUID ownershipId;
+ private final String ownerId;
+ private final UUID rangeId;
+ private final List<Range> ranges;
+ private final Dag dag;
+ public OwnershipReturn(UUID ownershipId, String ownerId, UUID rangeId, List<Range> ranges, Dag dag){
+ this.ownershipId=ownershipId;
+ this.ownerId=ownerId;
+ this.rangeId=rangeId;
+ this.ranges=ranges;
+ this.dag=dag;
+ }
+ public String getOwnerId(){
+ return ownerId;
+ }
+ public UUID getRangeId(){
+ return rangeId;
+ }
+ public List<Range> getRanges(){ return ranges; }
+ public Dag getDag(){return dag;}
+ public UUID getOwnershipId() { return ownershipId; }
+ }
/**
* Get the name of this MusicInterface mixin object.
* @return the name
@@ -77,7 +100,7 @@ public interface MusicInterface {
/**
* This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
* The keyspace name comes from the initialization properties passed to the JDBC driver.
- * @throws MusicServiceException
+ * @throws MusicServiceException
*/
void createKeyspace() throws MDBCServiceException;
/**
@@ -144,7 +167,7 @@ public interface MusicInterface {
* @param changedRow This is information about the row that has changed
*/
void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow);
-
+
Object[] getObjects(TableInfo ti, String tableName, JSONObject row);
/**
* Returns the primary key associated with the given row
@@ -160,84 +183,128 @@ public interface MusicInterface {
*
* @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx
* @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to
- * be a HashMap, because it is required to be serializable
+ * be a HashMap, because it is required to be serializable
* @param txId id associated with the log being send
* @param progressKeeper data structure that is used to handle to detect failures, and know what to do
* @throws MDBCServiceException
*/
void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
-
- /**
- * This function is used to obtain the information related to a specific row in the MRI table
- * @param partitionIndex index of the row that is going to be retrieved
- * @return all the information related to the table
- * @throws MDBCServiceException
- */
+
+ /**
+ * This function is used to obtain the information related to a specific row in the MRI table
+ * @param partitionIndex index of the row that is going to be retrieved
+ * @return all the information related to the table
+ * @throws MDBCServiceException
+ */
MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException;
/**
- * This function is used to create a new row in the MRI table
- * @param info the information used to create the row
- * @return the new partition object that contain the new information used to create the row
+ * This function is used to get the dependencies of a given range
+ * @param baseRange range for which we search the dependencies
+ * @return dependencies
* @throws MDBCServiceException
*/
+ RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException;
+
+ /**
+ * This function is used to create a new row in the MRI table
+ * @param info the information used to create the row
+ * @return the new partition object that contain the new information used to create the row
+ * @throws MDBCServiceException
+ */
DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException;
/**
- * This function is used to append an index to the redo log in a MRI row
- * @param partition information related to ownership of partitions, used to verify ownership
- * @param newRecord index of the new record to be appended to the redo log
+ * This function is used to create all the required music dependencies
+ * @param rangeAndDependencies
* @throws MDBCServiceException
*/
- void appendToRedoLog( DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException;
+ void createMusicRangeDependency(RangeDependency rangeAndDependencies) throws MDBCServiceException;
- /**
- * This functions adds the tx digest to
- * @param newId id used as index in the MTD table
- * @param transactionDigest digest that contains all the changes performed in the transaction
- * @throws MDBCServiceException
- */
+ /**
+ * This function is used to append an index to the redo log in a MRI row
+ * @param partition information related to ownership of partitions, used to verify ownership
+ * @param newRecord index of the new record to be appended to the redo log
+ * @throws MDBCServiceException
+ */
+ void appendToRedoLog(DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException;
+
+ /**
+ * This functions adds the tx digest to
+ * @param newId id used as index in the MTD table
+ * @param transactionDigest digest that contains all the changes performed in the transaction
+ * @throws MDBCServiceException
+ */
void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException;
- /**
- * Function used to retrieve a given transaction digest and deserialize it
- * @param id of the transaction digest to be retrieved
- * @return the deserialize transaction digest that can be applied to the local SQL database
- * @throws MDBCServiceException
- */
+ /**
+ * Function used to retrieve a given transaction digest and deserialize it
+ * @param id of the transaction digest to be retrieved
+ * @return the deserialize transaction digest that can be applied to the local SQL database
+ * @throws MDBCServiceException
+ */
HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException;
- /**
- * Use this functions to verify ownership, and own new ranges
- * @param ranges the ranges that should be own after calling this function
- * @param partition current information of the ownership in the system
- * @return a partition indicating the status of the own function result
- * @throws MDBCServiceException
- */
- DatabasePartition own(List<Range> ranges, DatabasePartition partition) throws MDBCServiceException;
+ /**
+ * Use this functions to verify ownership, and own new ranges
+ * @param ranges the ranges that should be own after calling this function
+ * @param partition current information of the ownership in the system
+ * @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is
+ * required
+ * @return an object indicating the status of the own function result
+ * @throws MDBCServiceException
+ */
+ OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID ownOpId) throws MDBCServiceException;
- /**
- * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation
- * @param partition information of the partition that is currently being owned
- * @throws MDBCServiceException
- */
+ /**
+ * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation
+ * @param partition information of the partition that is currently being owned
+ * @throws MDBCServiceException
+ */
void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException;
+ /**
+ * This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all
+ * those ranges.
+ * @param rangeId new id to be used in the new row
+ * @param ranges ranges to be owned by the end of the function called
+ * @param partition current ownership status
+ * @return
+ * @throws MDBCServiceException
+ */
+ //OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) throws MDBCServiceException;
+
+ /**
+ * This functions relinquishes a range
+ * @param ownerId id of the current ownerh
+ * @param rangeId id of the range to be relinquished
+ * @throws MusicLockingException
+ */
+ void relinquish(String ownerId, String rangeId) throws MDBCServiceException;
+
+ /**
+ * This function return all the range indexes that are currently hold by any of the connections in the system
+ * @return list of ids of rows in MRI
+ */
+ List<UUID> getPartitionIndexes() throws MDBCServiceException;
+
/**
- * This functions relinquishes a range
- * @param ownerId id of the current ownerh
- * @param rangeId id of the range to be relinquished
- * @throws MusicLockingException
+ * This function is in charge of applying the transaction digests to the MUSIC tables.
+ * @param digest this contain all the changes that were perfomed in this digest
+ * @throws MDBCServiceException
*/
- void relinquish(String ownerId, String rangeId) throws MDBCServiceException;
+ void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException;
/**
- * This function return all the range indexes that are currently hold by any of the connections in the system
- * @return list of ids of rows in MRI
+ * This function is in charge of deleting old mri rows that are not longer contain
+ * @param oldRowsAndLocks is a map
+ * @throws MDBCServiceException
*/
- List<UUID> getPartitionIndexes() throws MDBCServiceException;
+ void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException;
- void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException;
+ List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
+
+ OwnershipAndCheckpoint getOwnAndCheck();
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index 400956e..068a64d 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -23,29 +23,16 @@ import java.io.IOException;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.mdbc.*;
-import org.onap.music.mdbc.tables.MusicTxDigestId;
-import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.MusicRangeInformationRow;
-import org.onap.music.mdbc.tables.TxCommitProgress;
-import org.onap.music.service.impl.MusicCassaCore;
+import org.onap.music.mdbc.ownership.Dag;
+import org.onap.music.mdbc.ownership.DagNode;
+import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
+import org.onap.music.mdbc.tables.*;
import org.json.JSONObject;
-import org.onap.music.lockingservice.cassandra.CassaLockStore;
import org.onap.music.datastore.PreparedQueryObject;
import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
@@ -96,12 +83,16 @@ public class MusicMixin implements MusicInterface {
public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
/** The property name to use to provide the replication factor for Cassandra. */
public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
+ /** The property name to use to provide a timeout to mdbc (ownership) */
+ public static final String KEY_TIMEOUT = "mdbc_timeout";
/** Namespace for the tables in MUSIC (Cassandra) */
public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
/** The default property value to use for the Cassandra IP address. */
public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
/** The default property value to use for the Cassandra replication factor. */
public static final int DEFAULT_MUSIC_RFACTOR = 1;
+ /** The default property value to use for the MDBC timeout */
+ public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
/** The default primary string column, if none is provided. */
public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
/** Type of the primary key, if none is defined by the user */
@@ -111,25 +102,56 @@ public class MusicMixin implements MusicInterface {
//\TODO Add logic to change the names when required and create the tables when necessary
private String musicTxDigestTableName = "musictxdigest";
private String musicRangeInformationTableName = "musicrangeinformation";
+ private String musicRangeDependencyTableName = "musicrangedependency";
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class);
- private class LockResult{
- private final UUID musicRangeInformationIndex;
- private final String ownerId;
- private final boolean newLock;
- public LockResult(UUID rowId, String ownerId, boolean newLock){
- this.musicRangeInformationIndex = rowId;
- this.ownerId=ownerId;
- this.newLock=newLock;
+
+
+ private class RangeMriRow{
+ private MusicRangeInformationRow currentRow;
+ private List<MusicRangeInformationRow> oldRows;
+ private final Range range;
+ public RangeMriRow(Range range) {
+ this.range = range;
+ oldRows = new ArrayList<>();
+ }
+ Range getRange(){
+ return range;
+ }
+ public MusicRangeInformationRow getCurrentRow(){
+ return currentRow;
+ }
+ public void setCurrentRow(MusicRangeInformationRow row){
+ currentRow=row;
+ }
+ public void addOldRow(MusicRangeInformationRow row){
+ oldRows.add(row);
+ }
+ public List<MusicRangeInformationRow> getOldRows(){
+ return oldRows;
+ }
+ }
+
+ private class LockRequest{
+ private final String table;
+ private final UUID id;
+ private final List<Range> toLockRanges;
+ public LockRequest(String table, UUID id, List<Range> toLockRanges){
+ this.table=table;
+ this.id=id;
+ this.toLockRanges=toLockRanges;
}
- public String getOwnerId(){
- return ownerId;
+ public UUID getId() {
+ return id;
}
- public boolean getNewLock(){
- return newLock;
+ public List<Range> getToLockRanges() {
+ return toLockRanges;
+ }
+
+ public String getTable() {
+ return table;
}
- public UUID getIndex() {return musicRangeInformationIndex;}
}
@@ -161,11 +183,14 @@ public class MusicMixin implements MusicInterface {
protected final String[] allReplicaIds;
private final String musicAddress;
private final int music_rfactor;
+ protected long timeout;
private MusicConnector mCon = null;
private Session musicSession = null;
private boolean keyspace_created = false;
private Map<String, PreparedStatement> ps_cache = new HashMap<>();
private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
+ private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied;
+ private OwnershipAndCheckpoint ownAndCheck;
public MusicMixin() {
//this.logger = null;
@@ -195,6 +220,12 @@ public class MusicMixin implements MusicInterface {
this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
+ String t = info.getProperty(KEY_TIMEOUT);
+ this.timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t);
+
+ alreadyApplied = new HashMap<>();
+ ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied,timeout);
+
initializeMetricTables();
}
@@ -241,6 +272,7 @@ public class MusicMixin implements MusicInterface {
Row row = rs.one();
return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
}
+
private String getAllHostIds() {
ResultSet results = null;
try {
@@ -265,6 +297,7 @@ public class MusicMixin implements MusicInterface {
public String getMixinName() {
return "cassandra";
}
+
/**
* Do what is needed to close down the MUSIC connection.
*/
@@ -284,6 +317,7 @@ public class MusicMixin implements MusicInterface {
try {
createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
createMusicRangeInformationTable();
+ createMusicRangeDependencyTable();
}
catch(MDBCServiceException e){
logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC");
@@ -1069,6 +1103,22 @@ public class MusicMixin implements MusicInterface {
return query;
}
+ private PreparedQueryObject createChangeIsLatestToMriQuery(String mriTable, UUID uuid, String table, boolean isLatest){
+ PreparedQueryObject query = new PreparedQueryObject();
+ StringBuilder appendBuilder = new StringBuilder();
+ appendBuilder.append("UPDATE ")
+ .append(music_ns)
+ .append(".")
+ .append(mriTable)
+ .append(" SET islatest =")
+ .append(isLatest)
+ .append(" WHERE rangeid = ")
+ .append(uuid)
+ .append(";");
+ query.appendQueryString(appendBuilder.toString());
+ return query;
+ }
+
protected ReturnType acquireLock(String fullyQualifiedKey, String lockId) throws MDBCServiceException{
ReturnType lockReturn;
//\TODO Handle better failures to acquire locks
@@ -1087,36 +1137,130 @@ public class MusicMixin implements MusicInterface {
return lockReturn;
}
- protected DatabasePartition waitForLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
+ private void addRange(Map<UUID,List<Range>> container, UUID index, Range range){
+ if(!container.containsKey(index)){
+ container.put(index,new ArrayList<Range>());
+ }
+ container.get(index).add(range);
+ }
+
+ private void addRows(Map<UUID,List<Range>> container, RangeMriRow newRow, Range range){
+ //First add current row
+ MusicRangeInformationRow currentRow = newRow.getCurrentRow();
+ addRange(container,currentRow.getPartitionIndex(),range);
+ for(MusicRangeInformationRow row : newRow.getOldRows()){
+ addRange(container,row.getPartitionIndex(),range);
+ }
+ }
+
+ private NavigableMap<UUID, List<Range>> getPendingRows(Map<Range, RangeMriRow> rangeRows){
+ NavigableMap<UUID,List<Range>> pendingRows = new TreeMap<>();
+ rangeRows.forEach((key, value) -> {
+ addRows(pendingRows,value,key);
+ });
+ return pendingRows;
+ }
+
+ private List<Range> lockRow(LockRequest request,Map.Entry<UUID, List<Range>> pending,Map<UUID, String> currentLockRef,
+ String fullyQualifiedKey, String lockId, List<Range> pendingToLock,
+ Map<UUID, LockResult> alreadyHeldLocks)
+ throws MDBCServiceException{
+ List<Range> newRanges = new ArrayList<>();
+ String newFullyQualifiedKey = music_ns + "." + request.getTable() + "." + pending.getKey().toString();
+ String newLockId;
+ boolean success;
+ if (currentLockRef.containsKey(pending.getKey())) {
+ newLockId = currentLockRef.get(pending.getKey());
+ success = (MusicCore.whoseTurnIsIt(newFullyQualifiedKey) == newLockId);
+ } else {
+ newLockId = MusicCore.createLockReference(newFullyQualifiedKey);
+ ReturnType newLockReturn = acquireLock(fullyQualifiedKey, lockId);
+ success = newLockReturn.getResult().compareTo(ResultType.SUCCESS) == 0;
+ }
+ if (!success) {
+ pendingToLock.addAll(pending.getValue());
+ currentLockRef.put(pending.getKey(), newLockId);
+ } else {
+ if(alreadyHeldLocks.containsKey(pending.getKey())){
+ throw new MDBCServiceException("Adding key that already exist");
+ }
+ alreadyHeldLocks.put(pending.getKey(),new LockResult(pending.getKey(), newLockId, true,
+ pending.getValue()));
+ newRanges.addAll(pending.getValue());
+ }
+ return newRanges;
+ }
+
+ private boolean isDifferent(NavigableMap<UUID, List<Range>> previous, NavigableMap<UUID, List<Range>> current){
+ return previous.keySet().equals(current.keySet());
+ }
+
+ protected List<Range> waitForLock(LockRequest request, DatabasePartition partition,
+ Map<UUID, LockResult> rowLock) throws MDBCServiceException {
+ List<Range> newRanges = new ArrayList<>();
+ if(partition.getMRIIndex()!=request.getId()){
+ throw new MDBCServiceException("Invalid argument for wait for lock, range id in request and partition should match");
+ }
+ String fullyQualifiedKey= music_ns+"."+ request.getTable()+"."+request.getId();
String lockId = MusicCore.createLockReference(fullyQualifiedKey);
ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId);
if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
//\TODO Improve the exponential backoff
+ List<Range> pendingToLock = request.getToLockRanges();
+ Map<UUID, String> currentLockRef = new HashMap<>();
int n = 1;
int low = 1;
int high = 1000;
Random r = new Random();
- while(MusicCore.whoseTurnIsIt(fullyQualifiedKey)!=lockId){
+ Map<Range, RangeMriRow> rangeRows = findRangeRows(pendingToLock);
+ NavigableMap<UUID, List<Range>> rowsToLock = getPendingRows(rangeRows);
+ NavigableMap<UUID, List<Range>> prevRows = new TreeMap<>();
+ while (!pendingToLock.isEmpty() && isDifferent(prevRows,rowsToLock) ) {
+ pendingToLock.clear();
try {
Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000)
+ (r.nextInt(high - low) + low));
} catch (InterruptedException e) {
continue;
}
- n++;
- if(n==20){
- throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!") ;
+ n++;
+ if (n == 20) {
+ throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!");
+ }
+ //\TODO do this in parallel
+ //\TODO there is a race condition here, from the time we get the find range rows, to the time we lock the row,
+ //\TODO this race condition can only be solved if require to obtain lock to all related rows in MRI
+ //\TODO before fully owning the range
+ //\TODO The rows need to be lock in increasing order of timestamp
+ //there could be a new row created
+ // Note: This loop needs to be perfomed in sorted order of timebased UUID
+ for (Map.Entry<UUID, List<Range>> pending : rowsToLock.entrySet()) {
+ List<Range> rs = lockRow(request, pending, currentLockRef, fullyQualifiedKey, lockId, pendingToLock, rowLock);
+ newRanges.addAll(rs);
}
+ if (n++ == 20) {
+ throw new MDBCServiceException(
+ "Lock was impossible to obtain, waited for 20 exponential backoffs!");
+ }
+ rangeRows = findRangeRows(pendingToLock);
+ prevRows = rowsToLock;
+ rowsToLock = getPendingRows(rangeRows);
}
}
- partition.setLockId(lockId);
- return partition;
+ else {
+ partition.setLockId(lockId);
+ rowLock.put(partition.getMRIIndex(),new LockResult(partition.getMRIIndex(), lockId, true, partition.getSnapshot()));
+ }
+ return newRanges;
}
protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
UUID mriIndex = partition.getMRIIndex();
String lockId;
lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ if(lockId==null) {
+ throw new MDBCServiceException("lock reference is null");
+ }
ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId);
//\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock
if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
@@ -1126,7 +1270,16 @@ public class MusicMixin implements MusicInterface {
return lockId;
}
-
+ protected void changeIsLatestToMRI(MusicRangeInformationRow row, boolean isLatest, LockResult lock) throws MDBCServiceException{
+ PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, row.getPartitionIndex(),
+ musicTxDigestTableName, isLatest);
+ ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(),
+ appendQuery, lock.getOwnerId(), null);
+ if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
+ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing change isLatest operation with return type: "+returnType.getMessage());
+ throw new MDBCServiceException("Error when executing change isLatest operation with return type: "+returnType.getMessage());
+ }
+ }
protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{
PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId);
@@ -1137,23 +1290,14 @@ public class MusicMixin implements MusicInterface {
}
}
- /**
- * Writes the transaction information to metric's txDigest and musicRangeInformation table
- * This officially commits the transaction globally
- */
- @Override
- public void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+ public void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest,
+ String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
UUID mriIndex = partition.getMRIIndex();
- if(mriIndex==null) {
- partition = own(partition.getSnapshot(),partition);
- mriIndex = partition.getMRIIndex();
- System.err.println("MRIINDEX: " + mriIndex);
- }
String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
//0. See if reference to lock was already created
String lockId = partition.getLockId();
- if(lockId == null || lockId.isEmpty()) {
- waitForLock(fullyQualifiedMriKey,partition);
+ if(mriIndex==null || lockId == null || lockId.isEmpty()) {
+ own(partition.getSnapshot(),partition, MDBCUtils.generateTimebasedUniqueKey());
}
UUID commitId;
@@ -1174,7 +1318,7 @@ public class MusicMixin implements MusicInterface {
} catch (IOException e) {
throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString(), e);
}
- MusicTxDigestId digestId = new MusicTxDigestId(commitId);
+ MusicTxDigestId digestId = new MusicTxDigestId(commitId,-1);
addTxDigest(digestId, serializedTransactionDigest);
//2. Save RRT index to RQ
if(progressKeeper!= null) {
@@ -1241,7 +1385,8 @@ public class MusicMixin implements MusicInterface {
return partitions;
}
- List<Range> getRanges(Row newRow){
+
+ public List<Range> getRanges(Row newRow){
List<Range> partitions = new ArrayList<>();
Set<String> tables = newRow.getSet("keys",String.class);
for (String table:tables){
@@ -1250,22 +1395,36 @@ public class MusicMixin implements MusicInterface {
return partitions;
}
- MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){
+ public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){
UUID partitionIndex = newRow.getUUID("rangeid");
List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
List<MusicTxDigestId> digestIds = new ArrayList<>();
+ int index=0;
for(TupleValue t: log){
//final String tableName = t.getString(0);
- final UUID index = t.getUUID(1);
- digestIds.add(new MusicTxDigestId(index));
+ final UUID id = t.getUUID(1);
+ digestIds.add(new MusicTxDigestId(id,index++));
}
List<Range> partitions = new ArrayList<>();
Set<String> tables = newRow.getSet("keys",String.class);
for (String table:tables){
partitions.add(new Range(table));
}
- return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""),
- digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"));
+ return new MusicRangeInformationRow(partitionIndex, new DatabasePartition(partitions, partitionIndex, ""),
+ digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"),
+ newRow.getBool("islatest"));
+ }
+
+ public RangeDependency getRangeDependenciesFromCassandraRow(Row newRow){
+ if(newRow == null) return null;
+ String base = newRow.getString("range");
+ Range baseRange = new Range(base);
+ Set<String> dependencies = newRow.getSet("dependencies", String.class);
+ List<Range> rangeDependencies = new ArrayList<>();
+ for(String dependency: dependencies){
+ rangeDependencies.add(new Range(dependency));
+ }
+ return new RangeDependency(baseRange,rangeDependencies);
}
@Override
@@ -1288,6 +1447,21 @@ public class MusicMixin implements MusicInterface {
return getMRIRowFromCassandraRow(newRow);
}
+ @Override
+ public RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException{
+ String cql = String.format("SELECT * FROM %s.%s WHERE range = ?;", music_ns, musicRangeDependencyTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(baseRange.table);
+ Row newRow;
+ try {
+ newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.table,null);
+ } catch (MDBCServiceException e) {
+ logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
+ }
+ return getRangeDependenciesFromCassandraRow(newRow);
+ }
/**
* This function creates the TransactionInformation table. It contain information related
@@ -1307,6 +1481,7 @@ public class MusicMixin implements MusicInterface {
fields.append("rangeid uuid, ");
fields.append("keys set<text>, ");
fields.append("ownerid text, ");
+ fields.append("islatest boolean, ");
fields.append("metricprocessid text, ");
//TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
@@ -1330,10 +1505,36 @@ public class MusicMixin implements MusicInterface {
if(lockId == null || lockId.isEmpty()){
throw new MDBCServiceException("Error initializing music range information, error creating a lock for a new row") ;
}
- createEmptyMriRow(newPartition.getMRIIndex(),info.getMetricProcessId(),lockId,newPartition.getSnapshot());
+ createEmptyMriRow(newPartition.getMRIIndex(),info.getMetricProcessId(),lockId,
+ newPartition.getSnapshot(),info.getIsLatest());
+ info.setOwnerId(lockId);
return newPartition;
}
+ @Override
+ public void createMusicRangeDependency(RangeDependency rangeAndDependencies) throws MDBCServiceException {
+ StringBuilder insert = new StringBuilder("INSERT INTO ")
+ .append(this.music_ns)
+ .append('.')
+ .append(this.musicRangeDependencyTableName)
+ .append(" (range,dependencies) VALUES ")
+ .append("(")
+ .append(rangeAndDependencies.getRange().table)
+ .append(",{");
+ boolean first=true;
+ for(Range r: rangeAndDependencies.dependentRanges()){
+ if(first){ first=false; }
+ else {
+ insert.append(',');
+ }
+ insert.append("'").append(r.toString()).append("'");
+ }
+ insert.append("};");
+ PreparedQueryObject query = new PreparedQueryObject();
+ query.appendQueryString(insert.toString());
+ MusicCore.eventualPut(query);
+ }
+
private UUID createEmptyMriRow(List<Range> rangesCopy) {
//TODO: THis should call one of the other createMRIRows
@@ -1373,8 +1574,9 @@ public class MusicMixin implements MusicInterface {
*/
private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges)
throws MDBCServiceException {
- UUID id = MDBCUtils.generateUniqueKey();
- return createEmptyMriRow(id,processId,lockId,ranges);
+ UUID id = MDBCUtils.generateTimebasedUniqueKey();
+ return createEmptyMriRow(id,processId,lockId,ranges,true);
+
}
/**
@@ -1382,14 +1584,14 @@ public class MusicMixin implements MusicInterface {
* @param processId id of the process that is going to own initially this.
* @return uuid associated to the new row
*/
- private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges)
+ private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges, boolean isLatest)
throws MDBCServiceException{
logger.info("Creating MRI " + id + " for ranges " + ranges);
StringBuilder insert = new StringBuilder("INSERT INTO ")
.append(this.music_ns)
.append('.')
.append(this.musicRangeInformationTableName)
- .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
+ .append(" (rangeid,keys,ownerid,islatest,metricprocessid,txredolog) VALUES ")
.append("(")
.append(id)
.append(",{");
@@ -1403,7 +1605,9 @@ public class MusicMixin implements MusicInterface {
}
insert.append("},'")
.append((lockId==null)?"":lockId)
- .append("','")
+ .append("',")
+ .append(isLatest)
+ .append(",'")
.append(processId)
.append("',[]);");
PreparedQueryObject query = new PreparedQueryObject();
@@ -1459,6 +1663,21 @@ public class MusicMixin implements MusicInterface {
}
}
+ private void createMusicRangeDependencyTable() throws MDBCServiceException {
+ String tableName = this.musicRangeDependencyTableName;
+ String priKey = "range";
+ StringBuilder fields = new StringBuilder();
+ fields.append("range text, ");
+ fields.append("dependencies set<text> ");//notice lack of ','
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName,
+ fields, priKey);
+ try {
+ executeMusicWriteQuery(this.music_ns,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create redo records table");
+ throw(e);
+ }
+ }
/**
* Writes the transaction history to the txDigest
@@ -1513,132 +1732,418 @@ public class MusicMixin implements MusicInterface {
return changes;
}
+ ResultSet getAllMriCassandraRows() throws MDBCServiceException {
+ StringBuilder cqlOperation = new StringBuilder();
+ cqlOperation.append("SELECT * FROM ")
+ .append(music_ns)
+ .append(".")
+ .append(musicRangeInformationTableName);
+ return executeMusicRead(cqlOperation.toString());
+ }
+
+ @Override
+ public List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException{
+ List<MusicRangeInformationRow> rows = new ArrayList<>();
+ final ResultSet mriCassandraRows = getAllMriCassandraRows();
+ while (!mriCassandraRows.isExhausted()) {
+ Row musicRow = mriCassandraRows.one();
+ final MusicRangeInformationRow mriRow = getMRIRowFromCassandraRow(musicRow);
+ rows.add(mriRow);
+ }
+ return rows;
+ }
+
+ private RangeMriRow findRangeRow(Range range) throws MDBCServiceException {
+ RangeMriRow row = null;
+ final ResultSet musicResults = getAllMriCassandraRows();
+ while (!musicResults.isExhausted()) {
+ Row musicRow = musicResults.one();
+ final MusicRangeInformationRow mriRow = getMRIRowFromCassandraRow(musicRow);
+ final List<Range> musicRanges = getRanges(musicRow);
+ //\TODO optimize this for loop to avoid redudant access
+ for(Range retrievedRange : musicRanges) {
+ if (retrievedRange.overlaps(range)) {
+ if(row==null){
+ row = new RangeMriRow(range);
+ row.setCurrentRow(mriRow);
+ }
+ else if(row.getCurrentRow().getTimestamp() < mriRow.getTimestamp()){
+ row.addOldRow(row.getCurrentRow());
+ row.setCurrentRow(mriRow);
+ }
+ }
+ }
+ }
+ if(row==null){
+ logger.error("Row in MRI doesn't exist for Range "+range.toString());
+ throw new MDBCServiceException("Row in MRI doesn't exist for Range "+range.toString());
+ }
+ return row;
+ }
+
/**
* This function is used to find all the related uuids associated with the required ranges
* @param ranges ranges to be find
- * @return a map that associated each MRI row to the corresponding ranges
+ * @return a map that associates each MRI row to the corresponding ranges
*/
- private Map<UUID,List<Range>> findRangeRows(List<Range> ranges) throws MDBCServiceException {
- /* \TODO this function needs to be improved, by creating an additional index, or at least keeping a local cache
+ private Map<Range,RangeMriRow> findRangeRows(List<Range> ranges) throws MDBCServiceException {
+ /* \TODO this function needs to be improved, by creating an additional index, or at least keeping a local cache
Additionally, we should at least used pagination and the token function, to avoid retrieving the whole table at
once, this can become problematic if we have too many connections in the overall METRIC system */
- Map<UUID,List<Range>> result = new HashMap<>();
- List<Range> rangesCopy = new LinkedList<>(ranges);
+ Map<Range,RangeMriRow> result = new HashMap<>();
+ for(Range r:ranges){
+ result.put(r,null);
+ }
int counter=0;
- StringBuilder cqlOperation = new StringBuilder();
- cqlOperation.append("SELECT * FROM ")
- .append(music_ns)
- .append(".")
- .append(musicRangeInformationTableName);
- ResultSet musicResults = executeMusicRead(cqlOperation.toString());
+ final ResultSet musicResults = getAllMriCassandraRows();
while (!musicResults.isExhausted()) {
Row musicRow = musicResults.one();
- UUID mriIndex = musicRow.getUUID("rangeid");
+ final MusicRangeInformationRow mriRow = getMRIRowFromCassandraRow(musicRow);
final List<Range> musicRanges = getRanges(musicRow);
+ //\TODO optimize this for loop to avoid redudant access
for(Range retrievedRange : musicRanges) {
- for (Iterator<Range> iterator = rangesCopy.iterator(); iterator.hasNext(); ) {
- Range range = iterator.next();
+ for(Map.Entry<Range,RangeMriRow> e : result.entrySet()) {
+ Range range = e.getKey();
if (retrievedRange.overlaps(range)) {
- // Remove the current element from the iterator and the list.
- if(!result.containsKey(mriIndex)){
- result.put(mriIndex,new ArrayList<>());
+ RangeMriRow r = e.getValue();
+ if(r==null){
+ counter++;
+ RangeMriRow newMriRow = new RangeMriRow(range);
+ newMriRow.setCurrentRow(mriRow);
+ result.replace(range,newMriRow);
+ }
+ else if(r.getCurrentRow().getTimestamp() < mriRow.getTimestamp()){
+ r.addOldRow(r.getCurrentRow());
+ r.setCurrentRow(mriRow);
+ }
+ else{
+ r.addOldRow(mriRow);
}
- List<Range> foundRanges = result.get(mriIndex);
- foundRanges.add(range);
- iterator.remove();
}
}
}
}
- if(!rangesCopy.isEmpty()){
- StringBuilder tables = new StringBuilder();
- for(Range range: rangesCopy){
- tables.append(range.toString()).append(',');
- }
- logger.warn("Row in MRI doesn't exist for tables [ "+tables.toString()+"]");
- createEmptyMriRow(rangesCopy);
+
+ if(ranges.size() != counter){
+ logger.error("Row in MRI doesn't exist for "+Integer.toString(counter)+" ranges");
+ throw new MDBCServiceException("MRI row doesn't exist for "+Integer.toString(counter)+" ranges");
}
return result;
}
- private DatabasePartition lockRow(UUID rowId, List<Range> ranges, DatabasePartition partition)
+ private List<Range> lockRow(LockRequest request, DatabasePartition partition,Map<UUID, LockResult> rowLock)
throws MDBCServiceException {
- List<LockResult> result = new ArrayList<>();
- if(partition.getMRIIndex()==rowId){
- return partition;
+ if(partition.getMRIIndex()==request.id && partition.isLocked()){
+ return new ArrayList<>();
}
//\TODO: this function needs to be improved, to track possible changes in the owner of a set of ranges
- String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+rowId.toString();
+ String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+request.id.toString();
//return List<Range> knownRanges, UUID mriIndex, String lockId
- DatabasePartition newPartition = new DatabasePartition(ranges,rowId,null);
- return waitForLock(fullyQualifiedMriKey,newPartition);
+ DatabasePartition newPartition = new DatabasePartition(request.toLockRanges,request.id,null);
+ return waitForLock(request,newPartition,rowLock);
+ }
+
+ private void unlockKeyInMusic(String table, String key, String lockref) {
+ String fullyQualifiedKey= music_ns+"."+ table+"."+lockref;
+ MusicCore.destroyLockRef(fullyQualifiedKey,lockref);
+ }
+
+ private void releaseLocks(Map<UUID,LockResult> newLocks) throws MDBCServiceException{
+ for(Map.Entry<UUID,LockResult> lock : newLocks.entrySet()) {
+ unlockKeyInMusic(musicRangeInformationTableName, lock.getKey().toString(), lock.getValue().getOwnerId());
+ }
+ }
+
+ private void releaseLocks(List<MusicRangeInformationRow> changed, Map<UUID,LockResult> newLocks) throws MDBCServiceException{
+ for(MusicRangeInformationRow r : changed) {
+ LockResult lock = newLocks.get(r.getPartitionIndex());
+ unlockKeyInMusic(musicRangeInformationTableName, r.getPartitionIndex().toString(),
+ lock.getOwnerId());
+ newLocks.remove(r.getPartitionIndex());
+ }
+ }
+
+ private void releaseAllLocksExcept(UUID finalRow, Map<UUID,LockResult> newLocks) throws MDBCServiceException {
+ Set<UUID> toErase = new HashSet<>();
+ for(Map.Entry<UUID,LockResult> lock : newLocks.entrySet()) {
+ UUID id = lock.getKey();
+ if(id!=finalRow){
+ unlockKeyInMusic(musicRangeInformationTableName, id.toString(), lock.getValue().getOwnerId());
+ toErase.add(id);
+ }
+ }
+ for(UUID id:toErase){
+ newLocks.remove(id);
+ }
+ }
+
+ private List<Range> getExtendedRanges(List<Range> range) throws MDBCServiceException{
+ Set<Range> extendedRange = new HashSet<>();
+ for(Range r: range){
+ extendedRange.add(r);
+ RangeDependency dependencies = getMusicRangeDependency(r);
+ if(dependencies!=null){
+ extendedRange.addAll(dependencies.dependentRanges());
+ }
+ }
+ return new ArrayList<>(extendedRange);
+ }
+
+ private LockResult waitForLock(LockRequest request) throws MDBCServiceException{
+ String fullyQualifiedKey= music_ns+"."+ request.getTable()+"."+request.getId();
+ String lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId);
+ if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
+ //\TODO Improve the exponential backoff
+ int n = 1;
+ int low = 1;
+ int high = 1000;
+ Random r = new Random();
+ while(MusicCore.whoseTurnIsIt(fullyQualifiedKey) != lockId){
+ try {
+ Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000)
+ + (r.nextInt(high - low) + low));
+ } catch (InterruptedException e) {
+ continue;
+ }
+ n++;
+ if (n == 20) {
+ throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!");
+ }
+ }
+ }
+ return new LockResult(request.id,lockId,true,null);
+ }
+
+ private void recoverFromFailureAndUpdateDag(Dag latestDag,List<MusicRangeInformationRow> rows,List<Range> ranges,
+ Map<UUID,LockResult> locks) throws MDBCServiceException{
+ Pair<List<Range>,Set<DagNode>> rangesAndDependents = latestDag.getIncompleteRangesAndDependents();
+ if(rangesAndDependents.getKey()==null || rangesAndDependents.getKey().size()==0 ||
+ rangesAndDependents.getValue()==null || rangesAndDependents.getValue().size() == 0){
+ return;
+ }
+ MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey());
+ locks.put(r.getPartitionIndex(),new LockResult(r.getPartitionIndex(),r.getOwnerId(),true,rangesAndDependents.getKey()));
+ latestDag.addNewNode(r,new ArrayList<>(rangesAndDependents.getValue()));
+ }
+
+ private List<MusicRangeInformationRow> setReadOnlyAnyDoubleRow(Dag latestDag,List<MusicRangeInformationRow> rows, Map<UUID,LockResult> locks)
+ throws MDBCServiceException{
+ List<MusicRangeInformationRow> returnInfo = new ArrayList<>();
+ List<DagNode> toDisable = latestDag.getOldestDoubles();
+ for(DagNode node : toDisable){
+ changeIsLatestToMRI(node.getRow(),false,locks.get(node.getId()));
+ latestDag.setIsLatest(node.getId(),false);
+ returnInfo.add(node.getRow());
+ }
+ return returnInfo;
+ }
+
+ private MusicRangeInformationRow createAndAssignLock(List<Range> ranges) throws MDBCServiceException {
+ UUID newUUID = MDBCUtils.generateTimebasedUniqueKey();
+ DatabasePartition newPartition = new DatabasePartition(ranges,newUUID,null);
+ MusicRangeInformationRow newRow = new MusicRangeInformationRow(newUUID,newPartition,new ArrayList<>(),
+ null,getMyHostId(),true);
+ createMusicRangeInformation(newRow);
+ return newRow;
+ }
+
+ private OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
+ Map<UUID,LockResult> locks, UUID ownershipId) throws MDBCServiceException{
+ recoverFromFailureAndUpdateDag(extendedDag,latestRows,ranges,locks);
+ List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks);
+ releaseLocks(changed, locks);
+ MusicRangeInformationRow row = createAndAssignLock(ranges);
+ latestRows.add(row);
+ locks.put(row.getPartitionIndex(),new LockResult(row.getPartitionIndex(),row.getOwnerId(),true,ranges));
+ extendedDag.addNewNodeWithSearch(row,ranges);
+ Pair<List<Range>, Set<DagNode>> missing = extendedDag.getIncompleteRangesAndDependents();
+ if(missing.getKey().size()!=0 && missing.getValue().size()!=0) {
+ MusicRangeInformationRow newRow = createAndAssignLock(missing.getKey());
+ latestRows.add(newRow);
+ locks.put(newRow.getPartitionIndex(), new LockResult(newRow.getPartitionIndex(), newRow.getOwnerId(), true,
+ missing.getKey()));
+ extendedDag.addNewNode(newRow, new ArrayList<>(missing.getValue()));
+ }
+ changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks);
+ releaseLocks(changed,locks);
+ releaseAllLocksExcept(row.getPartitionIndex(),locks);
+ LockResult ownRow = locks.get(row.getPartitionIndex());
+ return new OwnershipReturn(ownershipId, ownRow.getOwnerId(), ownRow.getIndex(),ranges,extendedDag);
+ }
+
+ // \TODO merge with dag code
+ private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException {
+ Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>();
+ for(MusicRangeInformationRow row : rows){
+ DatabasePartition dbPartition = row.getDBPartition();
+ if (row.getIsLatest()) {
+ for(Range range : dbPartition.getSnapshot()){
+ if(!rowsPerLatestRange.containsKey(range)){
+ rowsPerLatestRange.put(range,new HashSet<>());
+ }
+ DagNode node = dag.getNode(row.getPartitionIndex());
+ if(node!=null) {
+ rowsPerLatestRange.get(range).add(node);
+ }
+ else{
+ rowsPerLatestRange.get(range).add(new DagNode(row));
+ }
+ }
+ }
+ }
+ return rowsPerLatestRange;
}
@Override
- public DatabasePartition own(List<Range> ranges, DatabasePartition partition) throws MDBCServiceException {
- if (partition.owns(ranges)) {
- return partition;
- }
- return appendRange(ranges,partition);
+ public OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException {
+ Map<UUID,LockResult> newLocks = new HashMap<>();
+ //Init timeout clock
+ ownAndCheck.startOwnershipTimeoutClock(opId);
+ //Find
+ List<Range> extendedRanges = getExtendedRanges(ranges);
+ List<MusicRangeInformationRow> allMriRows = getAllMriRows();
+ List<MusicRangeInformationRow> rows = ownAndCheck.getRows(allMriRows,extendedRanges, false);
+ Dag dag = Dag.getDag(rows,extendedRanges);
+ Dag prev = new Dag();
+ while( (dag.isDifferent(prev) || !prev.isOwned() ) &&
+ !ownAndCheck.timeout(opId)
+ ){
+ while(dag.hasNextToOwn()){
+ DagNode node = dag.nextToOwn();
+ MusicRangeInformationRow row = node.getRow();
+ UUID uuid = row.getPartitionIndex();
+ if(partition.isLocked()&&partition.getMRIIndex()==uuid||
+ newLocks.containsKey(uuid) ||
+ !row.getIsLatest()){
+ dag.setOwn(node);
+ }
+ else{
+ LockResult lockResult = null;
+ boolean owned = false;
+ while(!owned && !ownAndCheck.timeout(opId)){
+ try {
+ LockRequest request = new LockRequest(musicRangeInformationTableName,uuid,
+ new ArrayList(node.getRangeSet()));
+ lockResult = waitForLock(request);
+ owned = true;
+ }
+ catch (MDBCServiceException e){
+ logger.warn("Locking failed, retrying",e);
+ }
+ }
+ if(owned){
+ dag.setOwn(node);
+ newLocks.put(uuid,lockResult);
+ }
+ else{
+ break;
+ }
+ }
+ }
+ prev=dag;
+ //TODO instead of comparing dags, compare rows
+ allMriRows = getAllMriRows();
+ rows = ownAndCheck.getRows(allMriRows,extendedRanges,false);
+ dag = Dag.getDag(rows,extendedRanges);
+ }
+ if(!prev.isOwned() || dag.isDifferent(prev)){
+ releaseLocks(newLocks);
+ ownAndCheck.stopOwnershipTimeoutClock(opId);
+ logger.error("Error when owning a range: Timeout");
+ throw new MDBCServiceException("Ownership timeout");
+ }
+ Set<Range> allRanges = prev.getAllRanges();
+ List<MusicRangeInformationRow> isLatestRows = ownAndCheck.getRows(allMriRows, new ArrayList<>(allRanges), true);
+ prev.setRowsPerLatestRange(getIsLatestPerRange(dag,isLatestRows));
+ return mergeLatestRows(prev,rows,ranges,newLocks,opId);
}
/**
- * Merge otherpartitions info into the partition
- * @param newId
- * @param otherPartitionsk
- * @param partition
- * @return list of old UUIDs merged
- * @throws MDBCServiceException
+ * This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained
+ * @param ranges ranges that should be contained in the partition
+ * @param partition currently own partition
+ * @return
*/
- private DatabasePartition mergeMriRows(UUID newId, List<DatabasePartition> otherPartitions, DatabasePartition partition)
+ public boolean isAppendRequired(List<Range> ranges, DatabasePartition partition){
+ for(Range r: ranges){
+ if(!partition.isContained(r)){
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Map<UUID,String> mergeMriRows(String newId, Map<UUID,LockResult> lock, DatabasePartition partition)
throws MDBCServiceException {
- List<UUID> oldIds = new ArrayList<>();
+ Map<UUID,String> oldIds = new HashMap<>();
List<Range> newRanges = new ArrayList<>();
- for (DatabasePartition dbPart : otherPartitions) {
- oldIds.add(dbPart.getMRIIndex());
- newRanges.addAll(dbPart.getSnapshot());
- }
- DatabasePartition newPartition = new DatabasePartition(newRanges,newId,null);
+ for (Map.Entry<UUID,LockResult> entry : lock.entrySet()) {
+ oldIds.put(entry.getKey(),entry.getValue().getOwnerId());
+ //\TODO check if we need to do a locked get? Is that even required?
+ final MusicRangeInformationRow mriRow = getMusicRangeInformation(entry.getKey());
+ final DatabasePartition dbPartition = mriRow.getDBPartition();
+ newRanges.addAll(dbPartition.getSnapshot());
+ }
+ DatabasePartition newPartition = new DatabasePartition(newRanges,UUID.fromString(newId),null);
String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+newId;
- newPartition = waitForLock(fullyQualifiedMriKey, newPartition);
+ UUID newUUID = UUID.fromString(newId);
+ LockRequest newRequest = new LockRequest(musicRangeInformationTableName,newUUID,newRanges);
+ waitForLock(newRequest, newPartition,lock);
+ if(!lock.containsKey(newUUID)||!lock.get(newUUID).isNewLock()){
+ logger.error("When merging rows, lock returned an invalid error");
+ throw new MDBCServiceException("When merging MRI rows, lock returned an invalid error");
+ }
+ final LockResult lockResult = lock.get(newUUID);
partition.updateDatabasePartition(newPartition);
- createEmptyMriRow(partition.getMRIIndex(),myId,partition.getLockId(),partition.getSnapshot());
- return partition;
+ createEmptyMriRow(partition.getMRIIndex(),myId,lockResult.getOwnerId(),partition.getSnapshot(),true);
+ return oldIds;
}
- /**
- * This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all
- * those ranges.
- * @param rangeId new id to be used in the new row
- * @param ranges ranges to be owned by the end of the function called
- * @param partition current ownership status
- * @return
- * @throws MDBCServiceException
- */
- private DatabasePartition appendRange(List<Range> ranges, DatabasePartition partition)
- throws MDBCServiceException {
- UUID newMRIId = generateUniqueKey();
- Map<UUID,List<Range>> rows = findRangeRows(ranges);
- List<DatabasePartition> rowLocks=new ArrayList<>();
+ private void obtainAllLocks(NavigableMap<UUID, List<Range>> rowsToLock,DatabasePartition partition,
+ List<Range> newRanges,Map<UUID, LockResult> rowLock) throws MDBCServiceException {
//\TODO: perform this operations in parallel
- for(Map.Entry<UUID,List<Range>> row : rows.entrySet()){
- DatabasePartition dbPartition;
+ for(Map.Entry<UUID,List<Range>> row : rowsToLock.entrySet()){
+ List<Range> additionalRanges;
try {
- dbPartition = lockRow(row.getKey(),row.getValue(), partition);
+ LockRequest newRequest = new LockRequest(musicRangeInformationTableName,row.getKey(),row.getValue());
+ additionalRanges =lockRow(newRequest, partition, rowLock);
} catch (MDBCServiceException e) {
//TODO: Make a decision if retry or just fail?
- logger.error("Error locking row");
+ logger.error("Error locking row",e);
throw e;
}
- rowLocks.add(dbPartition);
+ newRanges.addAll(additionalRanges);
}
+ }
+
+/* @Override
+ public OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition)
+ throws MDBCServiceException {
+ if(!isAppendRequired(ranges,partition)){
+ return new OwnershipReturn(partition.getLockId(),UUID.fromString(rangeId),null,null);
+ }
+ Map<Range, RangeMriRow> rows = findRangeRows(ranges);
+ final NavigableMap<UUID, List<Range>> rowsToLock = getPendingRows(rows);
+ HashMap<UUID, LockResult> rowLock = new HashMap<>();
+ List<Range> newRanges = new ArrayList<>();
+
+ obtainAllLocks(rowsToLock,partition,newRanges,rowLock);
+
String lockId;
- List<UUID> oldIds = null;
- if (rowLocks.size()==1) {
- return rowLocks.get(0);
+ Map<UUID,String> oldIds = null;
+ if(rowLock.size()!=1){
+ oldIds = mergeMriRows(rangeId, rowLock, partition);
+ lockId = partition.getLockId();
}
- return mergeMriRows(newMRIId, rowLocks, partition);
- }
+ else{
+ List<LockResult> list = new ArrayList<>(rowLock.values());
+ LockResult lockResult = list.get(0);
+ lockId = lockResult.getOwnerId();
+ }
+
+ return new OwnershipReturn(lockId,UUID.fromString(rangeId),oldIds,newRanges);
+ }*/
@Override
public void relinquish(String ownerId, String rangeId) throws MDBCServiceException{
@@ -1663,19 +2168,13 @@ public class MusicMixin implements MusicInterface {
if(!canTryRelinquishing() || !partition.isLocked()){
return;
}
- CassaLockStore lsHandle;
- try {
- lsHandle = MusicCassaCore.getLockingServiceHandle();
- } catch (MusicLockingException e) {
- logger.error("Error obtaining the locking service handle when checking if relinquish was required");
- throw new MDBCServiceException("Error obtaining locking service"+e.getMessage(), e);
- }
long lockQueueSize;
try {
- lockQueueSize = lsHandle.getLockQueueSize(music_ns, this.musicRangeInformationTableName, partition.getMRIIndex().toString());
- } catch (MusicServiceException|MusicQueryException e) {
+ String fullyQualifiedKey= music_ns+"."+ this.musicRangeInformationTableName+"."+partition.getMRIIndex().toString();
+ lockQueueSize = MusicCore.getLockQueueSize(fullyQualifiedKey);
+ } catch (MusicServiceException|MusicQueryException|MusicLockingException e) {
logger.error("Error obtaining the lock queue size");
- throw new MDBCServiceException("Error obtaining lock queue size: "+e.getMessage(), e);
+ throw new MDBCServiceException("Error obtaining lock queue size: " + e.getMessage(), e);
}
if(lockQueueSize> 1){
//If there is any other node waiting, we just relinquish ownership
@@ -1713,15 +2212,24 @@ public class MusicMixin implements MusicInterface {
String lock)
throws MDBCServiceException{
ResultSet result;
- try {
- result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock);
- } catch(MusicServiceException e){
- //\TODO: handle better, at least transform into an MDBCServiceException
- e.printStackTrace();
- throw new MDBCServiceException("Error executing critical get", e);
+ if(lock != null && !lock.isEmpty()) {
+ try {
+ result = MusicCore.criticalGet(keyspace, table, primaryKey, cqlObject, lock);
+ } catch (MusicServiceException e) {
+ e.printStackTrace();
+ throw new MDBCServiceException("Error executing critical get", e);
+ }
+ }
+ else{
+ try {
+ result = MusicCore.atomicGet(keyspace,table,primaryKey,cqlObject);
+ } catch (MusicServiceException|MusicLockingException|MusicQueryException e) {
+ e.printStackTrace();
+ throw new MDBCServiceException("Error executing atomic get", e);
+ }
}
if(result.isExhausted()){
- throw new MDBCServiceException("There is not a row that matches the id "+primaryKey);
+ return null;
}
return result.one();
}
@@ -1729,7 +2237,6 @@ public class MusicMixin implements MusicInterface {
private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
throws MDBCServiceException{
ResultSet result = MusicCore.quorumGet(cqlObject);
- //\TODO: handle better, at least transform into an MDBCServiceException
if(result.isExhausted()){
throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
}
@@ -1762,9 +2269,37 @@ public class MusicMixin implements MusicInterface {
}
}
+ private void executeMusicLockedDelete(String namespace, String tableName, String primaryKeyValue, String lockId
+ ) throws MDBCServiceException{
+ StringBuilder delete = new StringBuilder("DELETE FROM ")
+ .append(namespace)
+ .append('.')
+ .append(tableName)
+ .append(" WHERE rangeid= ")
+ .append(primaryKeyValue)
+ .append(";");
+ PreparedQueryObject query = new PreparedQueryObject();
+ query.appendQueryString(delete.toString());
+ executeMusicLockedPut(namespace,tableName,primaryKeyValue,query,lockId,null);
+ }
@Override
public void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException{
throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented");
}
+
+ @Override
+ public void deleteOldMriRows(Map<UUID, String> oldRowsAndLocks) throws MDBCServiceException {
+ //\TODO Do this operations in parallel or combine in only query to cassandra
+ for(Map.Entry<UUID,String> rows : oldRowsAndLocks.entrySet()){
+ //\TODO handle music delete correctly so we can delete the other rows
+ executeMusicLockedDelete(music_ns,musicRangeInformationTableName,rows.getKey().toString(),rows.getValue());
+ }
+ }
+
+ @Override
+ public OwnershipAndCheckpoint getOwnAndCheck(){
+ return ownAndCheck;
+ }
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 386865a..af935ef 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -17,6 +17,7 @@
* limitations under the License.
* ============LICENSE_END======================================================
*/
+
package org.onap.music.mdbc.mixins;
import java.sql.Connection;
@@ -843,7 +844,26 @@ NEW.field refers to the new value
jdbcConn.setAutoCommit(autocommit);
}
-
+
+ @Override
+ public void disableForeignKeyChecks() throws SQLException {
+ Statement disable = jdbcConn.createStatement();
+ disable.execute("SET FOREIGN_KEY_CHECKS=0");
+ disable.closeOnCompletion();
+ }
+
+ @Override
+ public void enableForeignKeyChecks() throws SQLException {
+ Statement enable = jdbcConn.createStatement();
+ enable.execute("SET FOREIGN_KEY_CHECKS=1");
+ enable.closeOnCompletion();
+ }
+
+ @Override
+ public void applyTxDigest(HashMap<Range, StagingTable> txDigest) throws SQLException {
+ replayTransaction(txDigest);
+ }
+
/**
* Replays operation into database, usually from txDigest
* @param jdbcStmt
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
new file mode 100644
index 0000000..a1228d5
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
@@ -0,0 +1,402 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.ownership;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.DatabasePartition;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MriRowComparator;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+
+public class Dag {
+
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Dag.class);
+
+ private boolean valid;
+ private boolean ownInit;
+ private boolean readyInit;
+ private Map<UUID,DagNode> nodes;
+ private Queue<DagNode> readyNodes;
+ private Queue<DagNode> toApplyNodes;
+ private Map<Range,Set<DagNode>> rowsPerLatestRange;
+ private List<Range> ranges;
+
+ public Dag(){
+ this(false);
+ }
+
+
+ public Dag(boolean isValid){
+ valid=isValid;
+ ranges=null;
+ readyNodes = new LinkedList<>();
+ toApplyNodes = new LinkedList<>();
+ nodes = new HashMap<>();
+ ownInit = false;
+ readyInit = false;
+ rowsPerLatestRange = null;
+ }
+
+ private void createDag(List<MusicRangeInformationRow> rows, List<Range> ranges){
+ this.ranges = new ArrayList<>(ranges);
+ Map<Range,DagNode> latestRow = new HashMap<>();
+ Collections.sort(rows, new MriRowComparator());
+ for(MusicRangeInformationRow row : rows){
+ if(!nodes.containsKey(row.getPartitionIndex())){
+ DagNode node = new DagNode(row);
+ nodes.put(row.getPartitionIndex(),node);
+ for(Range range : ranges){
+ List<Range> nodeRanges = row.getDBPartition().getSnapshot();
+ for(Range nRange : nodeRanges){
+ if(nRange.overlaps(range)){
+ if(latestRow.containsKey(range)){
+ final DagNode dagNode = latestRow.get(range);
+ dagNode.addOutgoingEdge(node);
+ node.addIncomingEdge(dagNode);
+ }
+ latestRow.put(range,node);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public static Dag getDag(List<MusicRangeInformationRow> rows, List<Range> ranges){
+ Dag newDag = new Dag(true);
+ newDag.createDag(rows,ranges);
+ return newDag;
+ }
+
+ public void setRowsPerLatestRange(Map<Range, Set<DagNode>> rowsPerLatestRange) {
+ this.rowsPerLatestRange = rowsPerLatestRange;
+ }
+
+ private void initApplyDatastructures(){
+ readyInit=true;
+ nodes.forEach((id, node) -> {
+ if(node.hasNotIncomingEdges()) {
+ toApplyNodes.add(node);
+ }
+ });
+ }
+
+ private void initOwnDatastructures(){
+ ownInit = true;
+ nodes.forEach((id, node) -> {
+ if(node.hasNotIncomingEdges()) {
+ readyNodes.add(node);
+ }
+ });
+ }
+
+ public DagNode getNode(UUID rowId) throws MDBCServiceException {
+ if(!nodes.containsKey(rowId)){
+ return null;
+ }
+ return nodes.get(rowId);
+ }
+
+ public synchronized boolean hasNextToOwn(){
+ if(!ownInit){
+ initOwnDatastructures();
+ }
+ return !readyNodes.isEmpty();
+ }
+
+ public synchronized DagNode nextToOwn() throws MDBCServiceException {
+ if(!ownInit){
+ initOwnDatastructures();
+ }
+ DagNode nextNode = readyNodes.poll();
+ if(nextNode == null){
+ throw new MDBCServiceException("Next To Own was call without checking has next to own");
+ }
+ return nextNode;
+ }
+
+ public synchronized DagNode nextToApply(List<Range> ranges){
+ if(!readyInit){
+ initApplyDatastructures();
+ }
+ Set<Range> rangesSet = new HashSet<>(ranges);
+ while(!toApplyNodes.isEmpty()){
+ DagNode nextNode = toApplyNodes.poll();
+ List<DagNode> outgoing = nextNode.getOutgoingEdges();
+ for(DagNode out : outgoing){
+ out.setApplyDependencyReady(nextNode);
+ if(out.areApplyDependenciesReady()){
+ toApplyNodes.add(out);
+ }
+ }
+ if(!nextNode.wasApplied(rangesSet)){
+ return nextNode;
+ }
+ }
+ return null;
+ }
+
+ public synchronized boolean isDifferent(Dag other){
+ Set<DagNode> thisSet = new HashSet<>(nodes.values());
+ Set<DagNode> otherSet = new HashSet<>(other.nodes.values());
+ return !(thisSet.size()==otherSet.size() &&
+ thisSet.containsAll(otherSet));
+ }
+
+ public synchronized boolean isOwned(){
+ if(!valid){
+ return false;
+ }
+ else if(nodes.isEmpty()){
+ return true;
+ }
+ for(Map.Entry<UUID,DagNode> pair : nodes.entrySet()){
+ if(!pair.getValue().isOwned()){
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void setOwn(DagNode node) throws MDBCServiceException {
+ if(node == null){
+ throw new MDBCServiceException("Set Own was call with a null node");
+ }
+ final DagNode dagNode = nodes.get(node.getId());
+ if(dagNode == null){
+ throw new MDBCServiceException("Set Own was call with a node that is not in the DAG");
+ }
+ dagNode.setOwned();
+ for(DagNode next: dagNode.getOutgoingEdges()){
+ next.setOwnDependencyReady(dagNode);
+ if (next.areOwnDependenciesReady()) {
+ readyNodes.add(next);
+ }
+ }
+ }
+
+ public void setReady(DagNode node, Range range) throws MDBCServiceException {
+ if(node == null){
+ throw new MDBCServiceException("Set Ready was call with a null node");
+ }
+ final DagNode dagNode = nodes.get(node.getId());
+ if(dagNode == null){
+ throw new MDBCServiceException("Set Ready was call with a node that is not in the DAG");
+ }
+ dagNode.addReady(range);
+ }
+
+ public void setPartiallyReady(DagNode node, Range range, int index) throws MDBCServiceException {
+ if(node == null){
+ throw new MDBCServiceException("Set Ready was call with a null node");
+ }
+ final DagNode dagNode = nodes.get(node.getId());
+ if(dagNode == null){
+ throw new MDBCServiceException("Set Ready was call with a node that is not in the DAG");
+ }
+ dagNode.addPartiallyReady(range,index);
+ }
+
+ public synchronized boolean applied(){
+ if(!valid) {
+ return false;
+ }
+ if(!readyInit){
+ initApplyDatastructures();
+ }
+ return toApplyNodes.isEmpty();
+ }
+
+ public void setAlreadyApplied(Map<Range, Pair<MusicRangeInformationRow,Integer>> alreadyApplied, Set<Range> ranges)
+ throws MDBCServiceException {
+ for(Map.Entry<UUID,DagNode> node : nodes.entrySet()){
+ Set<Range> intersection = new HashSet<>(ranges);
+ intersection.retainAll(node.getValue().getRangeSet());
+ for(Range r : intersection){
+ if(alreadyApplied.containsKey(r)){
+ final Pair<MusicRangeInformationRow, Integer> appliedPair = alreadyApplied.get(r);
+ final MusicRangeInformationRow appliedRow = appliedPair.getKey();
+ final int index = appliedPair.getValue();
+ final long appliedTimestamp = appliedRow.getTimestamp();
+ final long nodeTimestamp = node.getValue().getTimestamp();
+ if(appliedTimestamp > nodeTimestamp){
+ setReady(node.getValue(),r);
+ }
+ else if(appliedTimestamp == nodeTimestamp){
+ setPartiallyReady(node.getValue(),r,index);
+ }
+ }
+ }
+ }
+ }
+
+ public void addNewNode(MusicRangeInformationRow row, List<DagNode> dependencies) throws MDBCServiceException {
+ boolean found=false;
+ if (ranges != null) {
+ DatabasePartition dbPartition = row.getDBPartition();
+ for(Range range : dbPartition.getSnapshot()){
+ for(Range dagRange : ranges){
+ if(dagRange.overlaps(range)){
+ found = true;
+ break;
+ }
+ }
+ if(found) break;
+ }
+ if(!found) {
+ return;
+ }
+ }
+
+ DagNode newNode = new DagNode(row);
+ nodes.put(row.getPartitionIndex(),newNode);
+ for(DagNode dependency : dependencies) {
+ newNode.addIncomingEdge(dependency);
+ DagNode localNode = getNode(dependency.getId());
+ localNode.addOutgoingEdge(newNode);
+ }
+ }
+
+ public void addNewNodeWithSearch(MusicRangeInformationRow row, List<Range> ranges) throws MDBCServiceException {
+ Map<Range,DagNode> newestNode = new HashMap<>();
+ for(DagNode node : nodes.values()){
+ for(Range range : ranges) {
+ if (node.getRangeSet().contains(range)){
+ if(!newestNode.containsKey(range)){
+ newestNode.put(range,node);
+ }
+ else{
+ DagNode current = newestNode.get(range);
+ if(node.getTimestamp() > current.getTimestamp()){
+ newestNode.put(range,node);
+ }
+ }
+ }
+ }
+ }
+ List<DagNode> dependencies = newestNode.values().stream().distinct().collect(Collectors.toList());
+ addNewNode(row,dependencies);
+ }
+
+ public Set<Range> getAllRanges(){
+ Set<Range> ranges = new HashSet<>();
+ for(DagNode node : nodes.values()){
+ ranges.addAll(node.getRangeSet());
+ }
+ return ranges;
+ }
+
+ public void setIsLatest(UUID id, boolean isLatest){
+ DagNode dagNode = nodes.get(id);
+ dagNode.setIsLatest(isLatest);
+ if(isLatest) {
+ MusicRangeInformationRow row = dagNode.getRow();
+ DatabasePartition dbPartition = row.getDBPartition();
+ for (Range range : dbPartition.getSnapshot()) {
+ if (!rowsPerLatestRange.containsKey(range)) {
+ rowsPerLatestRange.put(range, new HashSet<>());
+ }
+ rowsPerLatestRange.get(range).add(dagNode);
+ }
+ }
+ else{
+ MusicRangeInformationRow row = dagNode.getRow();
+ DatabasePartition dbPartition = row.getDBPartition();
+ for (Range range : dbPartition.getSnapshot()) {
+ if (rowsPerLatestRange.containsKey(range)) {
+ rowsPerLatestRange.get(range).remove(dagNode);
+ }
+ }
+ }
+ }
+
+ private Map<Range,Set<DagNode>> getIsLatestPerRange(){
+ if(rowsPerLatestRange == null){
+ rowsPerLatestRange = new HashMap<>();
+ }
+ for(DagNode node : nodes.values()){
+ MusicRangeInformationRow row = node.getRow();
+ DatabasePartition dbPartition = row.getDBPartition();
+ if (row.getIsLatest()) {
+ for(Range range : dbPartition.getSnapshot()){
+ if(!rowsPerLatestRange.containsKey(range)){
+ rowsPerLatestRange.put(range,new HashSet<>());
+ }
+ rowsPerLatestRange.get(range).add(node);
+ }
+ }
+ }
+ return new HashMap<>(rowsPerLatestRange);
+ }
+
+ private List<DagNode> getOldestDoubleRows(Map<Range,Set<DagNode>> rowPerLatestRange) throws MDBCServiceException {
+ Set<DagNode> oldest = new HashSet<>();
+ for(Map.Entry<Range,Set<DagNode>> rangeAndNodes : rowPerLatestRange.entrySet()){
+ Range range = rangeAndNodes.getKey();
+ Set<DagNode> nodes = rangeAndNodes.getValue();
+ if(nodes.size() > 2){
+ logger.error("Range "+range.table+"has more than 2 active rows");
+ throw new MDBCServiceException("Range has more than 2 active rows");
+ }
+ else if(nodes.size()==2){
+ DagNode older = null;
+ long olderTimestamp = Long.MAX_VALUE;
+ for(DagNode node : nodes){
+ if(olderTimestamp > node.getTimestamp()){
+ older = node;
+ olderTimestamp=node.getTimestamp();
+ }
+ }
+ oldest.add(older);
+ }
+ }
+ return new ArrayList<>(oldest);
+ }
+
+ public List<DagNode> getOldestDoubles() throws MDBCServiceException{
+ Map<Range,Set<DagNode>> rowsPerLatestRange = getIsLatestPerRange();
+ List<DagNode> toDisable = getOldestDoubleRows(rowsPerLatestRange);
+ return toDisable;
+ }
+
+ public Pair<List<Range>,Set<DagNode>> getIncompleteRangesAndDependents() throws MDBCServiceException {
+ List<Range> incomplete = new ArrayList<>();
+ Set<DagNode> dependents = new HashSet<>();
+ Map<Range,Set<DagNode>> rowsPerLatestRange = getIsLatestPerRange();
+ List<DagNode> toDisable = getOldestDoubleRows(rowsPerLatestRange);
+ for(DagNode node : toDisable) {
+ for (Range range : node.getRangeSet()) {
+ rowsPerLatestRange.get(range).remove(node);
+ if (rowsPerLatestRange.get(range).size() == 0) {
+ incomplete.add(range);
+ dependents.add(node);
+ }
+ }
+ }
+ return Pair.of(incomplete,dependents);
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
new file mode 100644
index 0000000..e737b26
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
@@ -0,0 +1,202 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.ownership;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
+
+public class DagNode {
+
+ private boolean owned;
+ private boolean applyInit;
+ private final MusicRangeInformationRow row;
+ private int currentIndex;
+ private Set<DagNode> dependencies;
+ private Set<DagNode> outgoingEdges;
+ private Set<DagNode> readyOwnDependencies;
+ private Set<DagNode> readyAppliedDependencies;
+ private List<Range> alreadyApplied;
+ private Map<Range,Integer> partiallyApplied;
+ private Map<Range,Integer> startIndex;
+
+ public DagNode(MusicRangeInformationRow row){
+ this.row = row;
+ owned = false;
+ applyInit = false;
+ currentIndex = 0;
+ dependencies = new HashSet<>();
+ outgoingEdges = new HashSet<>();
+ readyOwnDependencies = new HashSet<>();
+ readyAppliedDependencies = new HashSet<>();
+ alreadyApplied = new ArrayList<>();
+ partiallyApplied = new HashMap<>();
+ startIndex = new HashMap<>();
+ }
+
+ public MusicRangeInformationRow getRow() { return row;}
+
+ public synchronized void setOwned(){
+ owned = true;
+ }
+
+ public synchronized boolean isOwned(){
+ return owned;
+ }
+
+ public UUID getId(){
+ return row.getPartitionIndex();
+ }
+
+ public synchronized void addIncomingEdge(DagNode sourceNode){
+ dependencies.add(sourceNode);
+ }
+
+ public synchronized void addOutgoingEdge(DagNode destinationNode){
+ outgoingEdges.add(destinationNode);
+ }
+
+ public synchronized boolean hasNotIncomingEdges(){
+ return dependencies.isEmpty();
+ }
+
+ public synchronized List<DagNode> getOutgoingEdges(){
+ return new ArrayList<>(outgoingEdges);
+ }
+
+ public synchronized void addReady(Range r) throws MDBCServiceException {
+ if(!row.getDBPartition().isContained(r)){
+ throw new MDBCServiceException("Range was set ready to a node that doesn't own it");
+ }
+ alreadyApplied.add(r);
+ }
+
+ public synchronized void addPartiallyReady(Range r, int index){
+ partiallyApplied.put(r,index);
+ }
+
+ public synchronized void setOwnDependencyReady(DagNode other){
+ readyOwnDependencies.add(other);
+ }
+
+ public synchronized boolean areOwnDependenciesReady(){
+ final int dSize = dependencies.size();
+ final int oSize = readyOwnDependencies.size();
+ return (dSize == oSize) && dependencies.containsAll(readyOwnDependencies);
+ }
+
+ public synchronized void setApplyDependencyReady(DagNode other){
+ readyAppliedDependencies.add(other);
+ }
+
+ public synchronized boolean areApplyDependenciesReady(){
+ final int dSize = dependencies.size();
+ final int oSize = readyAppliedDependencies.size();
+ return (dSize == oSize) && dependencies.containsAll(readyAppliedDependencies);
+ }
+
+ private void initializeApply(Set<Range> ranges){
+ applyInit = true;
+ int redoSize = row.getRedoLog().size();
+ // No need to apply
+ for(Range r: alreadyApplied){
+ startIndex.put(r,redoSize);
+ }
+ // Only apply the required subsection
+ partiallyApplied.forEach((r, index) -> {
+ startIndex.put(r,index);
+ });
+ // All other ranges need to be applied completely
+ Set<Range> alreadySet = new HashSet<>(alreadyApplied);
+ Set<Range> partialSet = partiallyApplied.keySet();
+ Set<Range> pending = new HashSet<>(ranges);
+ pending.removeAll(alreadySet);
+ pending.removeAll(partialSet);
+ for(Range r: pending){
+ startIndex.put(r,-1);
+ }
+ //Get the index of the redo log to begin with
+ currentIndex = startIndex.values().stream().mapToInt(v->v).min().orElse(0);
+ currentIndex = currentIndex+1;
+ }
+
+ public synchronized Pair<MusicTxDigestId, List<Range>> nextNotAppliedTransaction(Set<Range> ranges){
+ if(row.getRedoLog().isEmpty()) return null;
+ if(!applyInit){
+ initializeApply(ranges);
+ }
+ final List<MusicTxDigestId> redoLog = row.getRedoLog();
+ if(currentIndex < redoLog.size()){
+ List<Range> responseRanges= new ArrayList<>();
+ startIndex.forEach((r, index) -> {
+ if(index < currentIndex){
+ responseRanges.add(r);
+ }
+ });
+ return Pair.of(redoLog.get(currentIndex++),responseRanges);
+ }
+ return null;
+ }
+
+ public void setIsLatest(boolean isLatest){
+ row.setIsLatest(isLatest);
+ }
+
+ public synchronized Set<Range> getRangeSet(){
+ return new HashSet<>(row.getDBPartition().getSnapshot());
+ }
+
+ public synchronized boolean wasApplied(Set<Range> ranges){
+ if(row.getRedoLog().isEmpty()) return true;
+ if(!applyInit){
+ initializeApply(ranges);
+ }
+ return currentIndex >= row.getRedoLog().size();
+ }
+
+ public long getTimestamp(){
+ return row.getTimestamp();
+ }
+
+
+ @Override
+ public boolean equals(Object o){
+ if (this == o) return true;
+ if(o == null) return false;
+ if(!(o instanceof DagNode)) return false;
+ DagNode other = (DagNode) o;
+ return other.row.getPartitionIndex().equals(this.row.getPartitionIndex());
+ }
+
+ @Override
+ public int hashCode(){
+ return row.getPartitionIndex().hashCode();
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
new file mode 100644
index 0000000..4ccd21d
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -0,0 +1,225 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.ownership;
+
+import java.sql.SQLException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.mixins.DBInterface;
+import org.onap.music.mdbc.mixins.LockResult;
+import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
+import org.onap.music.mdbc.tables.StagingTable;
+
+public class OwnershipAndCheckpoint{
+
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class);
+ private Lock checkpointLock;
+ private AtomicBoolean change;
+ private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied;
+ private Map<UUID,Long> ownershipBeginTime;
+ private long timeoutInMs;
+
+ public OwnershipAndCheckpoint(){
+ this(new HashMap<>(),Long.MAX_VALUE);
+ }
+
+ public OwnershipAndCheckpoint(Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied, long timeoutInMs){
+ change = new AtomicBoolean(true);
+ checkpointLock = new ReentrantLock();
+ this.alreadyApplied = alreadyApplied;
+ ownershipBeginTime = new HashMap<>();
+ this.timeoutInMs = timeoutInMs;
+ }
+
+ public void startOwnershipTimeoutClock(UUID id){
+ ownershipBeginTime.put(id,System.currentTimeMillis());
+ }
+
+ public void stopOwnershipTimeoutClock(UUID id){
+ if(ownershipBeginTime.containsKey(id)) {
+ ownershipBeginTime.remove(id);
+ }
+ else{
+ logger.warn("clock was deleted with an invalid/stale id "+id);
+ }
+ }
+
+ public boolean timeout(UUID id) throws MDBCServiceException {
+ long current = System.currentTimeMillis();
+ if(!ownershipBeginTime.containsKey(id)){
+ throw new MDBCServiceException("timeout was call with an invalid id");
+ }
+ Long beginTime = ownershipBeginTime.get(id);
+ if(current-beginTime > timeoutInMs){
+ return true;
+ }
+ return false;
+ }
+
+ public List<MusicRangeInformationRow> getRows(List<MusicRangeInformationRow> allMriRows, List<Range> ranges,
+ boolean onlyIsLatest){
+ List<MusicRangeInformationRow> rows = new ArrayList<>();
+ for(MusicRangeInformationRow row : allMriRows){
+ if(onlyIsLatest && !row.getIsLatest()){
+ continue;
+ }
+ final List<Range> rowRanges = row.getDBPartition().getSnapshot();
+ boolean found = false;
+ for(Range sRange : ranges){
+ for(Range rRange: rowRanges) {
+ if(sRange.overlaps(rRange)){
+ rows.add(row);
+ found=true;
+ break;
+ }
+ }
+ if(found) break;
+ }
+ }
+ return rows;
+ }
+
+ private List<MusicRangeInformationRow> getRows(MusicInterface music, List<Range> ranges, boolean onlyIsLatest)
+ throws MDBCServiceException {
+ final List<MusicRangeInformationRow> allMriRows = music.getAllMriRows();
+ return getRows(allMriRows,ranges,onlyIsLatest);
+ }
+
+ public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, List<Range> ranges,
+ Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException {
+ try {
+ checkpointLock.lock();
+ change.set(true);
+ Set<Range> rangesSet = new HashSet<>(ranges);
+ extendedDag.setAlreadyApplied(alreadyApplied, rangesSet);
+ applyRequiredChanges(mi, di, extendedDag, ranges, ownOpId);
+ }
+ catch(MDBCServiceException e){
+ stopOwnershipTimeoutClock(ownOpId);
+ throw e;
+ }
+ finally {
+ checkpointLock.unlock();
+ }
+ }
+
+ private void enableForeignKeys(DBInterface di) throws MDBCServiceException {
+ try {
+ di.enableForeignKeyChecks();
+ } catch (SQLException e) {
+ throw new MDBCServiceException("Error enabling foreign keys checks",e);
+ }
+ }
+
+ private void disableForeignKeys(DBInterface di) throws MDBCServiceException {
+ try {
+ di.disableForeignKeyChecks();
+ } catch (SQLException e) {
+ throw new MDBCServiceException("Error disable foreign keys checks",e);
+ }
+ }
+
+ private void applyTxDigest(DBInterface di, HashMap<Range, StagingTable> txDigest)
+ throws MDBCServiceException {
+ try {
+ di.applyTxDigest(txDigest);
+ } catch (SQLException e) {
+ throw new MDBCServiceException("Error applying tx digest in local SQL",e);
+ }
+ }
+
+ public void warmup(MusicInterface mi, DBInterface di, List<Range> ranges) throws MDBCServiceException {
+ boolean ready = false;
+ change.set(true);
+ Set<Range> rangeSet = new HashSet<Range>(ranges);
+ Dag dag = new Dag(false);
+ while(!ready){
+ if(change.get()){
+ change.set(false);
+ final List<MusicRangeInformationRow> rows = getRows(mi, ranges,false);
+ dag = Dag.getDag(rows,ranges);
+ }
+ else if(!dag.applied()){
+ DagNode node = dag.nextToApply(ranges);
+ if(node!=null) {
+ Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
+ while (pair != null) {
+ disableForeignKeys(di);
+ checkpointLock.lock();
+ if (change.get()) {
+ enableForeignKeys(di);
+ checkpointLock.unlock();
+ break;
+ } else {
+ final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey());
+ applyTxDigest(di, txDigest);
+ for (Range r : pair.getValue()) {
+ alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index));
+ }
+ }
+ pair = node.nextNotAppliedTransaction(rangeSet);
+ enableForeignKeys(di);
+ checkpointLock.unlock();
+ }
+ }
+ }
+ else{
+ ready = true;
+ }
+ }
+ }
+
+ private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, List<Range> ranges, UUID ownOpId)
+ throws MDBCServiceException {
+ Set<Range> rangeSet = new HashSet<Range>(ranges);
+ disableForeignKeys(db);
+ while(!extendedDag.applied()){
+ DagNode node = extendedDag.nextToApply(ranges);
+ if(node!=null) {
+ Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
+ while (pair != null) {
+ final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey());
+ applyTxDigest(db, txDigest);
+ for (Range r : pair.getValue()) {
+ alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index));
+ }
+ pair = node.nextNotAppliedTransaction(rangeSet);
+ if (timeout(ownOpId)) {
+ enableForeignKeys(db);
+ throw new MDBCServiceException("Timeout apply changes to local dbi");
+ }
+ }
+ }
+ }
+ enableForeignKeys(db);
+
+ }
+
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java
new file mode 100644
index 0000000..281d763
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriRowComparator.java
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.tables;
+
+import java.util.Comparator;
+
+public class MriRowComparator implements Comparator<MusicRangeInformationRow> {
+
+ @Override
+ public int compare(MusicRangeInformationRow o1, MusicRangeInformationRow o2) {
+ return Long.compare(o1.getTimestamp(),o2.getTimestamp());
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
index 94011d7..2c5af2c 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,30 +19,38 @@
*/
package org.onap.music.mdbc.tables;
+import java.sql.Timestamp;
import java.util.List;
import java.util.UUID;
import org.onap.music.mdbc.DatabasePartition;
-public final class MusicRangeInformationRow {
+public final class MusicRangeInformationRow implements Comparable<MusicRangeInformationRow>{
private final DatabasePartition dbPartition;
- //private final UUID partitionIndex;
+ private final UUID partitionIndex;
private final List<MusicTxDigestId> redoLog;
- private final String ownerId;
+ private String ownerId;
private final String metricProcessId;
+ private boolean isLatest;
- public MusicRangeInformationRow (DatabasePartition dbPartition, List<MusicTxDigestId> redoLog,
- String ownerId, String metricProcessId) {
+ public MusicRangeInformationRow (UUID partitionIndex, DatabasePartition dbPartition, List<MusicTxDigestId> redoLog,
+ String ownerId, String metricProcessId, boolean isLatest) {
+ this.partitionIndex=partitionIndex;
this.dbPartition = dbPartition;
this.redoLog = redoLog;
this.ownerId = ownerId;
this.metricProcessId = metricProcessId;
+ this.isLatest = isLatest;
}
- /*public UUID getPartitionIndex() {
- return dbPartition.getMusicRangeInformationIndex();
- } */
-
+ public UUID getPartitionIndex() {
+ return dbPartition.getMRIIndex();
+ }
+
+ public boolean getIsLatest(){ return isLatest; }
+
+ public void setIsLatest(boolean isLatest){ this.isLatest = isLatest; }
+
public DatabasePartition getDBPartition() {
return this.dbPartition;
}
@@ -58,5 +66,35 @@ public final class MusicRangeInformationRow {
public String getMetricProcessId() {
return metricProcessId;
}
-
+
+ public long getTimestamp(){
+ return partitionIndex.timestamp();
+ }
+
+ public void setOwnerId(String newOwnerId){
+ this.ownerId=newOwnerId;
+ }
+
+ @Override
+ public int compareTo(MusicRangeInformationRow o) {
+ long thisTimestamp = this.getTimestamp();
+ long oTimestamp = o.getTimestamp();
+ //descending order
+ int returnVal = (thisTimestamp>oTimestamp)?-1:(thisTimestamp==oTimestamp)?0:1;
+ return returnVal;
+ }
+
+ @Override
+ public boolean equals(Object o){
+ if (this == o) return true;
+ if(o == null) return false;
+ if(!(o instanceof MusicRangeInformationRow)) return false;
+ MusicRangeInformationRow other = (MusicRangeInformationRow) o;
+ return other.getPartitionIndex().equals(this.getPartitionIndex());
+ }
+
+ @Override
+ public int hashCode(){
+ return partitionIndex.hashCode();
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
index b7c37ba..fa15354 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -113,7 +113,9 @@ public class MusicTxDigest {
for (MusicTxDigestId txId: partitionsRedoLogTxIds) {
HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId);
try {
+ //\TODO do this two operations in parallel
dbi.replayTransaction(transaction);
+ mi.replayTransaction(transaction);
} catch (SQLException e) {
logger.error("Rolling back the entire digest replay. " + partitionId);
return;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
index fda34e2..1c37db0 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
@@ -23,12 +23,28 @@ import java.util.UUID;
public final class MusicTxDigestId {
public final UUID txId;
+ public final int index;
- public MusicTxDigestId(UUID primaryKey) {
+ public MusicTxDigestId(UUID primaryKey, int index) {
this.txId= primaryKey;
+ this.index=index;
}
public boolean isEmpty() {
return (this.txId==null);
}
+
+ @Override
+ public boolean equals(Object o){
+ if (this == o) return true;
+ if(o == null) return false;
+ if(!(o instanceof MusicTxDigestId)) return false;
+ MusicTxDigestId other = (MusicTxDigestId) o;
+ return other.txId.equals(this.txId);
+ }
+
+ @Override
+ public int hashCode(){
+ return txId.hashCode();
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/RangeDependency.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/RangeDependency.java
new file mode 100644
index 0000000..cc8c875
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/RangeDependency.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.tables;
+
+import org.onap.music.mdbc.Range;
+
+import java.util.List;
+
+public class RangeDependency {
+ final Range baseRange;
+ final List<Range> dependentRanges;
+
+ public RangeDependency(Range baseRange, List<Range> dependentRanges){
+ this.baseRange=baseRange;
+ this.dependentRanges=dependentRanges;
+ }
+ public Range getRange(){
+ return baseRange;
+ }
+ public List<Range> dependentRanges(){
+ return dependentRanges;
+ }
+}