diff options
author | Bharath Balasubramanian <bharathb@research.att.com> | 2019-03-12 23:24:52 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-03-12 23:24:52 +0000 |
commit | f61985dedb975a777bed88fb17c482d0dbb73cd8 (patch) | |
tree | e6026e3c8d75468a2c5227401ce2a9560a34d8fb /mdbc-server/src/main | |
parent | 45ffc11e5cecf8112032d9d67c9c652f4836eacd (diff) | |
parent | b4e66e8087274a656b1301ecb55fee4af183bf36 (diff) |
Merge "Remove ownership logic from mixin"
Diffstat (limited to 'mdbc-server/src/main')
10 files changed, 336 insertions, 234 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 0793a67..7377c4f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -507,7 +507,7 @@ public class MdbcConnection implements Connection { DatabasePartition tempPartition = own(scQueryTables); if(tempPartition!=null && tempPartition != partition) { this.partition.updateDatabasePartition(tempPartition); - mi.reloadAlreadyApplied(this.partition); + statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition); } dbi.preStatementHook(sql); } @@ -575,10 +575,10 @@ public class MdbcConnection implements Connection { return null; } DatabasePartition newPartition = null; - OwnershipAndCheckpoint ownAndCheck = mi.getOwnAndCheck(); + OwnershipAndCheckpoint ownAndCheck = statemanager.getOwnAndCheck(); UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey(); try { - final OwnershipReturn ownershipReturn = mi.own(ranges, partition, ownOpId); + final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId); if(ownershipReturn==null){ return null; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 6cc50ec..8e7976f 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Set; import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; @@ -33,6 +34,8 @@ import org.onap.music.mdbc.mixins.DBInterface; import org.onap.music.mdbc.mixins.MixinFactory; import org.onap.music.mdbc.mixins.MusicInterface; import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; +import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; +import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicTxDigest; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -79,6 +82,11 @@ public class StateManager { String cassandraUrl; private Properties info; + /** The property name to use to provide a timeout to mdbc (ownership) */ + public static final String KEY_TIMEOUT = "mdbc_timeout"; + /** The default property value to use for the MDBC timeout */ + public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours + /** Identifier for this server instance */ private String mdbcServerName; private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition @@ -86,6 +94,8 @@ public class StateManager { private List<Range> eventualRanges; private final Lock warmupLock = new ReentrantLock(); private List<Range> warmupRanges; + private Map<Range, Pair<MriReference, Integer>> alreadyApplied; + private OwnershipAndCheckpoint ownAndCheck; public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException { this.sqlDBName = sqlDBName; @@ -108,6 +118,10 @@ public class StateManager { initMusic(); initSqlDatabase(); + String t = info.getProperty(KEY_TIMEOUT); + long timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t); + alreadyApplied = new ConcurrentHashMap<>(); + ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout); MusicTxDigest txDaemon = new MusicTxDigest(this); txDaemon.startBackgroundDaemon(Integer.parseInt( @@ -119,7 +133,7 @@ public class StateManager { * @throws MDBCServiceException */ protected void initMusic() throws MDBCServiceException { - this.musicInterface = MixinFactory.createMusicInterface(musicmixin, mdbcServerName, info); + this.musicInterface = MixinFactory.createMusicInterface(this, musicmixin, mdbcServerName, info); this.mdbcConnections = new HashMap<>(); } @@ -326,4 +340,8 @@ public class StateManager { warmupLock.unlock(); } } + + public OwnershipAndCheckpoint getOwnAndCheck() { + return ownAndCheck; + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockRequest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockRequest.java new file mode 100644 index 0000000..e8b400b --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockRequest.java @@ -0,0 +1,69 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.mixins; + +import java.util.List; +import java.util.UUID; +import org.onap.music.mdbc.Range; + +public class LockRequest { + private final String table; + private final UUID id; + private final List<Range> toLockRanges; + private int numOfAttempts; + + /** + * + * @param table + * @param id + * @param toLockRanges + */ + public LockRequest(String table, UUID id, List<Range> toLockRanges) { + this.table = table; + this.id = id; + this.toLockRanges = toLockRanges; + numOfAttempts = 1; + } + + public UUID getId() { + return id; + } + + public List<Range> getToLockRanges() { + return toLockRanges; + } + + public String getTable() { + return table; + } + + /** + * Number of times you've requested this lock + * @return + */ + public int getNumOfAttempts() { + return numOfAttempts; + } + + public void incrementAttempts() { + numOfAttempts++; + } +}
\ No newline at end of file diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java index 7dd92c4..8055cae 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java @@ -25,24 +25,68 @@ import java.util.UUID; import org.onap.music.mdbc.Range; public class LockResult{ - private final UUID musicRangeInformationIndex; - private final String ownerId; + private boolean successful; + private UUID musicRangeInformationIndex; + private String ownerId; private List<Range> ranges; - private final boolean newLock; + private boolean newLock; + private long backOffPeriodS; + public LockResult(boolean succesful, UUID rowId, String ownerId, boolean newLock, List<Range> ranges){ + this.successful = true; + this.musicRangeInformationIndex = rowId; + this.ownerId=ownerId; + this.newLock=newLock; + this.ranges=ranges; + } + /** + * Please use constructor which specifies whether lock result was succesful + * @param rowId + * @param ownerId + * @param newLock + * @param ranges + */ + @Deprecated public LockResult(UUID rowId, String ownerId, boolean newLock, List<Range> ranges){ + this.successful = true; this.musicRangeInformationIndex = rowId; this.ownerId=ownerId; this.newLock=newLock; this.ranges=ranges; } + public LockResult(boolean successful, long backOffTimems) { + this.successful = successful; + this.backOffPeriodS = backOffTimems; + } + + public boolean wasSuccessful() { + return successful; + } + public String getOwnerId(){ return ownerId; } public boolean isNewLock(){ return newLock; } - public UUID getIndex() {return musicRangeInformationIndex;} - public List<Range> getRanges() {return ranges;} - public void addRange(Range range){ranges.add(range);} + public UUID getIndex() { + return musicRangeInformationIndex; + } + + public List<Range> getRanges() { + return ranges; + } + + public void addRange(Range range) { + ranges.add(range); + } + + /** + * Get the backOffPeriod, in milliseconds, requested by mixin + * @return + */ + public long getBackOffPeriod() { + return this.backOffPeriodS; + } + }
\ No newline at end of file diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java index 1edb38d..d822615 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java @@ -25,6 +25,7 @@ import java.sql.Connection; import java.util.Properties; import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.StateManager; /** * This class is used to construct instances of Mixins that implement either the {@link org.onap.music.mdbc.mixins.DBInterface} @@ -78,7 +79,7 @@ public class MixinFactory { * @param info the Properties to use as an argument to the constructor * @return the newly constructed MusicInterface, or null if one cannot be found. */ - public static MusicInterface createMusicInterface(String name, String mdbcServerName, Properties info) { + public static MusicInterface createMusicInterface(StateManager statemanager, String name, String mdbcServerName, Properties info) { for (Class<?> cl : Utils.getClassesImplementing(MusicInterface.class)) { try { Constructor<?> con = cl.getConstructor(); @@ -87,10 +88,10 @@ public class MixinFactory { String miname = mi.getMixinName(); logger.info(EELFLoggerDelegate.applicationLogger, "Checking "+miname); if (miname.equalsIgnoreCase(name)) { - con = cl.getConstructor(String.class, Properties.class); + con = cl.getConstructor(StateManager.class, String.class, Properties.class); if (con != null) { logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname); - return (MusicInterface) con.newInstance(mdbcServerName, info); + return (MusicInterface) con.newInstance(statemanager, mdbcServerName, info); } } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java index 8181159..591f830 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java @@ -37,6 +37,7 @@ import org.onap.music.main.ReturnType; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.StateManager; import org.onap.music.mdbc.TableInfo; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; @@ -58,8 +59,8 @@ public class Music2Mixin extends MusicMixin { super(); } - public Music2Mixin(String url, Properties info) throws MDBCServiceException { - super(url, info); + public Music2Mixin(StateManager stateManager, String url, Properties info) throws MDBCServiceException { + super(stateManager, url, info); } /** diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index c38efb7..00f6d00 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -211,6 +211,8 @@ public interface MusicInterface { */ RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException; + List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException; + /** * This function is used to create a new row in the MRI table * @param info the information used to create the row @@ -269,17 +271,6 @@ public interface MusicInterface { StagingTable getTxDigest(MusicTxDigestId id) throws MDBCServiceException; /** - * Use this functions to verify ownership, and own new ranges - * @param ranges the ranges that should be own after calling this function - * @param partition current information of the ownership in the system - * @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is - * required - * @return an object indicating the status of the own function result - * @throws MDBCServiceException - */ - OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID ownOpId) throws MDBCServiceException; - - /** * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation * @param partition information of the partition that is currently being owned * @throws MDBCServiceException @@ -326,11 +317,11 @@ public interface MusicInterface { void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException; List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException; - - OwnershipAndCheckpoint getOwnAndCheck(); - - void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException; public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException; + public LockResult requestLock(LockRequest request) throws MDBCServiceException; + public void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException; + public OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, + Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 8a2ef6f..ffb6c87 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -56,6 +56,7 @@ import org.onap.music.main.ReturnType; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.StateManager; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.ownership.Dag; import org.onap.music.mdbc.ownership.DagNode; @@ -105,16 +106,12 @@ public class MusicMixin implements MusicInterface { public static final String KEY_MUSIC_RFACTOR = "music_rfactor"; /** The property name to use to provide the replication factor for Cassandra. */ public static final String KEY_MUSIC_NAMESPACE = "music_namespace"; - /** The property name to use to provide a timeout to mdbc (ownership) */ - public static final String KEY_TIMEOUT = "mdbc_timeout"; /** Namespace for the tables in MUSIC (Cassandra) */ public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; /** The default property value to use for the Cassandra IP address. */ public static final String DEFAULT_MUSIC_ADDRESS = "localhost"; /** The default property value to use for the Cassandra replication factor. */ public static final int DEFAULT_MUSIC_RFACTOR = 1; - /** The default property value to use for the MDBC timeout */ - public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours /** The default primary string column, if none is provided. */ public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid"; /** Type of the primary key, if none is defined by the user */ @@ -124,7 +121,7 @@ public class MusicMixin implements MusicInterface { //\TODO Add logic to change the names when required and create the tables when necessary private String musicTxDigestTableName = "musictxdigest"; private String musicEventualTxDigestTableName = "musicevetxdigest"; - private String musicRangeInformationTableName = "musicrangeinformation"; + public static final String musicRangeInformationTableName = "musicrangeinformation"; private String musicRangeDependencyTableName = "musicrangedependency"; private String musicNodeInfoTableName = "nodeinfo"; @@ -157,27 +154,6 @@ public class MusicMixin implements MusicInterface { } } - private class LockRequest{ - private final String table; - private final UUID id; - private final List<Range> toLockRanges; - public LockRequest(String table, UUID id, List<Range> toLockRanges){ - this.table=table; - this.id=id; - this.toLockRanges=toLockRanges; - } - public UUID getId() { - return id; - } - public List<Range> getToLockRanges() { - return toLockRanges; - } - - public String getTable() { - return table; - } - } - private static final Map<Integer, String> typemap = new HashMap<>(); static { @@ -207,14 +183,13 @@ public class MusicMixin implements MusicInterface { protected final String[] allReplicaIds; private final String musicAddress; private final int music_rfactor; - protected long timeout; private MusicConnector mCon = null; private Session musicSession = null; private boolean keyspace_created = false; private Map<String, PreparedStatement> ps_cache = new HashMap<>(); private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>()); - private Map<Range, Pair<MriReference, Integer>> alreadyApplied; - private OwnershipAndCheckpoint ownAndCheck; + private StateManager stateManager; + public MusicMixin() { @@ -226,7 +201,7 @@ public class MusicMixin implements MusicInterface { this.allReplicaIds = null; } - public MusicMixin(String mdbcServerName, Properties info) throws MDBCServiceException { + public MusicMixin(StateManager stateManager, String mdbcServerName, Properties info) throws MDBCServiceException { // Default values -- should be overridden in the Properties // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred) this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS); @@ -245,12 +220,8 @@ public class MusicMixin implements MusicInterface { this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE); logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns); - String t = info.getProperty(KEY_TIMEOUT); - this.timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t); - - alreadyApplied = new ConcurrentHashMap<>(); - ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied,timeout); - + this.stateManager = stateManager; + initializeMetricTables(); } @@ -1354,7 +1325,7 @@ public class MusicMixin implements MusicInterface { */ @Override public void commitLog(DatabasePartition partition,List<Range> eventualRanges, StagingTable transactionDigest, - String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ + String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException { // first deal with commit for eventually consistent tables filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); @@ -1374,8 +1345,7 @@ public class MusicMixin implements MusicInterface { //0. See if reference to lock was already created String lockId = partition.getLockId(); if(mriIndex==null || lockId == null || lockId.isEmpty()) { - //\TODO fix this - own(partition.getSnapshot(),partition, MDBCUtils.generateTimebasedUniqueKey()); + throw new MDBCServiceException("Not able to commit, as you are no longer the lock-holder for this partition"); } UUID commitId; @@ -1407,6 +1377,7 @@ public class MusicMixin implements MusicInterface { appendToRedoLog(partition, digestId); List<Range> ranges = partition.getSnapshot(); for(Range r : ranges) { + Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); if(!alreadyApplied.containsKey(r)){ throw new MDBCServiceException("already applied data structure was not updated correctly and range " +r+" is not contained"); @@ -1420,12 +1391,7 @@ public class MusicMixin implements MusicInterface { alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1)); } } - } - - public void cleanAlreadyApplied(){ - logger.warn("Use this only in test environments"); - alreadyApplied.clear(); - } + } private void filterAndAddEventualTxDigest(List<Range> eventualRanges, StagingTable transactionDigest, String txId, @@ -1585,7 +1551,7 @@ public class MusicMixin implements MusicInterface { } @Override - public RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException{ + public RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException { String cql = String.format("SELECT * FROM %s.%s WHERE range = ?;", music_ns, musicRangeDependencyTableName); PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); @@ -1964,15 +1930,7 @@ public class MusicMixin implements MusicInterface { return ecDigestInformation; } - @Override - public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException { - List<Range> snapshot = partition.getSnapshot(); - UUID row = partition.getMRIIndex(); - for(Range r : snapshot){ - alreadyApplied.put(r,Pair.of(new MriReference(row),-1)); - } - - } + ResultSet getAllMriCassandraRows() throws MDBCServiceException { @@ -2076,13 +2034,13 @@ public class MusicMixin implements MusicInterface { private List<Range> lockRow(LockRequest request, DatabasePartition partition,Map<UUID, LockResult> rowLock) throws MDBCServiceException { - if(partition.getMRIIndex().equals(request.id) && partition.isLocked()){ + if(partition.getMRIIndex().equals(request.getId()) && partition.isLocked()){ return new ArrayList<>(); } //\TODO: this function needs to be improved, to track possible changes in the owner of a set of ranges - String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+request.id.toString(); + String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+request.getId().toString(); //return List<Range> knownRanges, UUID mriIndex, String lockId - DatabasePartition newPartition = new DatabasePartition(request.toLockRanges,request.id,null); + DatabasePartition newPartition = new DatabasePartition(request.getToLockRanges(),request.getId(),null); return waitForLock(request,newPartition,rowLock); } @@ -2091,7 +2049,8 @@ public class MusicMixin implements MusicInterface { MusicCore.destroyLockRef(fullyQualifiedKey,lockref); } - private void releaseLocks(Map<UUID,LockResult> newLocks) throws MDBCServiceException{ + @Override + public void releaseLocks(Map<UUID,LockResult> newLocks) throws MDBCServiceException{ for(Map.Entry<UUID,LockResult> lock : newLocks.entrySet()) { unlockKeyInMusic(musicRangeInformationTableName, lock.getKey().toString(), lock.getValue().getOwnerId()); } @@ -2126,7 +2085,8 @@ public class MusicMixin implements MusicInterface { * @return * @throws MDBCServiceException */ - private List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException{ + @Override + public List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException{ Set<Range> extendedRange = new HashSet<>(); for(Range r: range){ extendedRange.add(r); @@ -2138,30 +2098,22 @@ public class MusicMixin implements MusicInterface { return new ArrayList<>(extendedRange); } - private LockResult waitForLock(LockRequest request) throws MDBCServiceException{ + @Override + public LockResult requestLock(LockRequest request) throws MDBCServiceException{ String fullyQualifiedKey= music_ns+"."+ request.getTable()+"."+request.getId(); String lockId = MusicCore.createLockReference(fullyQualifiedKey); ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId); - if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { + if(lockReturn.getResult() == ResultType.FAILURE) { //\TODO Improve the exponential backoff - int n = 1; + int n = request.getNumOfAttempts(); int low = 1; int high = 1000; Random r = new Random(); - while(MusicCore.whoseTurnIsIt(fullyQualifiedKey) != lockId){ - try { - Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000) - + (r.nextInt(high - low) + low)); - } catch (InterruptedException e) { - continue; - } - n++; - if (n == 20) { - throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!"); - } - } + long backOffTimems = ((int) Math.round(Math.pow(2, n)) * 1000) + + (r.nextInt(high - low) + low); + return new LockResult(false, backOffTimems); } - return new LockResult(request.id,lockId,true,null); + return new LockResult(true, request.getId(),lockId,true,null); } private void recoverFromFailureAndUpdateDag(Dag latestDag,List<MusicRangeInformationRow> rows,List<Range> ranges, @@ -2197,7 +2149,8 @@ public class MusicMixin implements MusicInterface { return newRow; } - private OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, + @Override + public OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, Map<UUID,LockResult> locks, UUID ownershipId) throws MDBCServiceException{ recoverFromFailureAndUpdateDag(extendedDag,latestRows,ranges,locks); List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks); @@ -2221,119 +2174,7 @@ public class MusicMixin implements MusicInterface { return new OwnershipReturn(ownershipId, ownRow.getOwnerId(), ownRow.getIndex(),ranges,extendedDag); } - // \TODO merge with dag code - private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException { - Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>(); - for(MusicRangeInformationRow row : rows){ - DatabasePartition dbPartition = row.getDBPartition(); - if (row.getIsLatest()) { - for(Range range : dbPartition.getSnapshot()){ - if(!rowsPerLatestRange.containsKey(range)){ - rowsPerLatestRange.put(range,new HashSet<>()); - } - DagNode node = dag.getNode(row.getPartitionIndex()); - if(node!=null) { - rowsPerLatestRange.get(range).add(node); - } - else{ - rowsPerLatestRange.get(range).add(new DagNode(row)); - } - } - } - } - return rowsPerLatestRange; - } - - /** - * Take locking ownership of each range - * @param ranges - ranges that need to be owned - * @param partition - current partition owned - * @param opId - */ - @Override - public OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException { - - if(ranges == null || ranges.isEmpty()) { - return null; - } - - Map<UUID,LockResult> newLocks = new HashMap<>(); - //Init timeout clock - ownAndCheck.startOwnershipTimeoutClock(opId); - if(partition.isLocked()&&partition.getSnapshot().containsAll(ranges)) { - return new OwnershipReturn(opId,partition.getLockId(),partition.getMRIIndex(),partition.getSnapshot(),null); - } - //Find - List<Range> rangesToOwn = getRangeDependencies(ranges); - List<MusicRangeInformationRow> allMriRows = getAllMriRows(); - List<MusicRangeInformationRow> rows = ownAndCheck.extractRowsForRange(allMriRows,rangesToOwn, false); - Dag toOwn = Dag.getDag(rows,rangesToOwn); - Dag currentlyOwn = new Dag(); - while( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) && - !ownAndCheck.timeout(opId) - ){ - takeOwnershipOfDag(partition, opId, newLocks, toOwn); - currentlyOwn=toOwn; - //TODO instead of comparing dags, compare rows - allMriRows = getAllMriRows(); - rows = ownAndCheck.extractRowsForRange(allMriRows,rangesToOwn,false); - toOwn = Dag.getDag(rows,rangesToOwn); - } - if(!currentlyOwn.isOwned() || toOwn.isDifferent(currentlyOwn)){ - releaseLocks(newLocks); - ownAndCheck.stopOwnershipTimeoutClock(opId); - logger.error("Error when owning a range: Timeout"); - throw new MDBCServiceException("Ownership timeout"); - } - Set<Range> allRanges = currentlyOwn.getAllRanges(); - List<MusicRangeInformationRow> isLatestRows = ownAndCheck.extractRowsForRange(allMriRows, new ArrayList<>(allRanges), true); - currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,isLatestRows)); - return mergeLatestRows(currentlyOwn,rows,ranges,newLocks,opId); - } - - /** - * Step through dag and take lock ownership of each range - * @param partition - * @param opId - * @param newLocks - * @param toOwn - * @throws MDBCServiceException - */ - private void takeOwnershipOfDag(DatabasePartition partition, UUID opId, Map<UUID, LockResult> newLocks, Dag toOwn) - throws MDBCServiceException { - while(toOwn.hasNextToOwn()){ - DagNode node = toOwn.nextToOwn(); - MusicRangeInformationRow row = node.getRow(); - UUID uuid = row.getPartitionIndex(); - if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)|| - newLocks.containsKey(uuid) || - !row.getIsLatest()){ - toOwn.setOwn(node); - } - else{ - LockResult lockResult = null; - boolean owned = false; - while(!owned && !ownAndCheck.timeout(opId)){ - try { - LockRequest request = new LockRequest(musicRangeInformationTableName,uuid, - new ArrayList(node.getRangeSet())); - lockResult = waitForLock(request); - owned = true; - } - catch (MDBCServiceException e){ - logger.warn("Locking failed, retrying",e); - } - } - if(owned){ - toOwn.setOwn(node); - newLocks.put(uuid,lockResult); - } - else{ - break; - } - } - } - } + /** * This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained @@ -2578,11 +2419,6 @@ public class MusicMixin implements MusicInterface { executeMusicLockedDelete(music_ns,musicRangeInformationTableName,rows.getKey().toString(),rows.getValue()); } } - - @Override - public OwnershipAndCheckpoint getOwnAndCheck(){ - return ownAndCheck; - } @Override public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException{ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index f72b0ec..ddf26ce 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -29,10 +29,14 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.mixins.DBInterface; +import org.onap.music.mdbc.mixins.LockRequest; import org.onap.music.mdbc.mixins.LockResult; import org.onap.music.mdbc.mixins.MusicInterface; +import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; +import org.onap.music.mdbc.mixins.MusicMixin; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; @@ -84,14 +88,8 @@ public class OwnershipAndCheckpoint{ return false; } - /** - * Extracts all the rows that match any of the ranges. - * @param allMriRows - * @param ranges - ranges interested in - * @param onlyIsLatest - only return the "latest" rows - * @return - */ - public List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, List<Range> ranges, + + private List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, List<Range> ranges, boolean onlyIsLatest){ List<MusicRangeInformationRow> rows = new ArrayList<>(); for(MusicRangeInformationRow row : allMriRows){ @@ -114,6 +112,13 @@ public class OwnershipAndCheckpoint{ return rows; } + /** + * Extracts all the rows that match any of the ranges. + * @param allMriRows + * @param ranges - ranges interested in + * @param onlyIsLatest - only return the "latest" rows + * @return + */ private List<MusicRangeInformationRow> extractRowsForRange(MusicInterface music, List<Range> ranges, boolean onlyIsLatest) throws MDBCServiceException { final List<MusicRangeInformationRow> allMriRows = music.getAllMriRows(); @@ -248,4 +253,141 @@ public class OwnershipAndCheckpoint{ } + /** + * Use this functions to verify ownership, and taking locking ownership of new ranges + * @param ranges the ranges that should be own after calling this function + * @param partition current information of the ownership in the system + * @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is + * required + * @return an object indicating the status of the own function result + * @throws MDBCServiceException + */ + public OwnershipReturn own(MusicInterface mi, List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException { + + if(ranges == null || ranges.isEmpty()) { + return null; + } + + Map<UUID,LockResult> newLocks = new HashMap<>(); + //Init timeout clock + startOwnershipTimeoutClock(opId); + if(partition.isLocked()&&partition.getSnapshot().containsAll(ranges)) { + return new OwnershipReturn(opId,partition.getLockId(),partition.getMRIIndex(),partition.getSnapshot(),null); + } + //Find + List<Range> rangesToOwn = mi.getRangeDependencies(ranges); + List<MusicRangeInformationRow> rows = extractRowsForRange(mi,rangesToOwn, false); + Dag toOwn = Dag.getDag(rows,rangesToOwn); + Dag currentlyOwn = new Dag(); + while( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) && + !timeout(opId) + ){ + takeOwnershipOfDag(mi, partition, opId, newLocks, toOwn); + currentlyOwn=toOwn; + //TODO instead of comparing dags, compare rows + rows = extractRowsForRange(mi, rangesToOwn, false); + toOwn = Dag.getDag(rows,rangesToOwn); + } + if(!currentlyOwn.isOwned() || toOwn.isDifferent(currentlyOwn)){ + mi.releaseLocks(newLocks); + stopOwnershipTimeoutClock(opId); + logger.error("Error when owning a range: Timeout"); + throw new MDBCServiceException("Ownership timeout"); + } + Set<Range> allRanges = currentlyOwn.getAllRanges(); + List<MusicRangeInformationRow> isLatestRows = extractRowsForRange(mi, new ArrayList<>(allRanges), true); + currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,isLatestRows)); + return mi.mergeLatestRows(currentlyOwn,rows,ranges,newLocks,opId); + } + + /** + * Step through dag and take lock ownership of each range + * @param partition + * @param opId + * @param newLocks + * @param toOwn + * @throws MDBCServiceException + */ + private void takeOwnershipOfDag(MusicInterface mi, DatabasePartition partition, UUID opId, Map<UUID, LockResult> newLocks, Dag toOwn) + throws MDBCServiceException { + while(toOwn.hasNextToOwn()){ + DagNode node = toOwn.nextToOwn(); + MusicRangeInformationRow row = node.getRow(); + UUID uuid = row.getPartitionIndex(); + if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)|| + newLocks.containsKey(uuid) || + !row.getIsLatest()){ + toOwn.setOwn(node); + } + else{ + LockRequest request = new LockRequest(MusicMixin.musicRangeInformationTableName,uuid, + new ArrayList(node.getRangeSet())); + LockResult result = null; + boolean owned = false; + while(!owned && !timeout(opId)){ + try { + result = mi.requestLock(request); + if (result.wasSuccessful()) { + owned = true; + continue; + } + //backOff + try { + Thread.sleep(result.getBackOffPeriod()); + } catch (InterruptedException e) { + continue; + } + request.incrementAttempts(); + } + catch (MDBCServiceException e){ + logger.warn("Locking failed, retrying",e); + } + } + if(owned){ + toOwn.setOwn(node); + newLocks.put(uuid,result); + } + else{ + break; + } + } + } + } + + + public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException { + List<Range> snapshot = partition.getSnapshot(); + UUID row = partition.getMRIIndex(); + for(Range r : snapshot){ + alreadyApplied.put(r,Pair.of(new MriReference(row),-1)); + } + } + + // \TODO merge with dag code + private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException { + Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>(); + for(MusicRangeInformationRow row : rows){ + DatabasePartition dbPartition = row.getDBPartition(); + if (row.getIsLatest()) { + for(Range range : dbPartition.getSnapshot()){ + if(!rowsPerLatestRange.containsKey(range)){ + rowsPerLatestRange.put(range,new HashSet<>()); + } + DagNode node = dag.getNode(row.getPartitionIndex()); + if(node!=null) { + rowsPerLatestRange.get(range).add(node); + } + else{ + rowsPerLatestRange.get(range).add(new DagNode(row)); + } + } + } + } + return rowsPerLatestRange; + } + + public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() { + return this.alreadyApplied; + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java index 03db7a7..4db3315 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -78,7 +78,7 @@ public class MusicTxDigest { warmupRanges.removeAll(partitionRanges); } try { - mi.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); + stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); } catch (MDBCServiceException e) { logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); continue; |