aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main
diff options
context:
space:
mode:
authorBharath Balasubramanian <bharathb@research.att.com>2019-03-25 23:11:59 +0000
committerGerrit Code Review <gerrit@onap.org>2019-03-25 23:11:59 +0000
commita40ee886c28bc28db1794792f1fb312b723d48fb (patch)
tree10a6ee7cc31694d4815df2ea22512d357fbee00a /mdbc-server/src/main
parent66e0e407c48d2288ce0d7d6f8129487a29de8c02 (diff)
parenta290a037ea6da2dc062945f77240a10564f6541f (diff)
Merge "Warm up ranges across sites"
Diffstat (limited to 'mdbc-server/src/main')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java6
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java89
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java24
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java6
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java30
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java23
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java28
-rwxr-xr-xmdbc-server/src/main/resources/music.properties4
8 files changed, 152 insertions, 58 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
index 7e39772..ced5745 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
@@ -38,4 +38,10 @@ public class Configuration {
public static final String TX_DAEMON_SLEEPTIME_S = "txdaemonsleeps";
/** Default txDigest Daemon sleep time */
public static final String TX_DAEMON_SLEEPTIME_S_DEFAULT = "10";
+ /** The property name to use to provide a timeout to mdbc (ownership) */
+ public static final String KEY_OWNERSHIP_TIMEOUT = "mdbc_timeout";
+ /** The default property value to use for the MDBC timeout */
+ public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours
+ /** The property name to provide comma separated list of ranges to warmup */
+ public static final String KEY_WARMUPRANGES = "warmupranges";
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 430e783..f0d9832 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -45,6 +45,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -82,21 +83,26 @@ public class StateManager {
String cassandraUrl;
private Properties info;
- /** The property name to use to provide a timeout to mdbc (ownership) */
- public static final String KEY_TIMEOUT = "mdbc_timeout";
- /** The default property value to use for the MDBC timeout */
- public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
-
/** Identifier for this server instance */
private String mdbcServerName;
private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
private final Lock eventualLock = new ReentrantLock();
private List<Range> eventualRanges;
+ /** lock for warmupRanges */
private final Lock warmupLock = new ReentrantLock();
- private List<Range> warmupRanges;
+ /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */
+ private Set<Range> rangesToWarmup;
+ /** map of transactions that have already been applied/updated in this sites SQL db */
private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
private OwnershipAndCheckpoint ownAndCheck;
+ /**
+ * For testing purposes only
+ */
+ @Deprecated
+ public StateManager() {
+ }
+
public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
this.sqlDBName = sqlDBName;
this.sqlDBUrl = sqlDBUrl;
@@ -118,11 +124,14 @@ public class StateManager {
initMusic();
initSqlDatabase();
- String t = info.getProperty(KEY_TIMEOUT);
- long timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t);
+ String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT);
+ long timeoutMs = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t);
alreadyApplied = new ConcurrentHashMap<>();
- ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
+ ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeoutMs);
+ rangesToWarmup = initWarmupRanges();
+ logger.info("Warmup ranges for this site is " + rangesToWarmup);
+
MusicTxDigest txDaemon = new MusicTxDigest(this);
txDaemon.startBackgroundDaemon(Integer.parseInt(
info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT)));
@@ -160,6 +169,24 @@ public class StateManager {
throw new MDBCServiceException(e.getMessage(), e);
}
}
+
+ /**
+ * Get list of ranges to warmup from configuration file
+ * if no configuration is provided, will return null
+ * @return
+ */
+ private Set<Range> initWarmupRanges() {
+ String warmupString = info.getProperty(Configuration.KEY_WARMUPRANGES);
+ if (warmupString==null) {
+ return null;
+ }
+ Set<Range> warmupRanges = new HashSet<>();
+ String[] ranges = warmupString.split(",");
+ for (String range: ranges) {
+ warmupRanges.add(new Range(range.trim()));
+ }
+ return warmupRanges;
+ }
public MusicInterface getMusicInterface() {
return this.musicInterface;
@@ -169,23 +196,45 @@ public class StateManager {
return new ArrayList<>(connectionRanges.values());
}
- public List<Range> getWarmupRanges(){
+ /**
+ * Get a list of ranges that are to be periodically warmed up
+ *
+ * If no list is specified, all ranges except eventual consistency ranges are returned
+ * @return
+ */
+ public Set<Range> getRangesToWarmup() {
warmupLock.lock();
- List<Range> returnArray;
+ Set<Range> returnSet;
try {
- if(warmupRanges!=null) {
- returnArray = new ArrayList<>(warmupRanges);
+ if(rangesToWarmup!=null) {
+ returnSet = rangesToWarmup;
}
- else{
- returnArray = null;
+ else {
+ returnSet = getAllRanges();
+ for (Range eventualRange: eventualRanges) {
+ returnSet.remove(eventualRange);
+ }
}
}
finally{
warmupLock.unlock();
}
- return returnArray;
+ return returnSet;
}
+ /**
+ * Get a set of all ranges seen in the sql db
+ * @return
+ */
+ private Set<Range> getAllRanges() {
+ DBInterface dbi = ((MdbcConnection) getConnection("daemon")).getDBInterface();
+ return dbi.getSQLRangeSet();
+ }
+
+ /**
+ * Get a list of ranges that are eventually consistent
+ * @return
+ */
public List<Range> getEventualRanges() {
eventualLock.lock();
List<Range> returnArray;
@@ -221,6 +270,10 @@ public class StateManager {
this.mdbcServerName = mdbcServerName;
}
+ /**
+ * Close connection and relinquish any locks held for that connection
+ * @param connectionId
+ */
public void closeConnection(String connectionId){
//\TODO check if there is a race condition
if(mdbcConnections.containsKey(connectionId)) {
@@ -331,10 +384,10 @@ public class StateManager {
}
- public void setWarmupRanges(List<Range> warmupRanges) {
+ public void setWarmupRanges(Set<Range> warmupRanges) {
warmupLock.lock();
try {
- this.warmupRanges = warmupRanges;
+ this.rangesToWarmup = warmupRanges;
}
finally{
warmupLock.unlock();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
index 6a17d4c..fd8651a 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
@@ -50,11 +50,15 @@ public class MdbcTestClient {
System.exit(1);
}
Connection connection;
- try {
- connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");
- } catch (SQLException e) {
- e.printStackTrace();
- return;
+ try {
+ String metricURL = "http://localhost:300000/test";
+ if(args[0] != null) {
+ metricURL = args[0];
+ }
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + metricURL+ ";serialization=protobuf");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
}
try {
@@ -104,7 +108,7 @@ public class MdbcTestClient {
}
final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');";
- final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;";
+ final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=2;";
final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";
final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';";
final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";
@@ -119,11 +123,11 @@ public class MdbcTestClient {
}
try {
- execute = insertStmt.execute(insertSQL);
+ //execute = insertStmt.execute(insertSQL);
execute = insertStmt.execute(insertSQL1);
- execute = insertStmt.execute(insertSQL2);
- execute = insertStmt.execute(insertSQL3);
- execute = insertStmt.execute(insertSQL4);
+ //execute = insertStmt.execute(insertSQL2);
+ //execute = insertStmt.execute(insertSQL3);
+ //execute = insertStmt.execute(insertSQL4);
} catch (SQLException e) {
e.printStackTrace();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index 5ef2bc7..a514dc6 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -49,8 +49,14 @@ public interface DBInterface {
* Get a set of the table names in the database. The table names should be returned in UPPER CASE.
* @return the set
*/
+ @Deprecated
Set<String> getSQLTableSet();
/**
+ * Get a set of the ranges in the database
+ * @return the set
+ */
+ Set<Range> getSQLRangeSet();
+ /**
* Return the name of the database that the driver is connected to
* @return
*/
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 928ffa1..0afacea 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -27,6 +27,7 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -150,10 +151,7 @@ public class MySQLMixin implements DBInterface {
public String getDatabaseName() {
return this.dbName;
}
- /**
- * Get a set of the table names in the database.
- * @return the set
- */
+
@Override
public Set<String> getSQLTableSet() {
Set<String> set = new TreeSet<String>();
@@ -172,6 +170,30 @@ public class MySQLMixin implements DBInterface {
logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set);
return set;
}
+
+ @Override
+ public Set<Range> getSQLRangeSet() {
+ Set<String> set = new TreeSet<String>();
+ String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ while (rs.next()) {
+ String s = rs.getString("TABLE_NAME");
+ set.add(s);
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,"getSQLTableSet: "+e);
+ }
+ logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set);
+ Set<Range> rangeSet = new HashSet<>();
+ for (String table: set) {
+ rangeSet.add(new Range(table));
+ }
+ return rangeSet;
+ }
+
/*
mysql> describe tables;
+-----------------+---------------------+------+-----+---------+-------+
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index ddf26ce..057b550 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -180,23 +180,30 @@ public class OwnershipAndCheckpoint{
throw new MDBCServiceException("Error applying tx digest in local SQL",e);
}
}
-
- public void warmup(MusicInterface mi, DBInterface di, List<Range> ranges) throws MDBCServiceException {
- if(ranges.isEmpty()){
+
+ /**
+ * Replay the updates for the partitions containing ranges to the local database
+ * @param mi
+ * @param di
+ * @param rangesToWarmup
+ * @throws MDBCServiceException
+ */
+ public void warmup(MusicInterface mi, DBInterface di, List<Range> rangesToWarmup) throws MDBCServiceException {
+ if(rangesToWarmup.isEmpty()){
return;
}
boolean ready = false;
change.set(true);
- Set<Range> rangeSet = new HashSet<Range>(ranges);
+ Set<Range> rangeSet = new HashSet<Range>(rangesToWarmup);
Dag dag = new Dag(false);
while(!ready){
if(change.get()){
change.set(false);
- final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, ranges,false);
- dag = Dag.getDag(rows,ranges);
+ final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false);
+ dag = Dag.getDag(rows,rangesToWarmup);
}
else if(!dag.applied()){
- DagNode node = dag.nextToApply(ranges);
+ DagNode node = dag.nextToApply(rangesToWarmup);
if(node!=null) {
Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
while (pair != null) {
@@ -208,7 +215,7 @@ public class OwnershipAndCheckpoint{
break;
} else {
final StagingTable txDigest = mi.getTxDigest(pair.getKey());
- applyTxDigest(ranges,di, txDigest);
+ applyTxDigest(rangesToWarmup,di, txDigest);
for (Range r : pair.getValue()) {
MusicRangeInformationRow row = node.getRow();
alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
index 4db3315..5b3872a 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -67,22 +67,18 @@ public class MusicTxDigest {
continue;
}
//2) for each partition I don't own
- final List<Range> warmuplist = stateManager.getWarmupRanges();
- if(warmuplist!=null) {
- final Set<Range> warmupRanges = new HashSet(warmuplist);
- final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
- List<Range> missingRanges = new ArrayList<>();
- if (currentPartitions.size() != 0) {
- for (DatabasePartition part : currentPartitions) {
- List<Range> partitionRanges = part.getSnapshot();
- warmupRanges.removeAll(partitionRanges);
- }
- try {
- stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
- } catch (MDBCServiceException e) {
- logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
- continue;
- }
+ final Set<Range> warmupRanges = stateManager.getRangesToWarmup();
+ final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
+ if (currentPartitions.size() != 0) {
+ for (DatabasePartition part : currentPartitions) {
+ List<Range> partitionRanges = part.getSnapshot();
+ warmupRanges.removeAll(partitionRanges);
+ }
+ try {
+ stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
+ continue;
}
}
diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties
index 83dcb7c..0b34ff9 100755
--- a/mdbc-server/src/main/resources/music.properties
+++ b/mdbc-server/src/main/resources/music.properties
@@ -4,5 +4,5 @@ cassandra.user =\
cassandra
cassandra.password =\
cassandra
-zookeeper.host =\
- localhost
+music_namespace =\
+ mdbc_namespace \ No newline at end of file