aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main/java
diff options
context:
space:
mode:
authorTschaen, Brendan <ctschaen@att.com>2019-03-05 17:46:20 -0500
committerTschaen, Brendan <ctschaen@att.com>2019-03-05 17:46:20 -0500
commitb4e66e8087274a656b1301ecb55fee4af183bf36 (patch)
treec0281edeeaf8a95ff4e55a314634dd659ebc8151 /mdbc-server/src/main/java
parent77212c23b8ec060c3b15ab21c33502b1bd24e858 (diff)
Remove ownership logic from mixin
Change-Id: I70d62e76e23c690726294c62b9222c4cd9659c70 Issue-ID: MUSIC-326 Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
Diffstat (limited to 'mdbc-server/src/main/java')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java6
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java20
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockRequest.java69
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java56
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java7
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java5
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java21
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java226
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java158
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java2
10 files changed, 336 insertions, 234 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 0793a67..7377c4f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -507,7 +507,7 @@ public class MdbcConnection implements Connection {
DatabasePartition tempPartition = own(scQueryTables);
if(tempPartition!=null && tempPartition != partition) {
this.partition.updateDatabasePartition(tempPartition);
- mi.reloadAlreadyApplied(this.partition);
+ statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition);
}
dbi.preStatementHook(sql);
}
@@ -575,10 +575,10 @@ public class MdbcConnection implements Connection {
return null;
}
DatabasePartition newPartition = null;
- OwnershipAndCheckpoint ownAndCheck = mi.getOwnAndCheck();
+ OwnershipAndCheckpoint ownAndCheck = statemanager.getOwnAndCheck();
UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
try {
- final OwnershipReturn ownershipReturn = mi.own(ranges, partition, ownOpId);
+ final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId);
if(ownershipReturn==null){
return null;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 6cc50ec..8e7976f 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
@@ -33,6 +34,8 @@ import org.onap.music.mdbc.mixins.DBInterface;
import org.onap.music.mdbc.mixins.MixinFactory;
import org.onap.music.mdbc.mixins.MusicInterface;
import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
+import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
+import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -79,6 +82,11 @@ public class StateManager {
String cassandraUrl;
private Properties info;
+ /** The property name to use to provide a timeout to mdbc (ownership) */
+ public static final String KEY_TIMEOUT = "mdbc_timeout";
+ /** The default property value to use for the MDBC timeout */
+ public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
+
/** Identifier for this server instance */
private String mdbcServerName;
private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
@@ -86,6 +94,8 @@ public class StateManager {
private List<Range> eventualRanges;
private final Lock warmupLock = new ReentrantLock();
private List<Range> warmupRanges;
+ private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
+ private OwnershipAndCheckpoint ownAndCheck;
public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
this.sqlDBName = sqlDBName;
@@ -108,6 +118,10 @@ public class StateManager {
initMusic();
initSqlDatabase();
+ String t = info.getProperty(KEY_TIMEOUT);
+ long timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t);
+ alreadyApplied = new ConcurrentHashMap<>();
+ ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
MusicTxDigest txDaemon = new MusicTxDigest(this);
txDaemon.startBackgroundDaemon(Integer.parseInt(
@@ -119,7 +133,7 @@ public class StateManager {
* @throws MDBCServiceException
*/
protected void initMusic() throws MDBCServiceException {
- this.musicInterface = MixinFactory.createMusicInterface(musicmixin, mdbcServerName, info);
+ this.musicInterface = MixinFactory.createMusicInterface(this, musicmixin, mdbcServerName, info);
this.mdbcConnections = new HashMap<>();
}
@@ -326,4 +340,8 @@ public class StateManager {
warmupLock.unlock();
}
}
+
+ public OwnershipAndCheckpoint getOwnAndCheck() {
+ return ownAndCheck;
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockRequest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockRequest.java
new file mode 100644
index 0000000..e8b400b
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockRequest.java
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.mixins;
+
+import java.util.List;
+import java.util.UUID;
+import org.onap.music.mdbc.Range;
+
+public class LockRequest {
+ private final String table;
+ private final UUID id;
+ private final List<Range> toLockRanges;
+ private int numOfAttempts;
+
+ /**
+ *
+ * @param table
+ * @param id
+ * @param toLockRanges
+ */
+ public LockRequest(String table, UUID id, List<Range> toLockRanges) {
+ this.table = table;
+ this.id = id;
+ this.toLockRanges = toLockRanges;
+ numOfAttempts = 1;
+ }
+
+ public UUID getId() {
+ return id;
+ }
+
+ public List<Range> getToLockRanges() {
+ return toLockRanges;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ /**
+ * Number of times you've requested this lock
+ * @return
+ */
+ public int getNumOfAttempts() {
+ return numOfAttempts;
+ }
+
+ public void incrementAttempts() {
+ numOfAttempts++;
+ }
+} \ No newline at end of file
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java
index 7dd92c4..8055cae 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java
@@ -25,24 +25,68 @@ import java.util.UUID;
import org.onap.music.mdbc.Range;
public class LockResult{
- private final UUID musicRangeInformationIndex;
- private final String ownerId;
+ private boolean successful;
+ private UUID musicRangeInformationIndex;
+ private String ownerId;
private List<Range> ranges;
- private final boolean newLock;
+ private boolean newLock;
+ private long backOffPeriodS;
+ public LockResult(boolean succesful, UUID rowId, String ownerId, boolean newLock, List<Range> ranges){
+ this.successful = true;
+ this.musicRangeInformationIndex = rowId;
+ this.ownerId=ownerId;
+ this.newLock=newLock;
+ this.ranges=ranges;
+ }
+ /**
+ * Please use constructor which specifies whether lock result was succesful
+ * @param rowId
+ * @param ownerId
+ * @param newLock
+ * @param ranges
+ */
+ @Deprecated
public LockResult(UUID rowId, String ownerId, boolean newLock, List<Range> ranges){
+ this.successful = true;
this.musicRangeInformationIndex = rowId;
this.ownerId=ownerId;
this.newLock=newLock;
this.ranges=ranges;
}
+ public LockResult(boolean successful, long backOffTimems) {
+ this.successful = successful;
+ this.backOffPeriodS = backOffTimems;
+ }
+
+ public boolean wasSuccessful() {
+ return successful;
+ }
+
public String getOwnerId(){
return ownerId;
}
public boolean isNewLock(){
return newLock;
}
- public UUID getIndex() {return musicRangeInformationIndex;}
- public List<Range> getRanges() {return ranges;}
- public void addRange(Range range){ranges.add(range);}
+ public UUID getIndex() {
+ return musicRangeInformationIndex;
+ }
+
+ public List<Range> getRanges() {
+ return ranges;
+ }
+
+ public void addRange(Range range) {
+ ranges.add(range);
+ }
+
+ /**
+ * Get the backOffPeriod, in milliseconds, requested by mixin
+ * @return
+ */
+ public long getBackOffPeriod() {
+ return this.backOffPeriodS;
+ }
+
} \ No newline at end of file
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
index 1edb38d..d822615 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
@@ -25,6 +25,7 @@ import java.sql.Connection;
import java.util.Properties;
import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.StateManager;
/**
* This class is used to construct instances of Mixins that implement either the {@link org.onap.music.mdbc.mixins.DBInterface}
@@ -78,7 +79,7 @@ public class MixinFactory {
* @param info the Properties to use as an argument to the constructor
* @return the newly constructed MusicInterface, or null if one cannot be found.
*/
- public static MusicInterface createMusicInterface(String name, String mdbcServerName, Properties info) {
+ public static MusicInterface createMusicInterface(StateManager statemanager, String name, String mdbcServerName, Properties info) {
for (Class<?> cl : Utils.getClassesImplementing(MusicInterface.class)) {
try {
Constructor<?> con = cl.getConstructor();
@@ -87,10 +88,10 @@ public class MixinFactory {
String miname = mi.getMixinName();
logger.info(EELFLoggerDelegate.applicationLogger, "Checking "+miname);
if (miname.equalsIgnoreCase(name)) {
- con = cl.getConstructor(String.class, Properties.class);
+ con = cl.getConstructor(StateManager.class, String.class, Properties.class);
if (con != null) {
logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname);
- return (MusicInterface) con.newInstance(mdbcServerName, info);
+ return (MusicInterface) con.newInstance(statemanager, mdbcServerName, info);
}
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java
index 8181159..591f830 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java
@@ -37,6 +37,7 @@ import org.onap.music.main.ReturnType;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.DatabasePartition;
+import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.TableInfo;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
@@ -58,8 +59,8 @@ public class Music2Mixin extends MusicMixin {
super();
}
- public Music2Mixin(String url, Properties info) throws MDBCServiceException {
- super(url, info);
+ public Music2Mixin(StateManager stateManager, String url, Properties info) throws MDBCServiceException {
+ super(stateManager, url, info);
}
/**
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index c38efb7..00f6d00 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -211,6 +211,8 @@ public interface MusicInterface {
*/
RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException;
+ List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException;
+
/**
* This function is used to create a new row in the MRI table
* @param info the information used to create the row
@@ -269,17 +271,6 @@ public interface MusicInterface {
StagingTable getTxDigest(MusicTxDigestId id) throws MDBCServiceException;
/**
- * Use this functions to verify ownership, and own new ranges
- * @param ranges the ranges that should be own after calling this function
- * @param partition current information of the ownership in the system
- * @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is
- * required
- * @return an object indicating the status of the own function result
- * @throws MDBCServiceException
- */
- OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID ownOpId) throws MDBCServiceException;
-
- /**
* This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation
* @param partition information of the partition that is currently being owned
* @throws MDBCServiceException
@@ -326,11 +317,11 @@ public interface MusicInterface {
void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException;
List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
-
- OwnershipAndCheckpoint getOwnAndCheck();
-
- void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException;
public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
+ public LockResult requestLock(LockRequest request) throws MDBCServiceException;
+ public void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException;
+ public OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
+ Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index 8a2ef6f..ffb6c87 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -56,6 +56,7 @@ import org.onap.music.main.ReturnType;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.DagNode;
@@ -105,16 +106,12 @@ public class MusicMixin implements MusicInterface {
public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
/** The property name to use to provide the replication factor for Cassandra. */
public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
- /** The property name to use to provide a timeout to mdbc (ownership) */
- public static final String KEY_TIMEOUT = "mdbc_timeout";
/** Namespace for the tables in MUSIC (Cassandra) */
public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
/** The default property value to use for the Cassandra IP address. */
public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
/** The default property value to use for the Cassandra replication factor. */
public static final int DEFAULT_MUSIC_RFACTOR = 1;
- /** The default property value to use for the MDBC timeout */
- public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
/** The default primary string column, if none is provided. */
public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
/** Type of the primary key, if none is defined by the user */
@@ -124,7 +121,7 @@ public class MusicMixin implements MusicInterface {
//\TODO Add logic to change the names when required and create the tables when necessary
private String musicTxDigestTableName = "musictxdigest";
private String musicEventualTxDigestTableName = "musicevetxdigest";
- private String musicRangeInformationTableName = "musicrangeinformation";
+ public static final String musicRangeInformationTableName = "musicrangeinformation";
private String musicRangeDependencyTableName = "musicrangedependency";
private String musicNodeInfoTableName = "nodeinfo";
@@ -157,27 +154,6 @@ public class MusicMixin implements MusicInterface {
}
}
- private class LockRequest{
- private final String table;
- private final UUID id;
- private final List<Range> toLockRanges;
- public LockRequest(String table, UUID id, List<Range> toLockRanges){
- this.table=table;
- this.id=id;
- this.toLockRanges=toLockRanges;
- }
- public UUID getId() {
- return id;
- }
- public List<Range> getToLockRanges() {
- return toLockRanges;
- }
-
- public String getTable() {
- return table;
- }
- }
-
private static final Map<Integer, String> typemap = new HashMap<>();
static {
@@ -207,14 +183,13 @@ public class MusicMixin implements MusicInterface {
protected final String[] allReplicaIds;
private final String musicAddress;
private final int music_rfactor;
- protected long timeout;
private MusicConnector mCon = null;
private Session musicSession = null;
private boolean keyspace_created = false;
private Map<String, PreparedStatement> ps_cache = new HashMap<>();
private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
- private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
- private OwnershipAndCheckpoint ownAndCheck;
+ private StateManager stateManager;
+
public MusicMixin() {
@@ -226,7 +201,7 @@ public class MusicMixin implements MusicInterface {
this.allReplicaIds = null;
}
- public MusicMixin(String mdbcServerName, Properties info) throws MDBCServiceException {
+ public MusicMixin(StateManager stateManager, String mdbcServerName, Properties info) throws MDBCServiceException {
// Default values -- should be overridden in the Properties
// Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
@@ -245,12 +220,8 @@ public class MusicMixin implements MusicInterface {
this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
- String t = info.getProperty(KEY_TIMEOUT);
- this.timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t);
-
- alreadyApplied = new ConcurrentHashMap<>();
- ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied,timeout);
-
+ this.stateManager = stateManager;
+
initializeMetricTables();
}
@@ -1354,7 +1325,7 @@ public class MusicMixin implements MusicInterface {
*/
@Override
public void commitLog(DatabasePartition partition,List<Range> eventualRanges, StagingTable transactionDigest,
- String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+ String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException {
// first deal with commit for eventually consistent tables
filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
@@ -1374,8 +1345,7 @@ public class MusicMixin implements MusicInterface {
//0. See if reference to lock was already created
String lockId = partition.getLockId();
if(mriIndex==null || lockId == null || lockId.isEmpty()) {
- //\TODO fix this
- own(partition.getSnapshot(),partition, MDBCUtils.generateTimebasedUniqueKey());
+ throw new MDBCServiceException("Not able to commit, as you are no longer the lock-holder for this partition");
}
UUID commitId;
@@ -1407,6 +1377,7 @@ public class MusicMixin implements MusicInterface {
appendToRedoLog(partition, digestId);
List<Range> ranges = partition.getSnapshot();
for(Range r : ranges) {
+ Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
if(!alreadyApplied.containsKey(r)){
throw new MDBCServiceException("already applied data structure was not updated correctly and range "
+r+" is not contained");
@@ -1420,12 +1391,7 @@ public class MusicMixin implements MusicInterface {
alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1));
}
}
- }
-
- public void cleanAlreadyApplied(){
- logger.warn("Use this only in test environments");
- alreadyApplied.clear();
- }
+ }
private void filterAndAddEventualTxDigest(List<Range> eventualRanges,
StagingTable transactionDigest, String txId,
@@ -1585,7 +1551,7 @@ public class MusicMixin implements MusicInterface {
}
@Override
- public RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException{
+ public RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException {
String cql = String.format("SELECT * FROM %s.%s WHERE range = ?;", music_ns, musicRangeDependencyTableName);
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
@@ -1964,15 +1930,7 @@ public class MusicMixin implements MusicInterface {
return ecDigestInformation;
}
- @Override
- public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
- List<Range> snapshot = partition.getSnapshot();
- UUID row = partition.getMRIIndex();
- for(Range r : snapshot){
- alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
- }
-
- }
+
ResultSet getAllMriCassandraRows() throws MDBCServiceException {
@@ -2076,13 +2034,13 @@ public class MusicMixin implements MusicInterface {
private List<Range> lockRow(LockRequest request, DatabasePartition partition,Map<UUID, LockResult> rowLock)
throws MDBCServiceException {
- if(partition.getMRIIndex().equals(request.id) && partition.isLocked()){
+ if(partition.getMRIIndex().equals(request.getId()) && partition.isLocked()){
return new ArrayList<>();
}
//\TODO: this function needs to be improved, to track possible changes in the owner of a set of ranges
- String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+request.id.toString();
+ String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+request.getId().toString();
//return List<Range> knownRanges, UUID mriIndex, String lockId
- DatabasePartition newPartition = new DatabasePartition(request.toLockRanges,request.id,null);
+ DatabasePartition newPartition = new DatabasePartition(request.getToLockRanges(),request.getId(),null);
return waitForLock(request,newPartition,rowLock);
}
@@ -2091,7 +2049,8 @@ public class MusicMixin implements MusicInterface {
MusicCore.destroyLockRef(fullyQualifiedKey,lockref);
}
- private void releaseLocks(Map<UUID,LockResult> newLocks) throws MDBCServiceException{
+ @Override
+ public void releaseLocks(Map<UUID,LockResult> newLocks) throws MDBCServiceException{
for(Map.Entry<UUID,LockResult> lock : newLocks.entrySet()) {
unlockKeyInMusic(musicRangeInformationTableName, lock.getKey().toString(), lock.getValue().getOwnerId());
}
@@ -2126,7 +2085,8 @@ public class MusicMixin implements MusicInterface {
* @return
* @throws MDBCServiceException
*/
- private List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException{
+ @Override
+ public List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException{
Set<Range> extendedRange = new HashSet<>();
for(Range r: range){
extendedRange.add(r);
@@ -2138,30 +2098,22 @@ public class MusicMixin implements MusicInterface {
return new ArrayList<>(extendedRange);
}
- private LockResult waitForLock(LockRequest request) throws MDBCServiceException{
+ @Override
+ public LockResult requestLock(LockRequest request) throws MDBCServiceException{
String fullyQualifiedKey= music_ns+"."+ request.getTable()+"."+request.getId();
String lockId = MusicCore.createLockReference(fullyQualifiedKey);
ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId);
- if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
+ if(lockReturn.getResult() == ResultType.FAILURE) {
//\TODO Improve the exponential backoff
- int n = 1;
+ int n = request.getNumOfAttempts();
int low = 1;
int high = 1000;
Random r = new Random();
- while(MusicCore.whoseTurnIsIt(fullyQualifiedKey) != lockId){
- try {
- Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000)
- + (r.nextInt(high - low) + low));
- } catch (InterruptedException e) {
- continue;
- }
- n++;
- if (n == 20) {
- throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!");
- }
- }
+ long backOffTimems = ((int) Math.round(Math.pow(2, n)) * 1000)
+ + (r.nextInt(high - low) + low);
+ return new LockResult(false, backOffTimems);
}
- return new LockResult(request.id,lockId,true,null);
+ return new LockResult(true, request.getId(),lockId,true,null);
}
private void recoverFromFailureAndUpdateDag(Dag latestDag,List<MusicRangeInformationRow> rows,List<Range> ranges,
@@ -2197,7 +2149,8 @@ public class MusicMixin implements MusicInterface {
return newRow;
}
- private OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
+ @Override
+ public OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
Map<UUID,LockResult> locks, UUID ownershipId) throws MDBCServiceException{
recoverFromFailureAndUpdateDag(extendedDag,latestRows,ranges,locks);
List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks);
@@ -2221,119 +2174,7 @@ public class MusicMixin implements MusicInterface {
return new OwnershipReturn(ownershipId, ownRow.getOwnerId(), ownRow.getIndex(),ranges,extendedDag);
}
- // \TODO merge with dag code
- private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException {
- Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>();
- for(MusicRangeInformationRow row : rows){
- DatabasePartition dbPartition = row.getDBPartition();
- if (row.getIsLatest()) {
- for(Range range : dbPartition.getSnapshot()){
- if(!rowsPerLatestRange.containsKey(range)){
- rowsPerLatestRange.put(range,new HashSet<>());
- }
- DagNode node = dag.getNode(row.getPartitionIndex());
- if(node!=null) {
- rowsPerLatestRange.get(range).add(node);
- }
- else{
- rowsPerLatestRange.get(range).add(new DagNode(row));
- }
- }
- }
- }
- return rowsPerLatestRange;
- }
-
- /**
- * Take locking ownership of each range
- * @param ranges - ranges that need to be owned
- * @param partition - current partition owned
- * @param opId
- */
- @Override
- public OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException {
-
- if(ranges == null || ranges.isEmpty()) {
- return null;
- }
-
- Map<UUID,LockResult> newLocks = new HashMap<>();
- //Init timeout clock
- ownAndCheck.startOwnershipTimeoutClock(opId);
- if(partition.isLocked()&&partition.getSnapshot().containsAll(ranges)) {
- return new OwnershipReturn(opId,partition.getLockId(),partition.getMRIIndex(),partition.getSnapshot(),null);
- }
- //Find
- List<Range> rangesToOwn = getRangeDependencies(ranges);
- List<MusicRangeInformationRow> allMriRows = getAllMriRows();
- List<MusicRangeInformationRow> rows = ownAndCheck.extractRowsForRange(allMriRows,rangesToOwn, false);
- Dag toOwn = Dag.getDag(rows,rangesToOwn);
- Dag currentlyOwn = new Dag();
- while( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) &&
- !ownAndCheck.timeout(opId)
- ){
- takeOwnershipOfDag(partition, opId, newLocks, toOwn);
- currentlyOwn=toOwn;
- //TODO instead of comparing dags, compare rows
- allMriRows = getAllMriRows();
- rows = ownAndCheck.extractRowsForRange(allMriRows,rangesToOwn,false);
- toOwn = Dag.getDag(rows,rangesToOwn);
- }
- if(!currentlyOwn.isOwned() || toOwn.isDifferent(currentlyOwn)){
- releaseLocks(newLocks);
- ownAndCheck.stopOwnershipTimeoutClock(opId);
- logger.error("Error when owning a range: Timeout");
- throw new MDBCServiceException("Ownership timeout");
- }
- Set<Range> allRanges = currentlyOwn.getAllRanges();
- List<MusicRangeInformationRow> isLatestRows = ownAndCheck.extractRowsForRange(allMriRows, new ArrayList<>(allRanges), true);
- currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,isLatestRows));
- return mergeLatestRows(currentlyOwn,rows,ranges,newLocks,opId);
- }
-
- /**
- * Step through dag and take lock ownership of each range
- * @param partition
- * @param opId
- * @param newLocks
- * @param toOwn
- * @throws MDBCServiceException
- */
- private void takeOwnershipOfDag(DatabasePartition partition, UUID opId, Map<UUID, LockResult> newLocks, Dag toOwn)
- throws MDBCServiceException {
- while(toOwn.hasNextToOwn()){
- DagNode node = toOwn.nextToOwn();
- MusicRangeInformationRow row = node.getRow();
- UUID uuid = row.getPartitionIndex();
- if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)||
- newLocks.containsKey(uuid) ||
- !row.getIsLatest()){
- toOwn.setOwn(node);
- }
- else{
- LockResult lockResult = null;
- boolean owned = false;
- while(!owned && !ownAndCheck.timeout(opId)){
- try {
- LockRequest request = new LockRequest(musicRangeInformationTableName,uuid,
- new ArrayList(node.getRangeSet()));
- lockResult = waitForLock(request);
- owned = true;
- }
- catch (MDBCServiceException e){
- logger.warn("Locking failed, retrying",e);
- }
- }
- if(owned){
- toOwn.setOwn(node);
- newLocks.put(uuid,lockResult);
- }
- else{
- break;
- }
- }
- }
- }
+
/**
* This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained
@@ -2578,11 +2419,6 @@ public class MusicMixin implements MusicInterface {
executeMusicLockedDelete(music_ns,musicRangeInformationTableName,rows.getKey().toString(),rows.getValue());
}
}
-
- @Override
- public OwnershipAndCheckpoint getOwnAndCheck(){
- return ownAndCheck;
- }
@Override
public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException{
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index f72b0ec..ddf26ce 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -29,10 +29,14 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.mixins.DBInterface;
+import org.onap.music.mdbc.mixins.LockRequest;
import org.onap.music.mdbc.mixins.LockResult;
import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
+import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
@@ -84,14 +88,8 @@ public class OwnershipAndCheckpoint{
return false;
}
- /**
- * Extracts all the rows that match any of the ranges.
- * @param allMriRows
- * @param ranges - ranges interested in
- * @param onlyIsLatest - only return the "latest" rows
- * @return
- */
- public List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, List<Range> ranges,
+
+ private List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, List<Range> ranges,
boolean onlyIsLatest){
List<MusicRangeInformationRow> rows = new ArrayList<>();
for(MusicRangeInformationRow row : allMriRows){
@@ -114,6 +112,13 @@ public class OwnershipAndCheckpoint{
return rows;
}
+ /**
+ * Extracts all the rows that match any of the ranges.
+ * @param allMriRows
+ * @param ranges - ranges interested in
+ * @param onlyIsLatest - only return the "latest" rows
+ * @return
+ */
private List<MusicRangeInformationRow> extractRowsForRange(MusicInterface music, List<Range> ranges, boolean onlyIsLatest)
throws MDBCServiceException {
final List<MusicRangeInformationRow> allMriRows = music.getAllMriRows();
@@ -248,4 +253,141 @@ public class OwnershipAndCheckpoint{
}
+ /**
+ * Use this functions to verify ownership, and taking locking ownership of new ranges
+ * @param ranges the ranges that should be own after calling this function
+ * @param partition current information of the ownership in the system
+ * @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is
+ * required
+ * @return an object indicating the status of the own function result
+ * @throws MDBCServiceException
+ */
+ public OwnershipReturn own(MusicInterface mi, List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException {
+
+ if(ranges == null || ranges.isEmpty()) {
+ return null;
+ }
+
+ Map<UUID,LockResult> newLocks = new HashMap<>();
+ //Init timeout clock
+ startOwnershipTimeoutClock(opId);
+ if(partition.isLocked()&&partition.getSnapshot().containsAll(ranges)) {
+ return new OwnershipReturn(opId,partition.getLockId(),partition.getMRIIndex(),partition.getSnapshot(),null);
+ }
+ //Find
+ List<Range> rangesToOwn = mi.getRangeDependencies(ranges);
+ List<MusicRangeInformationRow> rows = extractRowsForRange(mi,rangesToOwn, false);
+ Dag toOwn = Dag.getDag(rows,rangesToOwn);
+ Dag currentlyOwn = new Dag();
+ while( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) &&
+ !timeout(opId)
+ ){
+ takeOwnershipOfDag(mi, partition, opId, newLocks, toOwn);
+ currentlyOwn=toOwn;
+ //TODO instead of comparing dags, compare rows
+ rows = extractRowsForRange(mi, rangesToOwn, false);
+ toOwn = Dag.getDag(rows,rangesToOwn);
+ }
+ if(!currentlyOwn.isOwned() || toOwn.isDifferent(currentlyOwn)){
+ mi.releaseLocks(newLocks);
+ stopOwnershipTimeoutClock(opId);
+ logger.error("Error when owning a range: Timeout");
+ throw new MDBCServiceException("Ownership timeout");
+ }
+ Set<Range> allRanges = currentlyOwn.getAllRanges();
+ List<MusicRangeInformationRow> isLatestRows = extractRowsForRange(mi, new ArrayList<>(allRanges), true);
+ currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,isLatestRows));
+ return mi.mergeLatestRows(currentlyOwn,rows,ranges,newLocks,opId);
+ }
+
+ /**
+ * Step through dag and take lock ownership of each range
+ * @param partition
+ * @param opId
+ * @param newLocks
+ * @param toOwn
+ * @throws MDBCServiceException
+ */
+ private void takeOwnershipOfDag(MusicInterface mi, DatabasePartition partition, UUID opId, Map<UUID, LockResult> newLocks, Dag toOwn)
+ throws MDBCServiceException {
+ while(toOwn.hasNextToOwn()){
+ DagNode node = toOwn.nextToOwn();
+ MusicRangeInformationRow row = node.getRow();
+ UUID uuid = row.getPartitionIndex();
+ if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)||
+ newLocks.containsKey(uuid) ||
+ !row.getIsLatest()){
+ toOwn.setOwn(node);
+ }
+ else{
+ LockRequest request = new LockRequest(MusicMixin.musicRangeInformationTableName,uuid,
+ new ArrayList(node.getRangeSet()));
+ LockResult result = null;
+ boolean owned = false;
+ while(!owned && !timeout(opId)){
+ try {
+ result = mi.requestLock(request);
+ if (result.wasSuccessful()) {
+ owned = true;
+ continue;
+ }
+ //backOff
+ try {
+ Thread.sleep(result.getBackOffPeriod());
+ } catch (InterruptedException e) {
+ continue;
+ }
+ request.incrementAttempts();
+ }
+ catch (MDBCServiceException e){
+ logger.warn("Locking failed, retrying",e);
+ }
+ }
+ if(owned){
+ toOwn.setOwn(node);
+ newLocks.put(uuid,result);
+ }
+ else{
+ break;
+ }
+ }
+ }
+ }
+
+
+ public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
+ List<Range> snapshot = partition.getSnapshot();
+ UUID row = partition.getMRIIndex();
+ for(Range r : snapshot){
+ alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
+ }
+ }
+
+ // \TODO merge with dag code
+ private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException {
+ Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>();
+ for(MusicRangeInformationRow row : rows){
+ DatabasePartition dbPartition = row.getDBPartition();
+ if (row.getIsLatest()) {
+ for(Range range : dbPartition.getSnapshot()){
+ if(!rowsPerLatestRange.containsKey(range)){
+ rowsPerLatestRange.put(range,new HashSet<>());
+ }
+ DagNode node = dag.getNode(row.getPartitionIndex());
+ if(node!=null) {
+ rowsPerLatestRange.get(range).add(node);
+ }
+ else{
+ rowsPerLatestRange.get(range).add(new DagNode(row));
+ }
+ }
+ }
+ }
+ return rowsPerLatestRange;
+ }
+
+ public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() {
+ return this.alreadyApplied;
+ }
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
index 03db7a7..4db3315 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -78,7 +78,7 @@ public class MusicTxDigest {
warmupRanges.removeAll(partitionRanges);
}
try {
- mi.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+ stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
} catch (MDBCServiceException e) {
logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
continue;