aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src
diff options
context:
space:
mode:
Diffstat (limited to 'mdbc-server/src')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java18
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java1
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java59
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java21
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java3
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java51
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java9
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java7
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java17
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java2
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java42
-rwxr-xr-xmdbc-server/src/main/resources/music.properties2
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java5
14 files changed, 168 insertions, 79 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
index ff8eb80..4204960 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
@@ -73,7 +73,7 @@ public class DatabasePartition {
* This function is used to change the contents of this, with the contents of a different object
* @param otherPartition partition that is used to substitute the local contents
*/
- public void updateDatabasePartition(DatabasePartition otherPartition){
+ public synchronized void updateDatabasePartition(DatabasePartition otherPartition){
musicRangeInformationIndex = otherPartition.musicRangeInformationIndex;//Index that can be obtained either from
lockId = otherPartition.lockId;
ranges = otherPartition.ranges;
@@ -90,21 +90,21 @@ public class DatabasePartition {
}
- public boolean isLocked(){return lockId != null && !lockId.isEmpty(); }
+ public synchronized boolean isLocked(){return lockId != null && !lockId.isEmpty(); }
- public boolean isReady() {
+ public synchronized boolean isReady() {
return ready;
}
- public void setReady(boolean ready) {
+ public synchronized void setReady(boolean ready) {
this.ready = ready;
}
- public UUID getMRIIndex() {
+ public synchronized UUID getMRIIndex() {
return musicRangeInformationIndex;
}
- public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) {
+ public synchronized void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) {
this.musicRangeInformationIndex = musicRangeInformationIndex;
}
@@ -179,15 +179,15 @@ public class DatabasePartition {
return range;
}
- public String getLockId() {
+ public synchronized String getLockId() {
return lockId;
}
- public void setLockId(String lockId) {
+ public synchronized void setLockId(String lockId) {
this.lockId = lockId;
}
- public boolean isContained(Range range){
+ public synchronized boolean isContained(Range range){
for(Range r: ranges){
if(r.overlaps(range)){
return true;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 69a678b..d336eef 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -504,6 +504,7 @@ public class MdbcConnection implements Connection {
DatabasePartition tempPartition = own(scQueryTables);
if(tempPartition!=null && tempPartition != partition) {
this.partition.updateDatabasePartition(tempPartition);
+ mi.reloadAlreadyApplied(this.partition);
}
dbi.preStatementHook(sql);
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 1f722f1..b403dd2 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -45,6 +45,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* \TODO Implement an interface for the server logic and a factory
@@ -79,7 +82,10 @@ public class StateManager {
/** Identifier for this server instance */
private String mdbcServerName;
private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
+ private final Lock eventualLock = new ReentrantLock();
private List<Range> eventualRanges;
+ private final Lock warmupLock = new ReentrantLock();
+ private List<Range> warmupRanges;
public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
this.sqlDBName = sqlDBName;
@@ -87,7 +93,7 @@ public class StateManager {
this.info = info;
this.mdbcServerName = mdbcServerName;
- this.connectionRanges = new HashMap<>();
+ this.connectionRanges = new ConcurrentHashMap<>();
this.transactionInfo = new TxCommitProgress();
//\fixme this might not be used, delete?
try {
@@ -145,17 +151,52 @@ public class StateManager {
return this.musicInterface;
}
- public List<DatabasePartition> getRanges() {
+ public List<DatabasePartition> getPartitions() {
return new ArrayList<>(connectionRanges.values());
}
+ public List<Range> getWarmupRanges(){
+ warmupLock.lock();
+ List<Range> returnArray;
+ try {
+ if(warmupRanges!=null) {
+ returnArray = new ArrayList<>(warmupRanges);
+ }
+ else{
+ returnArray = null;
+ }
+ }
+ finally{
+ warmupLock.unlock();
+ }
+ return returnArray;
+ }
public List<Range> getEventualRanges() {
- return eventualRanges;
+ eventualLock.lock();
+ List<Range> returnArray;
+ try {
+ if(eventualRanges!=null){
+ returnArray = new ArrayList<>(eventualRanges);
+ }
+ else{
+ returnArray= null;
+ }
+ }
+ finally{
+ eventualLock.unlock();
+ }
+ return returnArray;
}
public void setEventualRanges(List<Range> eventualRanges) {
- this.eventualRanges = eventualRanges;
+ eventualLock.lock();
+ try {
+ this.eventualRanges = eventualRanges;
+ }
+ finally{
+ eventualLock.unlock();
+ }
}
public void closeConnection(String connectionId){
@@ -267,4 +308,14 @@ public class StateManager {
}
}
+
+ public void setWarmupRanges(List<Range> warmupRanges) {
+ warmupLock.lock();
+ try {
+ this.warmupRanges = warmupRanges;
+ }
+ finally{
+ warmupLock.unlock();
+ }
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
index 8497911..a9d179f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
@@ -51,6 +51,7 @@ public class TablesConfiguration {
private String internalNamespace;
private int internalReplicationFactor;
private String musicNamespace;
+ private int musicReplicationFactor;
private String tableToPartitionName;
private String partitionInformationTableName;
private String redoHistoryTableName;
@@ -66,6 +67,7 @@ public class TablesConfiguration {
*/
public List<NodeConfiguration> initializeAndCreateNodeConfigurations() throws MDBCServiceException {
logger.info("initializing the required spaces");
+ createKeyspaces();
initInternalNamespace();
List<NodeConfiguration> nodeConfigs = new ArrayList<>();
@@ -81,10 +83,6 @@ public class TablesConfiguration {
String partitionId;
if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){
- if(partitionInfo.replicationFactor==0){
- logger.error("Replication factor and partition id are both empty, and this is an invalid configuration" );
- throw new MDBCServiceException("Replication factor and partition id are both empty, and this is an invalid configuration");
- }
//1) Create a row in the partition info table
partitionId = MDBCUtils.generateTimebasedUniqueKey().toString();
}
@@ -110,6 +108,12 @@ public class TablesConfiguration {
return nodeConfigs;
}
+ private void createKeyspaces() throws MDBCServiceException {
+ MusicMixin.createKeyspace(internalNamespace,internalReplicationFactor);
+ MusicMixin.createKeyspace(musicNamespace,musicReplicationFactor);
+
+ }
+
private void checkIfMriIsEmpty(String mriTableName) throws MDBCServiceException {
//First check if table exists
StringBuilder checkTableExistsString = new StringBuilder("SELECT table_name FROM system_schema.tables WHERE keyspace_name='")
@@ -172,7 +176,6 @@ public class TablesConfiguration {
private String mriTableName;
private String mtxdTableName;
private String partitionId;
- private int replicationFactor;
public List<Range> getTables() {
return tables;
@@ -206,14 +209,6 @@ public class TablesConfiguration {
this.partitionId = partitionId;
}
- public int getReplicationFactor() {
- return replicationFactor;
- }
-
- public void setReplicationFactor(int replicationFactor) {
- this.replicationFactor = replicationFactor;
- }
-
public String getMtxdTableName(){
return mtxdTableName;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
index 2e4e0ee..8cbbfec 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
@@ -9,15 +9,15 @@
"owner": "",
"mriTableName": "musicrangeinformation",
"mtxdTableName": "musictxdigest",
- "partitionId": "",
- "replicationFactor": 1
+ "partitionId": ""
}
],
+ "internalNamespace": "music_internal",
+ "internalReplicationFactor": 1,
"musicNamespace": "namespace",
+ "musicReplicationFactor": 1,
"tableToPartitionName": "tabletopartition",
"partitionInformationTableName": "partitioninfo",
"redoHistoryTableName": "redohistory",
- "sqlDatabaseName": "test",
- "internalNamespace": "music_internal",
- "internalReplicationFactor": 1
+ "sqlDatabaseName": "test"
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 35b6121..49d4c71 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -321,7 +321,8 @@ public interface MusicInterface {
OwnershipAndCheckpoint getOwnAndCheck();
-
ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException;
+
+ void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index e8028c1..999c67f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
@@ -57,11 +58,7 @@ import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.DagNode;
import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
-import org.onap.music.mdbc.tables.MusicRangeInformationRow;
-import org.onap.music.mdbc.tables.MusicTxDigestId;
-import org.onap.music.mdbc.tables.RangeDependency;
-import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.TxCommitProgress;
+import org.onap.music.mdbc.tables.*;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
@@ -208,7 +205,7 @@ public class MusicMixin implements MusicInterface {
private boolean keyspace_created = false;
private Map<String, PreparedStatement> ps_cache = new HashMap<>();
private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
- private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied;
+ private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
private OwnershipAndCheckpoint ownAndCheck;
public MusicMixin() {
@@ -243,7 +240,7 @@ public class MusicMixin implements MusicInterface {
String t = info.getProperty(KEY_TIMEOUT);
this.timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t);
- alreadyApplied = new HashMap<>();
+ alreadyApplied = new ConcurrentHashMap<>();
ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied,timeout);
initializeMetricTables();
@@ -263,21 +260,25 @@ public class MusicMixin implements MusicInterface {
*/
@Override
public void createKeyspace() throws MDBCServiceException {
+ createKeyspace(this.music_ns,this.music_rfactor);
+ }
+ public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException {
Map<String,Object> replicationInfo = new HashMap<>();
replicationInfo.put("'class'", "'SimpleStrategy'");
- replicationInfo.put("'replication_factor'", music_rfactor);
+ replicationInfo.put("'replication_factor'", replicationFactor);
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString(
- "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns +
+ "CREATE KEYSPACE IF NOT EXISTS " + keyspace +
" WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
try {
MusicCore.nonKeyRelatedPut(queryObject, "eventual");
} catch (MusicServiceException e) {
- if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) {
- throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage(), e);
+ if (!e.getMessage().equals("Keyspace "+keyspace+" already exists")) {
+ throw new MDBCServiceException("Error creating namespace: "+keyspace+". Internal error:"+e.getErrorMessage(),
+ e);
}
}
}
@@ -1335,6 +1336,7 @@ public class MusicMixin implements MusicInterface {
//0. See if reference to lock was already created
String lockId = partition.getLockId();
if(mriIndex==null || lockId == null || lockId.isEmpty()) {
+ //\TODO fix this
own(partition.getSnapshot(),partition, MDBCUtils.generateTimebasedUniqueKey());
}
@@ -1361,7 +1363,7 @@ public class MusicMixin implements MusicInterface {
} catch (IOException e) {
throw new MDBCServiceException("Failed to serialized transaction digest with error " + e.toString(), e);
}
- MusicTxDigestId digestId = new MusicTxDigestId(commitId, -1);
+ MusicTxDigestId digestId = new MusicTxDigestId(mriIndex, -1);
addTxDigest(digestId, serializedTransactionDigest);
//2. Save RRT index to RQ
if (progressKeeper != null) {
@@ -1369,6 +1371,20 @@ public class MusicMixin implements MusicInterface {
}
//3. Append RRT index into the corresponding TIT row array
appendToRedoLog(partition, digestId);
+ List<Range> ranges = partition.getSnapshot();
+ for(Range r : ranges) {
+ if(!alreadyApplied.containsKey(r)){
+ throw new MDBCServiceException("already applied data structure was not updated correctly and range "
+ +r+" is not contained");
+ }
+ Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r);
+ MriReference key = rowAndIndex.getKey();
+ if(!mriIndex.equals(key.index)){
+ throw new MDBCServiceException("already applied data structure was not updated correctly and range "+
+ r+" is not pointing to row: "+mriIndex.toString());
+ }
+ alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1));
+ }
}
}
@@ -1936,8 +1952,17 @@ public class MusicMixin implements MusicInterface {
return ecDigestList;
}
+ @Override
+ public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
+ List<Range> snapshot = partition.getSnapshot();
+ UUID row = partition.getMRIIndex();
+ for(Range r : snapshot){
+ alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
+ }
+
+ }
+
-
ResultSet getAllMriCassandraRows() throws MDBCServiceException {
StringBuilder cqlOperation = new StringBuilder();
cqlOperation.append("SELECT * FROM ")
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 00abe85..64f4e0c 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -949,9 +949,12 @@ NEW.field refers to the new value
StringBuilder keyCondStmt = new StringBuilder();
String and = "";
for (String key: primaryKeys.keySet()) {
- Object val = primaryKeys.get(key);
- keyCondStmt.append(and + key + "=\"" + val + "\"");
- and = " AND ";
+ // We cannot use the default primary key for the sql table and operations
+ if(!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
+ Object val = primaryKeys.get(key);
+ keyCondStmt.append(and + key + "=\"" + val + "\"");
+ and = " AND ";
+ }
}
return keyCondStmt.toString();
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
index 68d1f19..02c5d7b 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
@@ -29,6 +29,7 @@ import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MriRowComparator;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
@@ -231,15 +232,15 @@ public class Dag {
return toApplyNodes.isEmpty();
}
- public void setAlreadyApplied(Map<Range, Pair<MusicRangeInformationRow,Integer>> alreadyApplied, Set<Range> ranges)
+ public void setAlreadyApplied(Map<Range, Pair<MriReference,Integer>> alreadyApplied, Set<Range> ranges)
throws MDBCServiceException {
for(Map.Entry<UUID,DagNode> node : nodes.entrySet()){
Set<Range> intersection = new HashSet<>(ranges);
intersection.retainAll(node.getValue().getRangeSet());
for(Range r : intersection){
if(alreadyApplied.containsKey(r)){
- final Pair<MusicRangeInformationRow, Integer> appliedPair = alreadyApplied.get(r);
- final MusicRangeInformationRow appliedRow = appliedPair.getKey();
+ final Pair<MriReference, Integer> appliedPair = alreadyApplied.get(r);
+ final MriReference appliedRow = appliedPair.getKey();
final int index = appliedPair.getValue();
final long appliedTimestamp = appliedRow.getTimestamp();
final long nodeTimestamp = node.getValue().getTimestamp();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index 4ccd21d..8ec1793 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -33,6 +33,7 @@ import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.mixins.DBInterface;
import org.onap.music.mdbc.mixins.LockResult;
import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
@@ -42,7 +43,7 @@ public class OwnershipAndCheckpoint{
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class);
private Lock checkpointLock;
private AtomicBoolean change;
- private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied;
+ private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
private Map<UUID,Long> ownershipBeginTime;
private long timeoutInMs;
@@ -50,7 +51,7 @@ public class OwnershipAndCheckpoint{
this(new HashMap<>(),Long.MAX_VALUE);
}
- public OwnershipAndCheckpoint(Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied, long timeoutInMs){
+ public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, Integer>> alreadyApplied, long timeoutInMs){
change = new AtomicBoolean(true);
checkpointLock = new ReentrantLock();
this.alreadyApplied = alreadyApplied;
@@ -114,6 +115,9 @@ public class OwnershipAndCheckpoint{
public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, List<Range> ranges,
Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException {
+ if(ranges.isEmpty()){
+ return;
+ }
try {
checkpointLock.lock();
change.set(true);
@@ -156,6 +160,9 @@ public class OwnershipAndCheckpoint{
}
public void warmup(MusicInterface mi, DBInterface di, List<Range> ranges) throws MDBCServiceException {
+ if(ranges.isEmpty()){
+ return;
+ }
boolean ready = false;
change.set(true);
Set<Range> rangeSet = new HashSet<Range>(ranges);
@@ -181,7 +188,8 @@ public class OwnershipAndCheckpoint{
final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey());
applyTxDigest(di, txDigest);
for (Range r : pair.getValue()) {
- alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index));
+ MusicRangeInformationRow row = node.getRow();
+ alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
}
}
pair = node.nextNotAppliedTransaction(rangeSet);
@@ -208,7 +216,8 @@ public class OwnershipAndCheckpoint{
final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey());
applyTxDigest(db, txDigest);
for (Range r : pair.getValue()) {
- alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index));
+ MusicRangeInformationRow row = node.getRow();
+ alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
}
pair = node.nextNotAppliedTransaction(rangeSet);
if (timeout(ownOpId)) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
index 69f2c31..8aad335 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
@@ -28,4 +28,6 @@ public final class MriReference {
this.index= index;
}
+ public long getTimestamp() { return index.timestamp();}
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
index 1da2d79..3b6953c 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -21,10 +21,7 @@ package org.onap.music.mdbc.tables;
import java.sql.SQLException;
import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
@@ -70,23 +67,26 @@ public class MusicTxDigest {
continue;
}
//2) for each partition I don't own
- List<DatabasePartition> ranges = stateManager.getRanges();
- if(ranges.size()!=0) {
- DatabasePartition myPartition = ranges.get(0);
- for (UUID partition : partitions) {
- if (!partition.equals(myPartition.getMRIIndex())) {
- try {
- //replayDigestForPartition(mi, partition, dbi);
- mi.getOwnAndCheck().warmup(mi, dbi, myPartition.getSnapshot());
- } catch (MDBCServiceException e) {
- logger.error("Unable to update for partition : " + partition + ". " + e.getMessage());
- continue;
- }
- }
- }
- }
-
- //Step 3: ReplayDigest() for E.C conditions
+ final List<Range> warmuplist = stateManager.getWarmupRanges();
+ if(warmuplist!=null) {
+ final Set<Range> warmupRanges = new HashSet(warmuplist);
+ final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
+ List<Range> missingRanges = new ArrayList<>();
+ if (currentPartitions.size() != 0) {
+ for (DatabasePartition part : currentPartitions) {
+ List<Range> partitionRanges = part.getSnapshot();
+ warmupRanges.removeAll(partitionRanges);
+ }
+ try {
+ mi.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
+ continue;
+ }
+ }
+ }
+
+ //Step 3: ReplayDigest() for E.C conditions
try {
replayDigest(mi,dbi);
} catch (MDBCServiceException e) {
diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties
index a676f70..21f3e92 100755
--- a/mdbc-server/src/main/resources/music.properties
+++ b/mdbc-server/src/main/resources/music.properties
@@ -1,5 +1,5 @@
cassandra.host =\
- 143.215.128.49
+ 192.168.1.19
cassandra.user =\
metric
cassandra.password =\
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
index 85e31cd..da64595 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
@@ -34,6 +34,7 @@ import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
@@ -191,7 +192,7 @@ public class DagTest {
@Test
public void nextToApply2() throws InterruptedException, MDBCServiceException {
- Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied = new HashMap<>();
+ Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> ranges = new ArrayList<>( Arrays.asList(
new Range("range1")
@@ -206,7 +207,7 @@ public class DagTest {
new MusicTxDigestId(MDBCUtils.generateUniqueKey(),1)
));
MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2);
- alreadyApplied.put(new Range("range1"),Pair.of(newRow, 0));
+ alreadyApplied.put(new Range("range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
rows.add(newRow);
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(