diff options
author | Bharath Balasubramanian <bharathb@research.att.com> | 2019-03-25 23:11:59 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-03-25 23:11:59 +0000 |
commit | a40ee886c28bc28db1794792f1fb312b723d48fb (patch) | |
tree | 10a6ee7cc31694d4815df2ea22512d357fbee00a /mdbc-server/src/main | |
parent | 66e0e407c48d2288ce0d7d6f8129487a29de8c02 (diff) | |
parent | a290a037ea6da2dc062945f77240a10564f6541f (diff) |
Merge "Warm up ranges across sites"
Diffstat (limited to 'mdbc-server/src/main')
8 files changed, 152 insertions, 58 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java index 7e39772..ced5745 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java @@ -38,4 +38,10 @@ public class Configuration { public static final String TX_DAEMON_SLEEPTIME_S = "txdaemonsleeps"; /** Default txDigest Daemon sleep time */ public static final String TX_DAEMON_SLEEPTIME_S_DEFAULT = "10"; + /** The property name to use to provide a timeout to mdbc (ownership) */ + public static final String KEY_OWNERSHIP_TIMEOUT = "mdbc_timeout"; + /** The default property value to use for the MDBC timeout */ + public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours + /** The property name to provide comma separated list of ranges to warmup */ + public static final String KEY_WARMUPRANGES = "warmupranges"; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 430e783..f0d9832 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -45,6 +45,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -82,21 +83,26 @@ public class StateManager { String cassandraUrl; private Properties info; - /** The property name to use to provide a timeout to mdbc (ownership) */ - public static final String KEY_TIMEOUT = "mdbc_timeout"; - /** The default property value to use for the MDBC timeout */ - public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours - /** Identifier for this server instance */ private String mdbcServerName; private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition private final Lock eventualLock = new ReentrantLock(); private List<Range> eventualRanges; + /** lock for warmupRanges */ private final Lock warmupLock = new ReentrantLock(); - private List<Range> warmupRanges; + /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */ + private Set<Range> rangesToWarmup; + /** map of transactions that have already been applied/updated in this sites SQL db */ private Map<Range, Pair<MriReference, Integer>> alreadyApplied; private OwnershipAndCheckpoint ownAndCheck; + /** + * For testing purposes only + */ + @Deprecated + public StateManager() { + } + public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException { this.sqlDBName = sqlDBName; this.sqlDBUrl = sqlDBUrl; @@ -118,11 +124,14 @@ public class StateManager { initMusic(); initSqlDatabase(); - String t = info.getProperty(KEY_TIMEOUT); - long timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t); + String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT); + long timeoutMs = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t); alreadyApplied = new ConcurrentHashMap<>(); - ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout); + ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeoutMs); + rangesToWarmup = initWarmupRanges(); + logger.info("Warmup ranges for this site is " + rangesToWarmup); + MusicTxDigest txDaemon = new MusicTxDigest(this); txDaemon.startBackgroundDaemon(Integer.parseInt( info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT))); @@ -160,6 +169,24 @@ public class StateManager { throw new MDBCServiceException(e.getMessage(), e); } } + + /** + * Get list of ranges to warmup from configuration file + * if no configuration is provided, will return null + * @return + */ + private Set<Range> initWarmupRanges() { + String warmupString = info.getProperty(Configuration.KEY_WARMUPRANGES); + if (warmupString==null) { + return null; + } + Set<Range> warmupRanges = new HashSet<>(); + String[] ranges = warmupString.split(","); + for (String range: ranges) { + warmupRanges.add(new Range(range.trim())); + } + return warmupRanges; + } public MusicInterface getMusicInterface() { return this.musicInterface; @@ -169,23 +196,45 @@ public class StateManager { return new ArrayList<>(connectionRanges.values()); } - public List<Range> getWarmupRanges(){ + /** + * Get a list of ranges that are to be periodically warmed up + * + * If no list is specified, all ranges except eventual consistency ranges are returned + * @return + */ + public Set<Range> getRangesToWarmup() { warmupLock.lock(); - List<Range> returnArray; + Set<Range> returnSet; try { - if(warmupRanges!=null) { - returnArray = new ArrayList<>(warmupRanges); + if(rangesToWarmup!=null) { + returnSet = rangesToWarmup; } - else{ - returnArray = null; + else { + returnSet = getAllRanges(); + for (Range eventualRange: eventualRanges) { + returnSet.remove(eventualRange); + } } } finally{ warmupLock.unlock(); } - return returnArray; + return returnSet; } + /** + * Get a set of all ranges seen in the sql db + * @return + */ + private Set<Range> getAllRanges() { + DBInterface dbi = ((MdbcConnection) getConnection("daemon")).getDBInterface(); + return dbi.getSQLRangeSet(); + } + + /** + * Get a list of ranges that are eventually consistent + * @return + */ public List<Range> getEventualRanges() { eventualLock.lock(); List<Range> returnArray; @@ -221,6 +270,10 @@ public class StateManager { this.mdbcServerName = mdbcServerName; } + /** + * Close connection and relinquish any locks held for that connection + * @param connectionId + */ public void closeConnection(String connectionId){ //\TODO check if there is a race condition if(mdbcConnections.containsKey(connectionId)) { @@ -331,10 +384,10 @@ public class StateManager { } - public void setWarmupRanges(List<Range> warmupRanges) { + public void setWarmupRanges(Set<Range> warmupRanges) { warmupLock.lock(); try { - this.warmupRanges = warmupRanges; + this.rangesToWarmup = warmupRanges; } finally{ warmupLock.unlock(); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java index 6a17d4c..fd8651a 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java @@ -50,11 +50,15 @@ public class MdbcTestClient { System.exit(1);
}
Connection connection;
- try {
- connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");
- } catch (SQLException e) {
- e.printStackTrace();
- return;
+ try {
+ String metricURL = "http://localhost:300000/test";
+ if(args[0] != null) {
+ metricURL = args[0];
+ }
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + metricURL+ ";serialization=protobuf");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
}
try {
@@ -104,7 +108,7 @@ public class MdbcTestClient { }
final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');";
- final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;";
+ final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=2;";
final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";
final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';";
final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";
@@ -119,11 +123,11 @@ public class MdbcTestClient { }
try {
- execute = insertStmt.execute(insertSQL);
+ //execute = insertStmt.execute(insertSQL);
execute = insertStmt.execute(insertSQL1);
- execute = insertStmt.execute(insertSQL2);
- execute = insertStmt.execute(insertSQL3);
- execute = insertStmt.execute(insertSQL4);
+ //execute = insertStmt.execute(insertSQL2);
+ //execute = insertStmt.execute(insertSQL3);
+ //execute = insertStmt.execute(insertSQL4);
} catch (SQLException e) {
e.printStackTrace();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index 5ef2bc7..a514dc6 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -49,8 +49,14 @@ public interface DBInterface { * Get a set of the table names in the database. The table names should be returned in UPPER CASE. * @return the set */ + @Deprecated Set<String> getSQLTableSet(); /** + * Get a set of the ranges in the database + * @return the set + */ + Set<Range> getSQLRangeSet(); + /** * Return the name of the database that the driver is connected to * @return */ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 928ffa1..0afacea 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -27,6 +27,7 @@ import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -150,10 +151,7 @@ public class MySQLMixin implements DBInterface { public String getDatabaseName() { return this.dbName; } - /** - * Get a set of the table names in the database. - * @return the set - */ + @Override public Set<String> getSQLTableSet() { Set<String> set = new TreeSet<String>(); @@ -172,6 +170,30 @@ public class MySQLMixin implements DBInterface { logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set); return set; } + + @Override + public Set<Range> getSQLRangeSet() { + Set<String> set = new TreeSet<String>(); + String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery(sql); + while (rs.next()) { + String s = rs.getString("TABLE_NAME"); + set.add(s); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger,"getSQLTableSet: "+e); + } + logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set); + Set<Range> rangeSet = new HashSet<>(); + for (String table: set) { + rangeSet.add(new Range(table)); + } + return rangeSet; + } + /* mysql> describe tables; +-----------------+---------------------+------+-----+---------+-------+ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index ddf26ce..057b550 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -180,23 +180,30 @@ public class OwnershipAndCheckpoint{ throw new MDBCServiceException("Error applying tx digest in local SQL",e); } } - - public void warmup(MusicInterface mi, DBInterface di, List<Range> ranges) throws MDBCServiceException { - if(ranges.isEmpty()){ + + /** + * Replay the updates for the partitions containing ranges to the local database + * @param mi + * @param di + * @param rangesToWarmup + * @throws MDBCServiceException + */ + public void warmup(MusicInterface mi, DBInterface di, List<Range> rangesToWarmup) throws MDBCServiceException { + if(rangesToWarmup.isEmpty()){ return; } boolean ready = false; change.set(true); - Set<Range> rangeSet = new HashSet<Range>(ranges); + Set<Range> rangeSet = new HashSet<Range>(rangesToWarmup); Dag dag = new Dag(false); while(!ready){ if(change.get()){ change.set(false); - final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, ranges,false); - dag = Dag.getDag(rows,ranges); + final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false); + dag = Dag.getDag(rows,rangesToWarmup); } else if(!dag.applied()){ - DagNode node = dag.nextToApply(ranges); + DagNode node = dag.nextToApply(rangesToWarmup); if(node!=null) { Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet); while (pair != null) { @@ -208,7 +215,7 @@ public class OwnershipAndCheckpoint{ break; } else { final StagingTable txDigest = mi.getTxDigest(pair.getKey()); - applyTxDigest(ranges,di, txDigest); + applyTxDigest(rangesToWarmup,di, txDigest); for (Range r : pair.getValue()) { MusicRangeInformationRow row = node.getRow(); alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java index 4db3315..5b3872a 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -67,22 +67,18 @@ public class MusicTxDigest { continue; } //2) for each partition I don't own - final List<Range> warmuplist = stateManager.getWarmupRanges(); - if(warmuplist!=null) { - final Set<Range> warmupRanges = new HashSet(warmuplist); - final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); - List<Range> missingRanges = new ArrayList<>(); - if (currentPartitions.size() != 0) { - for (DatabasePartition part : currentPartitions) { - List<Range> partitionRanges = part.getSnapshot(); - warmupRanges.removeAll(partitionRanges); - } - try { - stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); - } catch (MDBCServiceException e) { - logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); - continue; - } + final Set<Range> warmupRanges = stateManager.getRangesToWarmup(); + final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); + if (currentPartitions.size() != 0) { + for (DatabasePartition part : currentPartitions) { + List<Range> partitionRanges = part.getSnapshot(); + warmupRanges.removeAll(partitionRanges); + } + try { + stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); + } catch (MDBCServiceException e) { + logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); + continue; } } diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties index 83dcb7c..0b34ff9 100755 --- a/mdbc-server/src/main/resources/music.properties +++ b/mdbc-server/src/main/resources/music.properties @@ -4,5 +4,5 @@ cassandra.user =\ cassandra cassandra.password =\ cassandra -zookeeper.host =\ - localhost +music_namespace =\ + mdbc_namespace
\ No newline at end of file |