diff options
11 files changed, 296 insertions, 58 deletions
diff --git a/mdbc-server/pom.xml b/mdbc-server/pom.xml index 874fce7..f69cf2f 100755 --- a/mdbc-server/pom.xml +++ b/mdbc-server/pom.xml @@ -123,6 +123,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>2.23.4</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.github.jsqlparser</groupId> <artifactId>jsqlparser</artifactId> <version>1.1</version> diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java index 7e39772..ced5745 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java @@ -38,4 +38,10 @@ public class Configuration { public static final String TX_DAEMON_SLEEPTIME_S = "txdaemonsleeps"; /** Default txDigest Daemon sleep time */ public static final String TX_DAEMON_SLEEPTIME_S_DEFAULT = "10"; + /** The property name to use to provide a timeout to mdbc (ownership) */ + public static final String KEY_OWNERSHIP_TIMEOUT = "mdbc_timeout"; + /** The default property value to use for the MDBC timeout */ + public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours + /** The property name to provide comma separated list of ranges to warmup */ + public static final String KEY_WARMUPRANGES = "warmupranges"; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 430e783..f0d9832 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -45,6 +45,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -82,21 +83,26 @@ public class StateManager { String cassandraUrl; private Properties info; - /** The property name to use to provide a timeout to mdbc (ownership) */ - public static final String KEY_TIMEOUT = "mdbc_timeout"; - /** The default property value to use for the MDBC timeout */ - public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours - /** Identifier for this server instance */ private String mdbcServerName; private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition private final Lock eventualLock = new ReentrantLock(); private List<Range> eventualRanges; + /** lock for warmupRanges */ private final Lock warmupLock = new ReentrantLock(); - private List<Range> warmupRanges; + /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */ + private Set<Range> rangesToWarmup; + /** map of transactions that have already been applied/updated in this sites SQL db */ private Map<Range, Pair<MriReference, Integer>> alreadyApplied; private OwnershipAndCheckpoint ownAndCheck; + /** + * For testing purposes only + */ + @Deprecated + public StateManager() { + } + public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException { this.sqlDBName = sqlDBName; this.sqlDBUrl = sqlDBUrl; @@ -118,11 +124,14 @@ public class StateManager { initMusic(); initSqlDatabase(); - String t = info.getProperty(KEY_TIMEOUT); - long timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t); + String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT); + long timeoutMs = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t); alreadyApplied = new ConcurrentHashMap<>(); - ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout); + ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeoutMs); + rangesToWarmup = initWarmupRanges(); + logger.info("Warmup ranges for this site is " + rangesToWarmup); + MusicTxDigest txDaemon = new MusicTxDigest(this); txDaemon.startBackgroundDaemon(Integer.parseInt( info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT))); @@ -160,6 +169,24 @@ public class StateManager { throw new MDBCServiceException(e.getMessage(), e); } } + + /** + * Get list of ranges to warmup from configuration file + * if no configuration is provided, will return null + * @return + */ + private Set<Range> initWarmupRanges() { + String warmupString = info.getProperty(Configuration.KEY_WARMUPRANGES); + if (warmupString==null) { + return null; + } + Set<Range> warmupRanges = new HashSet<>(); + String[] ranges = warmupString.split(","); + for (String range: ranges) { + warmupRanges.add(new Range(range.trim())); + } + return warmupRanges; + } public MusicInterface getMusicInterface() { return this.musicInterface; @@ -169,23 +196,45 @@ public class StateManager { return new ArrayList<>(connectionRanges.values()); } - public List<Range> getWarmupRanges(){ + /** + * Get a list of ranges that are to be periodically warmed up + * + * If no list is specified, all ranges except eventual consistency ranges are returned + * @return + */ + public Set<Range> getRangesToWarmup() { warmupLock.lock(); - List<Range> returnArray; + Set<Range> returnSet; try { - if(warmupRanges!=null) { - returnArray = new ArrayList<>(warmupRanges); + if(rangesToWarmup!=null) { + returnSet = rangesToWarmup; } - else{ - returnArray = null; + else { + returnSet = getAllRanges(); + for (Range eventualRange: eventualRanges) { + returnSet.remove(eventualRange); + } } } finally{ warmupLock.unlock(); } - return returnArray; + return returnSet; } + /** + * Get a set of all ranges seen in the sql db + * @return + */ + private Set<Range> getAllRanges() { + DBInterface dbi = ((MdbcConnection) getConnection("daemon")).getDBInterface(); + return dbi.getSQLRangeSet(); + } + + /** + * Get a list of ranges that are eventually consistent + * @return + */ public List<Range> getEventualRanges() { eventualLock.lock(); List<Range> returnArray; @@ -221,6 +270,10 @@ public class StateManager { this.mdbcServerName = mdbcServerName; } + /** + * Close connection and relinquish any locks held for that connection + * @param connectionId + */ public void closeConnection(String connectionId){ //\TODO check if there is a race condition if(mdbcConnections.containsKey(connectionId)) { @@ -331,10 +384,10 @@ public class StateManager { } - public void setWarmupRanges(List<Range> warmupRanges) { + public void setWarmupRanges(Set<Range> warmupRanges) { warmupLock.lock(); try { - this.warmupRanges = warmupRanges; + this.rangesToWarmup = warmupRanges; } finally{ warmupLock.unlock(); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java index 6a17d4c..fd8651a 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java @@ -50,11 +50,15 @@ public class MdbcTestClient { System.exit(1);
}
Connection connection;
- try {
- connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");
- } catch (SQLException e) {
- e.printStackTrace();
- return;
+ try {
+ String metricURL = "http://localhost:300000/test";
+ if(args[0] != null) {
+ metricURL = args[0];
+ }
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + metricURL+ ";serialization=protobuf");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
}
try {
@@ -104,7 +108,7 @@ public class MdbcTestClient { }
final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');";
- final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;";
+ final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=2;";
final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";
final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';";
final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";
@@ -119,11 +123,11 @@ public class MdbcTestClient { }
try {
- execute = insertStmt.execute(insertSQL);
+ //execute = insertStmt.execute(insertSQL);
execute = insertStmt.execute(insertSQL1);
- execute = insertStmt.execute(insertSQL2);
- execute = insertStmt.execute(insertSQL3);
- execute = insertStmt.execute(insertSQL4);
+ //execute = insertStmt.execute(insertSQL2);
+ //execute = insertStmt.execute(insertSQL3);
+ //execute = insertStmt.execute(insertSQL4);
} catch (SQLException e) {
e.printStackTrace();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index 85645f3..5b72482 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -49,8 +49,14 @@ public interface DBInterface { * Get a set of the table names in the database. The table names should be returned in UPPER CASE. * @return the set */ + @Deprecated Set<String> getSQLTableSet(); /** + * Get a set of the ranges in the database + * @return the set + */ + Set<Range> getSQLRangeSet(); + /** * Return the name of the database that the driver is connected to * @return */ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 420f9d4..7154b34 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -27,6 +27,7 @@ import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -150,10 +151,7 @@ public class MySQLMixin implements DBInterface { public String getDatabaseName() { return this.dbName; } - /** - * Get a set of the table names in the database. - * @return the set - */ + @Override public Set<String> getSQLTableSet() { Set<String> set = new TreeSet<String>(); @@ -172,6 +170,30 @@ public class MySQLMixin implements DBInterface { logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set); return set; } + + @Override + public Set<Range> getSQLRangeSet() { + Set<String> set = new TreeSet<String>(); + String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery(sql); + while (rs.next()) { + String s = rs.getString("TABLE_NAME"); + set.add(s); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger,"getSQLTableSet: "+e); + } + logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set); + Set<Range> rangeSet = new HashSet<>(); + for (String table: set) { + rangeSet.add(new Range(table)); + } + return rangeSet; + } + /* mysql> describe tables; +-----------------+---------------------+------+-----+---------+-------+ diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index ddf26ce..057b550 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -180,23 +180,30 @@ public class OwnershipAndCheckpoint{ throw new MDBCServiceException("Error applying tx digest in local SQL",e); } } - - public void warmup(MusicInterface mi, DBInterface di, List<Range> ranges) throws MDBCServiceException { - if(ranges.isEmpty()){ + + /** + * Replay the updates for the partitions containing ranges to the local database + * @param mi + * @param di + * @param rangesToWarmup + * @throws MDBCServiceException + */ + public void warmup(MusicInterface mi, DBInterface di, List<Range> rangesToWarmup) throws MDBCServiceException { + if(rangesToWarmup.isEmpty()){ return; } boolean ready = false; change.set(true); - Set<Range> rangeSet = new HashSet<Range>(ranges); + Set<Range> rangeSet = new HashSet<Range>(rangesToWarmup); Dag dag = new Dag(false); while(!ready){ if(change.get()){ change.set(false); - final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, ranges,false); - dag = Dag.getDag(rows,ranges); + final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false); + dag = Dag.getDag(rows,rangesToWarmup); } else if(!dag.applied()){ - DagNode node = dag.nextToApply(ranges); + DagNode node = dag.nextToApply(rangesToWarmup); if(node!=null) { Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet); while (pair != null) { @@ -208,7 +215,7 @@ public class OwnershipAndCheckpoint{ break; } else { final StagingTable txDigest = mi.getTxDigest(pair.getKey()); - applyTxDigest(ranges,di, txDigest); + applyTxDigest(rangesToWarmup,di, txDigest); for (Range r : pair.getValue()) { MusicRangeInformationRow row = node.getRow(); alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java index 4db3315..5b3872a 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -67,22 +67,18 @@ public class MusicTxDigest { continue; } //2) for each partition I don't own - final List<Range> warmuplist = stateManager.getWarmupRanges(); - if(warmuplist!=null) { - final Set<Range> warmupRanges = new HashSet(warmuplist); - final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); - List<Range> missingRanges = new ArrayList<>(); - if (currentPartitions.size() != 0) { - for (DatabasePartition part : currentPartitions) { - List<Range> partitionRanges = part.getSnapshot(); - warmupRanges.removeAll(partitionRanges); - } - try { - stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); - } catch (MDBCServiceException e) { - logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); - continue; - } + final Set<Range> warmupRanges = stateManager.getRangesToWarmup(); + final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); + if (currentPartitions.size() != 0) { + for (DatabasePartition part : currentPartitions) { + List<Range> partitionRanges = part.getSnapshot(); + warmupRanges.removeAll(partitionRanges); + } + try { + stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); + } catch (MDBCServiceException e) { + logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); + continue; } } diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties index 83dcb7c..0b34ff9 100755 --- a/mdbc-server/src/main/resources/music.properties +++ b/mdbc-server/src/main/resources/music.properties @@ -4,5 +4,5 @@ cassandra.user =\ cassandra cassandra.password =\ cassandra -zookeeper.host =\ - localhost +music_namespace =\ + mdbc_namespace
\ No newline at end of file diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java new file mode 100644 index 0000000..899fff2 --- /dev/null +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java @@ -0,0 +1,137 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc; + +import static org.junit.Assert.*; +import java.sql.Connection; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.FieldSetter; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.mdbc.mixins.DBInterface; +import org.onap.music.mdbc.tables.TxCommitProgress; + +public class StateManagerTest { + + StateManager stateManager; + + @BeforeClass + public static void beforeClass() { + System.out.println("StateManagerTest"); + } + + @Before + public void before() throws MDBCServiceException { + //shouldn't use separate constructor, but this will do for now + stateManager = new StateManager(); + } + + @Test + public void testGetEventualRanges() throws NoSuchFieldException, SecurityException { + List<Range> evList = new ArrayList<>(); + evList.add(new Range("eventualRange")); + FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("eventualRanges"), evList); + assertEquals(evList, stateManager.getEventualRanges()); + } + + @Test + public void testSetEventualRanges() { + List<Range> evList = new ArrayList<>(); + evList.add(new Range("eventualRange")); + stateManager.setEventualRanges(evList); + assertEquals(evList, stateManager.getEventualRanges()); + } + + @Test + public void testSetMdbcServerName() { + String serverName = "serverName"; + stateManager.setMdbcServerName(serverName); + assertEquals(serverName, stateManager.getMdbcServerName()); + } + + @Test + public void testGetConnection() throws Exception { + System.out.println("Testing getting a connection"); + + Connection connMock = Mockito.mock(Connection.class); + String connName = "connectionName"; + Map<String, Connection> connMap = new HashMap<>(); + connMap.put(connName, connMock); + FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("mdbcConnections"), + connMap); + + TxCommitProgress txInfoMock = Mockito.mock(TxCommitProgress.class); + FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("transactionInfo"), + txInfoMock); + + + assertEquals(connMock, stateManager.getConnection(connName)); + } + + @Test + public void testGetRangesToWarmup() throws Exception { + System.out.println("Testing warmup ranges where no ranges are defined"); + + //getConnection + MdbcConnection connMock = Mockito.mock(MdbcConnection.class); + String connName = "daemon"; + Map<String, Connection> connMap = new HashMap<>(); + connMap.put(connName, connMock); + FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("mdbcConnections"), + connMap); + TxCommitProgress txInfoMock = Mockito.mock(TxCommitProgress.class); + FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("transactionInfo"), + txInfoMock); + + DBInterface dbiMock = Mockito.mock(DBInterface.class); + Mockito.when(connMock.getDBInterface()).thenReturn(dbiMock); + Set<Range> allRanges = new HashSet<>(); + allRanges.add(new Range("rangeToWarmup")); + allRanges.add(new Range("rangeToWarmup2")); + allRanges.add(new Range("eventualRange")); + Mockito.when(dbiMock.getSQLRangeSet()).thenReturn(allRanges); + + List<Range> eventualRanges = new ArrayList<Range>(); + eventualRanges.add(new Range("eventualRange")); + stateManager.setEventualRanges(eventualRanges); + + assertEquals(2, stateManager.getRangesToWarmup().size()); + assertTrue(stateManager.getRangesToWarmup().contains(new Range("rangeToWarmup"))); + assertTrue(stateManager.getRangesToWarmup().contains(new Range("rangeToWarmup2"))); + } + + @Test + public void testSetWarmupRanges() { + Set<Range> warmupRanges = new HashSet<>(); + warmupRanges.add(new Range("rangeToWarmup")); + warmupRanges.add(new Range("rangeToWarmup2")); + stateManager.setWarmupRanges(warmupRanges); + assertEquals(warmupRanges, stateManager.getRangesToWarmup()); + } + +} diff --git a/mdbc-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/mdbc-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000..1f0955d --- /dev/null +++ b/mdbc-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline |