diff options
author | Enrique Saurez <esaurez@gatech.edu> | 2019-01-26 22:51:56 -0500 |
---|---|---|
committer | Enrique Saurez <enrique.saurez@gmail.com> | 2019-01-29 14:20:49 -0500 |
commit | 2a4f867f4e6b8c896124958885119475eee1cbb0 (patch) | |
tree | 278c7be22759c8b37be2b2f6bb52d38475e67a51 /mdbc-server | |
parent | b357ddb0869c2cc44eb0bd5800ed66914f49c468 (diff) |
Keyspace creation, and handling of mdbc_cuid
Issue-ID: MUSIC-281
Signed-off-by: Enrique Saurez<enrique.saurez@gmail.com>
Change-Id: I3bc685e30e064c1c93386301385115632b179449
Diffstat (limited to 'mdbc-server')
14 files changed, 168 insertions, 79 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java index ff8eb80..4204960 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java @@ -73,7 +73,7 @@ public class DatabasePartition { * This function is used to change the contents of this, with the contents of a different object * @param otherPartition partition that is used to substitute the local contents */ - public void updateDatabasePartition(DatabasePartition otherPartition){ + public synchronized void updateDatabasePartition(DatabasePartition otherPartition){ musicRangeInformationIndex = otherPartition.musicRangeInformationIndex;//Index that can be obtained either from lockId = otherPartition.lockId; ranges = otherPartition.ranges; @@ -90,21 +90,21 @@ public class DatabasePartition { } - public boolean isLocked(){return lockId != null && !lockId.isEmpty(); } + public synchronized boolean isLocked(){return lockId != null && !lockId.isEmpty(); } - public boolean isReady() { + public synchronized boolean isReady() { return ready; } - public void setReady(boolean ready) { + public synchronized void setReady(boolean ready) { this.ready = ready; } - public UUID getMRIIndex() { + public synchronized UUID getMRIIndex() { return musicRangeInformationIndex; } - public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) { + public synchronized void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) { this.musicRangeInformationIndex = musicRangeInformationIndex; } @@ -179,15 +179,15 @@ public class DatabasePartition { return range; } - public String getLockId() { + public synchronized String getLockId() { return lockId; } - public void setLockId(String lockId) { + public synchronized void setLockId(String lockId) { this.lockId = lockId; } - public boolean isContained(Range range){ + public synchronized boolean isContained(Range range){ for(Range r: ranges){ if(r.overlaps(range)){ return true; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 69a678b..d336eef 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -504,6 +504,7 @@ public class MdbcConnection implements Connection { DatabasePartition tempPartition = own(scQueryTables); if(tempPartition!=null && tempPartition != partition) { this.partition.updateDatabasePartition(tempPartition); + mi.reloadAlreadyApplied(this.partition); } dbi.preStatementHook(sql); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 1f722f1..b403dd2 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -45,6 +45,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * \TODO Implement an interface for the server logic and a factory @@ -79,7 +82,10 @@ public class StateManager { /** Identifier for this server instance */ private String mdbcServerName; private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition + private final Lock eventualLock = new ReentrantLock(); private List<Range> eventualRanges; + private final Lock warmupLock = new ReentrantLock(); + private List<Range> warmupRanges; public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException { this.sqlDBName = sqlDBName; @@ -87,7 +93,7 @@ public class StateManager { this.info = info; this.mdbcServerName = mdbcServerName; - this.connectionRanges = new HashMap<>(); + this.connectionRanges = new ConcurrentHashMap<>(); this.transactionInfo = new TxCommitProgress(); //\fixme this might not be used, delete? try { @@ -145,17 +151,52 @@ public class StateManager { return this.musicInterface; } - public List<DatabasePartition> getRanges() { + public List<DatabasePartition> getPartitions() { return new ArrayList<>(connectionRanges.values()); } + public List<Range> getWarmupRanges(){ + warmupLock.lock(); + List<Range> returnArray; + try { + if(warmupRanges!=null) { + returnArray = new ArrayList<>(warmupRanges); + } + else{ + returnArray = null; + } + } + finally{ + warmupLock.unlock(); + } + return returnArray; + } public List<Range> getEventualRanges() { - return eventualRanges; + eventualLock.lock(); + List<Range> returnArray; + try { + if(eventualRanges!=null){ + returnArray = new ArrayList<>(eventualRanges); + } + else{ + returnArray= null; + } + } + finally{ + eventualLock.unlock(); + } + return returnArray; } public void setEventualRanges(List<Range> eventualRanges) { - this.eventualRanges = eventualRanges; + eventualLock.lock(); + try { + this.eventualRanges = eventualRanges; + } + finally{ + eventualLock.unlock(); + } } public void closeConnection(String connectionId){ @@ -267,4 +308,14 @@ public class StateManager { } } + + public void setWarmupRanges(List<Range> warmupRanges) { + warmupLock.lock(); + try { + this.warmupRanges = warmupRanges; + } + finally{ + warmupLock.unlock(); + } + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java index 8497911..a9d179f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java @@ -51,6 +51,7 @@ public class TablesConfiguration { private String internalNamespace; private int internalReplicationFactor; private String musicNamespace; + private int musicReplicationFactor; private String tableToPartitionName; private String partitionInformationTableName; private String redoHistoryTableName; @@ -66,6 +67,7 @@ public class TablesConfiguration { */ public List<NodeConfiguration> initializeAndCreateNodeConfigurations() throws MDBCServiceException { logger.info("initializing the required spaces"); + createKeyspaces(); initInternalNamespace(); List<NodeConfiguration> nodeConfigs = new ArrayList<>(); @@ -81,10 +83,6 @@ public class TablesConfiguration { String partitionId; if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){ - if(partitionInfo.replicationFactor==0){ - logger.error("Replication factor and partition id are both empty, and this is an invalid configuration" ); - throw new MDBCServiceException("Replication factor and partition id are both empty, and this is an invalid configuration"); - } //1) Create a row in the partition info table partitionId = MDBCUtils.generateTimebasedUniqueKey().toString(); } @@ -110,6 +108,12 @@ public class TablesConfiguration { return nodeConfigs; } + private void createKeyspaces() throws MDBCServiceException { + MusicMixin.createKeyspace(internalNamespace,internalReplicationFactor); + MusicMixin.createKeyspace(musicNamespace,musicReplicationFactor); + + } + private void checkIfMriIsEmpty(String mriTableName) throws MDBCServiceException { //First check if table exists StringBuilder checkTableExistsString = new StringBuilder("SELECT table_name FROM system_schema.tables WHERE keyspace_name='") @@ -172,7 +176,6 @@ public class TablesConfiguration { private String mriTableName; private String mtxdTableName; private String partitionId; - private int replicationFactor; public List<Range> getTables() { return tables; @@ -206,14 +209,6 @@ public class TablesConfiguration { this.partitionId = partitionId; } - public int getReplicationFactor() { - return replicationFactor; - } - - public void setReplicationFactor(int replicationFactor) { - this.replicationFactor = replicationFactor; - } - public String getMtxdTableName(){ return mtxdTableName; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json index 2e4e0ee..8cbbfec 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json @@ -9,15 +9,15 @@ "owner": "", "mriTableName": "musicrangeinformation", "mtxdTableName": "musictxdigest", - "partitionId": "", - "replicationFactor": 1 + "partitionId": "" } ], + "internalNamespace": "music_internal", + "internalReplicationFactor": 1, "musicNamespace": "namespace", + "musicReplicationFactor": 1, "tableToPartitionName": "tabletopartition", "partitionInformationTableName": "partitioninfo", "redoHistoryTableName": "redohistory", - "sqlDatabaseName": "test", - "internalNamespace": "music_internal", - "internalReplicationFactor": 1 + "sqlDatabaseName": "test" } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 35b6121..49d4c71 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -321,7 +321,8 @@ public interface MusicInterface { OwnershipAndCheckpoint getOwnAndCheck(); - ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException; + + void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index e8028c1..999c67f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; @@ -57,11 +58,7 @@ import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.ownership.Dag; import org.onap.music.mdbc.ownership.DagNode; import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; -import org.onap.music.mdbc.tables.MusicRangeInformationRow; -import org.onap.music.mdbc.tables.MusicTxDigestId; -import org.onap.music.mdbc.tables.RangeDependency; -import org.onap.music.mdbc.tables.StagingTable; -import org.onap.music.mdbc.tables.TxCommitProgress; +import org.onap.music.mdbc.tables.*; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.DataType; @@ -208,7 +205,7 @@ public class MusicMixin implements MusicInterface { private boolean keyspace_created = false; private Map<String, PreparedStatement> ps_cache = new HashMap<>(); private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>()); - private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied; + private Map<Range, Pair<MriReference, Integer>> alreadyApplied; private OwnershipAndCheckpoint ownAndCheck; public MusicMixin() { @@ -243,7 +240,7 @@ public class MusicMixin implements MusicInterface { String t = info.getProperty(KEY_TIMEOUT); this.timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t); - alreadyApplied = new HashMap<>(); + alreadyApplied = new ConcurrentHashMap<>(); ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied,timeout); initializeMetricTables(); @@ -263,21 +260,25 @@ public class MusicMixin implements MusicInterface { */ @Override public void createKeyspace() throws MDBCServiceException { + createKeyspace(this.music_ns,this.music_rfactor); + } + public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException { Map<String,Object> replicationInfo = new HashMap<>(); replicationInfo.put("'class'", "'SimpleStrategy'"); - replicationInfo.put("'replication_factor'", music_rfactor); + replicationInfo.put("'replication_factor'", replicationFactor); PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString( - "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns + + "CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":")); try { MusicCore.nonKeyRelatedPut(queryObject, "eventual"); } catch (MusicServiceException e) { - if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) { - throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage(), e); + if (!e.getMessage().equals("Keyspace "+keyspace+" already exists")) { + throw new MDBCServiceException("Error creating namespace: "+keyspace+". Internal error:"+e.getErrorMessage(), + e); } } } @@ -1335,6 +1336,7 @@ public class MusicMixin implements MusicInterface { //0. See if reference to lock was already created String lockId = partition.getLockId(); if(mriIndex==null || lockId == null || lockId.isEmpty()) { + //\TODO fix this own(partition.getSnapshot(),partition, MDBCUtils.generateTimebasedUniqueKey()); } @@ -1361,7 +1363,7 @@ public class MusicMixin implements MusicInterface { } catch (IOException e) { throw new MDBCServiceException("Failed to serialized transaction digest with error " + e.toString(), e); } - MusicTxDigestId digestId = new MusicTxDigestId(commitId, -1); + MusicTxDigestId digestId = new MusicTxDigestId(mriIndex, -1); addTxDigest(digestId, serializedTransactionDigest); //2. Save RRT index to RQ if (progressKeeper != null) { @@ -1369,6 +1371,20 @@ public class MusicMixin implements MusicInterface { } //3. Append RRT index into the corresponding TIT row array appendToRedoLog(partition, digestId); + List<Range> ranges = partition.getSnapshot(); + for(Range r : ranges) { + if(!alreadyApplied.containsKey(r)){ + throw new MDBCServiceException("already applied data structure was not updated correctly and range " + +r+" is not contained"); + } + Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r); + MriReference key = rowAndIndex.getKey(); + if(!mriIndex.equals(key.index)){ + throw new MDBCServiceException("already applied data structure was not updated correctly and range "+ + r+" is not pointing to row: "+mriIndex.toString()); + } + alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1)); + } } } @@ -1936,8 +1952,17 @@ public class MusicMixin implements MusicInterface { return ecDigestList; } + @Override + public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException { + List<Range> snapshot = partition.getSnapshot(); + UUID row = partition.getMRIIndex(); + for(Range r : snapshot){ + alreadyApplied.put(r,Pair.of(new MriReference(row),-1)); + } + + } + - ResultSet getAllMriCassandraRows() throws MDBCServiceException { StringBuilder cqlOperation = new StringBuilder(); cqlOperation.append("SELECT * FROM ") diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 00abe85..64f4e0c 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -949,9 +949,12 @@ NEW.field refers to the new value StringBuilder keyCondStmt = new StringBuilder(); String and = ""; for (String key: primaryKeys.keySet()) { - Object val = primaryKeys.get(key); - keyCondStmt.append(and + key + "=\"" + val + "\""); - and = " AND "; + // We cannot use the default primary key for the sql table and operations + if(!key.equals(mi.getMusicDefaultPrimaryKeyName())) { + Object val = primaryKeys.get(key); + keyCondStmt.append(and + key + "=\"" + val + "\""); + and = " AND "; + } } return keyCondStmt.toString(); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java index 68d1f19..02c5d7b 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java @@ -29,6 +29,7 @@ import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MriRowComparator; import org.onap.music.mdbc.tables.MusicRangeInformationRow; @@ -231,15 +232,15 @@ public class Dag { return toApplyNodes.isEmpty(); } - public void setAlreadyApplied(Map<Range, Pair<MusicRangeInformationRow,Integer>> alreadyApplied, Set<Range> ranges) + public void setAlreadyApplied(Map<Range, Pair<MriReference,Integer>> alreadyApplied, Set<Range> ranges) throws MDBCServiceException { for(Map.Entry<UUID,DagNode> node : nodes.entrySet()){ Set<Range> intersection = new HashSet<>(ranges); intersection.retainAll(node.getValue().getRangeSet()); for(Range r : intersection){ if(alreadyApplied.containsKey(r)){ - final Pair<MusicRangeInformationRow, Integer> appliedPair = alreadyApplied.get(r); - final MusicRangeInformationRow appliedRow = appliedPair.getKey(); + final Pair<MriReference, Integer> appliedPair = alreadyApplied.get(r); + final MriReference appliedRow = appliedPair.getKey(); final int index = appliedPair.getValue(); final long appliedTimestamp = appliedRow.getTimestamp(); final long nodeTimestamp = node.getValue().getTimestamp(); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index 4ccd21d..8ec1793 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -33,6 +33,7 @@ import org.onap.music.mdbc.Range; import org.onap.music.mdbc.mixins.DBInterface; import org.onap.music.mdbc.mixins.LockResult; import org.onap.music.mdbc.mixins.MusicInterface; +import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; @@ -42,7 +43,7 @@ public class OwnershipAndCheckpoint{ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class); private Lock checkpointLock; private AtomicBoolean change; - private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied; + private Map<Range, Pair<MriReference, Integer>> alreadyApplied; private Map<UUID,Long> ownershipBeginTime; private long timeoutInMs; @@ -50,7 +51,7 @@ public class OwnershipAndCheckpoint{ this(new HashMap<>(),Long.MAX_VALUE); } - public OwnershipAndCheckpoint(Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied, long timeoutInMs){ + public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, Integer>> alreadyApplied, long timeoutInMs){ change = new AtomicBoolean(true); checkpointLock = new ReentrantLock(); this.alreadyApplied = alreadyApplied; @@ -114,6 +115,9 @@ public class OwnershipAndCheckpoint{ public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, List<Range> ranges, Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException { + if(ranges.isEmpty()){ + return; + } try { checkpointLock.lock(); change.set(true); @@ -156,6 +160,9 @@ public class OwnershipAndCheckpoint{ } public void warmup(MusicInterface mi, DBInterface di, List<Range> ranges) throws MDBCServiceException { + if(ranges.isEmpty()){ + return; + } boolean ready = false; change.set(true); Set<Range> rangeSet = new HashSet<Range>(ranges); @@ -181,7 +188,8 @@ public class OwnershipAndCheckpoint{ final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey()); applyTxDigest(di, txDigest); for (Range r : pair.getValue()) { - alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index)); + MusicRangeInformationRow row = node.getRow(); + alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); } } pair = node.nextNotAppliedTransaction(rangeSet); @@ -208,7 +216,8 @@ public class OwnershipAndCheckpoint{ final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey()); applyTxDigest(db, txDigest); for (Range r : pair.getValue()) { - alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index)); + MusicRangeInformationRow row = node.getRow(); + alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); } pair = node.nextNotAppliedTransaction(rangeSet); if (timeout(ownOpId)) { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java index 69f2c31..8aad335 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java @@ -28,4 +28,6 @@ public final class MriReference { this.index= index; } + public long getTimestamp() { return index.timestamp();} + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java index 1da2d79..3b6953c 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -21,10 +21,7 @@ package org.onap.music.mdbc.tables; import java.sql.SQLException; import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; @@ -70,23 +67,26 @@ public class MusicTxDigest { continue; } //2) for each partition I don't own - List<DatabasePartition> ranges = stateManager.getRanges(); - if(ranges.size()!=0) { - DatabasePartition myPartition = ranges.get(0); - for (UUID partition : partitions) { - if (!partition.equals(myPartition.getMRIIndex())) { - try { - //replayDigestForPartition(mi, partition, dbi); - mi.getOwnAndCheck().warmup(mi, dbi, myPartition.getSnapshot()); - } catch (MDBCServiceException e) { - logger.error("Unable to update for partition : " + partition + ". " + e.getMessage()); - continue; - } - } - } - } - - //Step 3: ReplayDigest() for E.C conditions + final List<Range> warmuplist = stateManager.getWarmupRanges(); + if(warmuplist!=null) { + final Set<Range> warmupRanges = new HashSet(warmuplist); + final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); + List<Range> missingRanges = new ArrayList<>(); + if (currentPartitions.size() != 0) { + for (DatabasePartition part : currentPartitions) { + List<Range> partitionRanges = part.getSnapshot(); + warmupRanges.removeAll(partitionRanges); + } + try { + mi.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); + } catch (MDBCServiceException e) { + logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); + continue; + } + } + } + + //Step 3: ReplayDigest() for E.C conditions try { replayDigest(mi,dbi); } catch (MDBCServiceException e) { diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties index a676f70..21f3e92 100755 --- a/mdbc-server/src/main/resources/music.properties +++ b/mdbc-server/src/main/resources/music.properties @@ -1,5 +1,5 @@ cassandra.host =\ - 143.215.128.49 + 192.168.1.19 cassandra.user =\ metric cassandra.password =\ diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java index 85e31cd..da64595 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java @@ -34,6 +34,7 @@ import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; @@ -191,7 +192,7 @@ public class DagTest { @Test public void nextToApply2() throws InterruptedException, MDBCServiceException { - Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied = new HashMap<>(); + Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>(); List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> ranges = new ArrayList<>( Arrays.asList( new Range("range1") @@ -206,7 +207,7 @@ public class DagTest { new MusicTxDigestId(MDBCUtils.generateUniqueKey(),1) )); MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2); - alreadyApplied.put(new Range("range1"),Pair.of(newRow, 0)); + alreadyApplied.put(new Range("range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0)); rows.add(newRow); MILLISECONDS.sleep(10); List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList( |