From a290a037ea6da2dc062945f77240a10564f6541f Mon Sep 17 00:00:00 2001 From: "Tschaen, Brendan" Date: Wed, 20 Mar 2019 12:12:06 -0400 Subject: Warm up ranges across sites Add configuration to select which ranges we warm up. Default (null) is all ranges Change-Id: I0f241563959cc9fb673ab6d8a87c45e8399086c2 Issue-ID: MUSIC-287 Signed-off-by: Tschaen, Brendan --- .../java/org/onap/music/mdbc/Configuration.java | 6 ++ .../java/org/onap/music/mdbc/StateManager.java | 89 +++++++++++++++++----- .../onap/music/mdbc/examples/MdbcTestClient.java | 24 +++--- .../org/onap/music/mdbc/mixins/DBInterface.java | 6 ++ .../org/onap/music/mdbc/mixins/MySQLMixin.java | 30 +++++++- .../mdbc/ownership/OwnershipAndCheckpoint.java | 23 ++++-- .../org/onap/music/mdbc/tables/MusicTxDigest.java | 28 +++---- 7 files changed, 150 insertions(+), 56 deletions(-) (limited to 'mdbc-server/src/main/java/org/onap') diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java index 7e39772..ced5745 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java @@ -38,4 +38,10 @@ public class Configuration { public static final String TX_DAEMON_SLEEPTIME_S = "txdaemonsleeps"; /** Default txDigest Daemon sleep time */ public static final String TX_DAEMON_SLEEPTIME_S_DEFAULT = "10"; + /** The property name to use to provide a timeout to mdbc (ownership) */ + public static final String KEY_OWNERSHIP_TIMEOUT = "mdbc_timeout"; + /** The default property value to use for the MDBC timeout */ + public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours + /** The property name to provide comma separated list of ranges to warmup */ + public static final String KEY_WARMUPRANGES = "warmupranges"; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 430e783..f0d9832 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -45,6 +45,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -82,21 +83,26 @@ public class StateManager { String cassandraUrl; private Properties info; - /** The property name to use to provide a timeout to mdbc (ownership) */ - public static final String KEY_TIMEOUT = "mdbc_timeout"; - /** The default property value to use for the MDBC timeout */ - public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours - /** Identifier for this server instance */ private String mdbcServerName; private Map connectionRanges;//Each connection owns its own database partition private final Lock eventualLock = new ReentrantLock(); private List eventualRanges; + /** lock for warmupRanges */ private final Lock warmupLock = new ReentrantLock(); - private List warmupRanges; + /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */ + private Set rangesToWarmup; + /** map of transactions that have already been applied/updated in this sites SQL db */ private Map> alreadyApplied; private OwnershipAndCheckpoint ownAndCheck; + /** + * For testing purposes only + */ + @Deprecated + public StateManager() { + } + public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException { this.sqlDBName = sqlDBName; this.sqlDBUrl = sqlDBUrl; @@ -118,11 +124,14 @@ public class StateManager { initMusic(); initSqlDatabase(); - String t = info.getProperty(KEY_TIMEOUT); - long timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t); + String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT); + long timeoutMs = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t); alreadyApplied = new ConcurrentHashMap<>(); - ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout); + ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeoutMs); + rangesToWarmup = initWarmupRanges(); + logger.info("Warmup ranges for this site is " + rangesToWarmup); + MusicTxDigest txDaemon = new MusicTxDigest(this); txDaemon.startBackgroundDaemon(Integer.parseInt( info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT))); @@ -160,6 +169,24 @@ public class StateManager { throw new MDBCServiceException(e.getMessage(), e); } } + + /** + * Get list of ranges to warmup from configuration file + * if no configuration is provided, will return null + * @return + */ + private Set initWarmupRanges() { + String warmupString = info.getProperty(Configuration.KEY_WARMUPRANGES); + if (warmupString==null) { + return null; + } + Set warmupRanges = new HashSet<>(); + String[] ranges = warmupString.split(","); + for (String range: ranges) { + warmupRanges.add(new Range(range.trim())); + } + return warmupRanges; + } public MusicInterface getMusicInterface() { return this.musicInterface; @@ -169,23 +196,45 @@ public class StateManager { return new ArrayList<>(connectionRanges.values()); } - public List getWarmupRanges(){ + /** + * Get a list of ranges that are to be periodically warmed up + * + * If no list is specified, all ranges except eventual consistency ranges are returned + * @return + */ + public Set getRangesToWarmup() { warmupLock.lock(); - List returnArray; + Set returnSet; try { - if(warmupRanges!=null) { - returnArray = new ArrayList<>(warmupRanges); + if(rangesToWarmup!=null) { + returnSet = rangesToWarmup; } - else{ - returnArray = null; + else { + returnSet = getAllRanges(); + for (Range eventualRange: eventualRanges) { + returnSet.remove(eventualRange); + } } } finally{ warmupLock.unlock(); } - return returnArray; + return returnSet; } + /** + * Get a set of all ranges seen in the sql db + * @return + */ + private Set getAllRanges() { + DBInterface dbi = ((MdbcConnection) getConnection("daemon")).getDBInterface(); + return dbi.getSQLRangeSet(); + } + + /** + * Get a list of ranges that are eventually consistent + * @return + */ public List getEventualRanges() { eventualLock.lock(); List returnArray; @@ -221,6 +270,10 @@ public class StateManager { this.mdbcServerName = mdbcServerName; } + /** + * Close connection and relinquish any locks held for that connection + * @param connectionId + */ public void closeConnection(String connectionId){ //\TODO check if there is a race condition if(mdbcConnections.containsKey(connectionId)) { @@ -331,10 +384,10 @@ public class StateManager { } - public void setWarmupRanges(List warmupRanges) { + public void setWarmupRanges(Set warmupRanges) { warmupLock.lock(); try { - this.warmupRanges = warmupRanges; + this.rangesToWarmup = warmupRanges; } finally{ warmupLock.unlock(); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java index 6a17d4c..fd8651a 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java @@ -50,11 +50,15 @@ public class MdbcTestClient { System.exit(1); } Connection connection; - try { - connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf"); - } catch (SQLException e) { - e.printStackTrace(); - return; + try { + String metricURL = "http://localhost:300000/test"; + if(args[0] != null) { + metricURL = args[0]; + } + connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + metricURL+ ";serialization=protobuf"); + } catch (SQLException e) { + e.printStackTrace(); + return; } try { @@ -104,7 +108,7 @@ public class MdbcTestClient { } final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');"; - final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;"; + final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=2;"; final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');"; final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';"; final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';"; @@ -119,11 +123,11 @@ public class MdbcTestClient { } try { - execute = insertStmt.execute(insertSQL); + //execute = insertStmt.execute(insertSQL); execute = insertStmt.execute(insertSQL1); - execute = insertStmt.execute(insertSQL2); - execute = insertStmt.execute(insertSQL3); - execute = insertStmt.execute(insertSQL4); + //execute = insertStmt.execute(insertSQL2); + //execute = insertStmt.execute(insertSQL3); + //execute = insertStmt.execute(insertSQL4); } catch (SQLException e) { e.printStackTrace(); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index 85645f3..5b72482 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -49,8 +49,14 @@ public interface DBInterface { * Get a set of the table names in the database. The table names should be returned in UPPER CASE. * @return the set */ + @Deprecated Set getSQLTableSet(); /** + * Get a set of the ranges in the database + * @return the set + */ + Set getSQLRangeSet(); + /** * Return the name of the database that the driver is connected to * @return */ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 420f9d4..7154b34 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -27,6 +27,7 @@ import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -150,10 +151,7 @@ public class MySQLMixin implements DBInterface { public String getDatabaseName() { return this.dbName; } - /** - * Get a set of the table names in the database. - * @return the set - */ + @Override public Set getSQLTableSet() { Set set = new TreeSet(); @@ -172,6 +170,30 @@ public class MySQLMixin implements DBInterface { logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set); return set; } + + @Override + public Set getSQLRangeSet() { + Set set = new TreeSet(); + String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery(sql); + while (rs.next()) { + String s = rs.getString("TABLE_NAME"); + set.add(s); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger,"getSQLTableSet: "+e); + } + logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set); + Set rangeSet = new HashSet<>(); + for (String table: set) { + rangeSet.add(new Range(table)); + } + return rangeSet; + } + /* mysql> describe tables; +-----------------+---------------------+------+-----+---------+-------+ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index ddf26ce..057b550 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -180,23 +180,30 @@ public class OwnershipAndCheckpoint{ throw new MDBCServiceException("Error applying tx digest in local SQL",e); } } - - public void warmup(MusicInterface mi, DBInterface di, List ranges) throws MDBCServiceException { - if(ranges.isEmpty()){ + + /** + * Replay the updates for the partitions containing ranges to the local database + * @param mi + * @param di + * @param rangesToWarmup + * @throws MDBCServiceException + */ + public void warmup(MusicInterface mi, DBInterface di, List rangesToWarmup) throws MDBCServiceException { + if(rangesToWarmup.isEmpty()){ return; } boolean ready = false; change.set(true); - Set rangeSet = new HashSet(ranges); + Set rangeSet = new HashSet(rangesToWarmup); Dag dag = new Dag(false); while(!ready){ if(change.get()){ change.set(false); - final List rows = extractRowsForRange(mi, ranges,false); - dag = Dag.getDag(rows,ranges); + final List rows = extractRowsForRange(mi, rangesToWarmup,false); + dag = Dag.getDag(rows,rangesToWarmup); } else if(!dag.applied()){ - DagNode node = dag.nextToApply(ranges); + DagNode node = dag.nextToApply(rangesToWarmup); if(node!=null) { Pair> pair = node.nextNotAppliedTransaction(rangeSet); while (pair != null) { @@ -208,7 +215,7 @@ public class OwnershipAndCheckpoint{ break; } else { final StagingTable txDigest = mi.getTxDigest(pair.getKey()); - applyTxDigest(ranges,di, txDigest); + applyTxDigest(rangesToWarmup,di, txDigest); for (Range r : pair.getValue()) { MusicRangeInformationRow row = node.getRow(); alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java index 4db3315..5b3872a 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -67,22 +67,18 @@ public class MusicTxDigest { continue; } //2) for each partition I don't own - final List warmuplist = stateManager.getWarmupRanges(); - if(warmuplist!=null) { - final Set warmupRanges = new HashSet(warmuplist); - final List currentPartitions = stateManager.getPartitions(); - List missingRanges = new ArrayList<>(); - if (currentPartitions.size() != 0) { - for (DatabasePartition part : currentPartitions) { - List partitionRanges = part.getSnapshot(); - warmupRanges.removeAll(partitionRanges); - } - try { - stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); - } catch (MDBCServiceException e) { - logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); - continue; - } + final Set warmupRanges = stateManager.getRangesToWarmup(); + final List currentPartitions = stateManager.getPartitions(); + if (currentPartitions.size() != 0) { + for (DatabasePartition part : currentPartitions) { + List partitionRanges = part.getSnapshot(); + warmupRanges.removeAll(partitionRanges); + } + try { + stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); + } catch (MDBCServiceException e) { + logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); + continue; } } -- cgit 1.2.3-korg