diff options
37 files changed, 964 insertions, 475 deletions
diff --git a/mdbc-server/pom.xml b/mdbc-server/pom.xml index 7a46120..42fb9f8 100755 --- a/mdbc-server/pom.xml +++ b/mdbc-server/pom.xml @@ -180,8 +180,8 @@ </dependency> <dependency> <groupId>org.onap.music</groupId> - <artifactId>dev-MUSIC-cassandra</artifactId> - <version>3.2.1</version> + <artifactId>MUSIC-core</artifactId> + <version>3.2.37-SNAPSHOT</version> <exclusions> <exclusion> <groupId>io.netty</groupId> @@ -211,7 +211,13 @@ <version>0.13.1</version> <scope>test</scope> </dependency> - + <dependency> + <groupId>javax.websocket</groupId> + <artifactId>javax.websocket-api</artifactId> + <version>1.1</version> + <scope>provided</scope> + </dependency> + </dependencies> diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java index 91b13f3..efe4c21 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java @@ -48,4 +48,8 @@ public class Configuration { public static final String KEY_WARMUPRANGES = "warmupranges"; /** Default async staging table update o ption*/ public static final String ASYNC_STAGING_TABLE_UPDATE = "false"; + /** The property name to determine if only write locks are allowed */ + public static final String KEY_WRITE_LOCKS_ONLY = "write_locks_only"; + /** Default if only write locks are allowed */ + public static final Boolean WRITE_LOCK_ONLY_DEFAULT = false; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java index 314248f..4122623 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java @@ -48,12 +48,18 @@ public class DatabasePartition { * The only requirement is that the ranges are not overlapping. */ - public DatabasePartition() { - this(new HashSet<Range>(),null,""); - } public DatabasePartition(UUID mriIndex) { - this(new HashSet<Range>(), mriIndex,""); + this(new HashSet<Range>(), mriIndex); + } + + /** + * Create unlocked partition + * @param ranges + * @param mriIndex + */ + public DatabasePartition(Set<Range> ranges, UUID mriIndex) { + this(ranges, mriIndex, null); } public DatabasePartition(Set<Range> knownRanges, UUID mriIndex, String lockId) { @@ -90,7 +96,9 @@ public class DatabasePartition { } - public synchronized boolean isLocked(){return lockId != null && !lockId.isEmpty(); } + public synchronized boolean isLocked(){ + return lockId != null && !lockId.isEmpty(); + } public synchronized boolean isReady() { return ready; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java index b60062e..ee742f8 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java @@ -49,6 +49,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; public class MDBCUtils { + public static boolean writeLocksOnly = false; public static void saveToFile(String serializedContent, String filename, EELFLoggerDelegate logger) throws IOException { try (PrintWriter fout = new PrintWriter(filename)) { @@ -129,6 +130,10 @@ public class MDBCUtils { * @return write if any table has a write query. Read otherwise */ public static SQLOperationType getOperationType(Map<String, List<SQLOperation>> tableToQueryType) { + if (writeLocksOnly) { + return SQLOperationType.WRITE; + } + for (List<org.onap.music.mdbc.query.SQLOperation> tablesOps : tableToQueryType.values()) { for (org.onap.music.mdbc.query.SQLOperation op : tablesOps) { if (op.getOperationType() != SQLOperationType.READ) { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 2294673..1707c03 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -44,6 +44,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Executor; import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicDeadlockException; import org.onap.music.exceptions.QueryException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; @@ -61,6 +62,7 @@ import org.onap.music.mdbc.query.QueryProcessor; import org.onap.music.mdbc.query.SQLOperation; import org.onap.music.mdbc.query.SQLOperationType; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -82,9 +84,14 @@ public class MdbcConnection implements Connection { private final TxCommitProgress progressKeeper; private final DBInterface dbi; private final StagingTable transactionDigest; + /** Set of tables in db */ private final Set<String> table_set; private final StateManager statemanager; + /** partition owned for this transaction */ private DatabasePartition partition; + /** ranges needed for this transaction */ + private Set<Range> rangesUsed; + private String ownerId = UUID.randomUUID().toString(); public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException { @@ -187,9 +194,18 @@ public class MdbcConnection implements Connection { dbi.preCommitHook(); try { + partition = mi.splitPartitionIfNecessary(partition, rangesUsed, ownerId); + } catch (MDBCServiceException e) { + logger.warn(EELFLoggerDelegate.errorLogger, + "Failure to split partition '" + partition.getMRIIndex() + "' trying to continue", + AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); + } + + try { logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); // transaction was committed -- add all the updates into the REDO-Log in MUSIC - mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); + MusicTxDigestId digestCreated = mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); + statemanager.getOwnAndCheck().updateAlreadyApplied(mi, dbi, partition.getSnapshot(), partition.getMRIIndex(), digestCreated); } catch (MDBCServiceException e) { //If the commit fail, then a new commitId should be used logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); @@ -519,19 +535,17 @@ public class MdbcConnection implements Connection { //Check ownership of keys String defaultSchema = dbi.getSchema(); Set<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToQueryType); - if (this.partition!=null) { - Set<Range> snapshot = this.partition.getSnapshot(); - if(snapshot!=null){ - queryTables.addAll(snapshot); - } + if (this.rangesUsed==null) { + rangesUsed = queryTables; + } else { + rangesUsed.addAll(queryTables); } // filter out ranges that fall under Eventually consistent // category as these tables do not need ownership - Set<Range> scQueryTables = filterEveTables(queryTables); - DatabasePartition tempPartition = own(scQueryTables, MDBCUtils.getOperationType(tableToQueryType)); + Set<Range> scRanges = filterEveTables(rangesUsed); + DatabasePartition tempPartition = own(scRanges, MDBCUtils.getOperationType(tableToQueryType)); if(tempPartition!=null && tempPartition != partition) { this.partition.updateDatabasePartition(tempPartition); - statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition); } dbi.preStatementHook(sql); } @@ -599,22 +613,31 @@ public class MdbcConnection implements Connection { OwnershipAndCheckpoint ownAndCheck = statemanager.getOwnAndCheck(); UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey(); try { - final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId, lockType); + final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId, lockType, ownerId); if(ownershipReturn==null){ return null; } Dag dag = ownershipReturn.getDag(); if(dag!=null) { - DagNode node = dag.getNode(ownershipReturn.getRangeId()); - MusicRangeInformationRow row = node.getRow(); - Map<MusicRangeInformationRow, LockResult> lock = new HashMap<>(); - lock.put(row, new LockResult(row.getPartitionIndex(), ownershipReturn.getOwnerId(), true, ranges)); - ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, lock, ownershipReturn.getOwnershipId()); + ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, ownershipReturn.getOwnershipId()); + //TODO: need to update pointer in alreadyapplied if a merge happened instead of in prestatement hook newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(), - ownershipReturn.getOwnerId()); + ownershipReturn.getLockId()); } - } - finally{ + } catch (MDBCServiceException e) { + MusicDeadlockException de = Utils.getDeadlockException(e); + if (de!=null) { + //release all partitions + mi.releaseAllLocksForOwner(de.getOwner(), de.getKeyspace(), de.getTable()); + //rollback transaction + try { + rollback(); + } catch (SQLException e1) { + throw new MDBCServiceException("Failed to rollback transaction after detecting deadlock while taking ownership of table, which, wow", e1); + } + } + throw e; + } finally { ownAndCheck.stopOwnershipTimeoutClock(ownOpId); } return newPartition; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java index 500ed81..246044b 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java @@ -101,6 +101,7 @@ public class MdbcServer { // Then start it server.start(); + System.out.println("Started Avatica server on port " + server.getPort()); logger.info("Started Avatica server on port {} with serialization {}", server.getPort(), serialization); } catch (Exception e) { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 66c8fa9..fb39637 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -36,6 +36,7 @@ import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicTxDigestDaemon; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.TxCommitProgress; import java.io.IOException; @@ -92,7 +93,7 @@ public class StateManager { /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */ private Set<Range> rangesToWarmup; /** map of transactions that have already been applied/updated in this sites SQL db */ - private Map<Range, Pair<MriReference, Integer>> alreadyApplied; + private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied; private OwnershipAndCheckpoint ownAndCheck; private Thread txDaemon ; @@ -114,6 +115,7 @@ public class StateManager { //\fixme this might not be used, delete? try { info.load(this.getClass().getClassLoader().getResourceAsStream("music.properties")); + info.load(this.getClass().getClassLoader().getResourceAsStream("key.properties")); info.putAll(MDBCUtils.getMdbcProperties()); } catch (IOException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); @@ -122,13 +124,17 @@ public class StateManager { cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT); musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT); + String writeLocksOnly = info.getProperty(Configuration.KEY_WRITE_LOCKS_ONLY); + MDBCUtils.writeLocksOnly = (writeLocksOnly==null) ? Configuration.WRITE_LOCK_ONLY_DEFAULT : Boolean.parseBoolean(writeLocksOnly); + initMusic(); - initSqlDatabase(); - initTxDaemonThread(); + Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = initSqlDatabase(); + String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT); long timeout = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t); - alreadyApplied = new ConcurrentHashMap<>(); ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout); + + initTxDaemonThread(); } protected String cleanSqlUrl(String url){ @@ -160,7 +166,12 @@ public class StateManager { this.mdbcConnections = new HashMap<>(); } - protected void initSqlDatabase() throws MDBCServiceException { + /** + * Do everything necessary to initialize the sql database + * @return the current checkpoint location of this database, if restarting + * @throws MDBCServiceException + */ + protected Map<Range, Pair<MriReference, MusicTxDigestId>> initSqlDatabase() throws MDBCServiceException { if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) { try { Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info); @@ -178,16 +189,21 @@ public class StateManager { } } - // Verify the tables in MUSIC match the tables in the database - // and create triggers on any tables that need them + Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyAppliedToDb = null; try { MdbcConnection mdbcConn = (MdbcConnection) openConnection("init"); mdbcConn.initDatabase(); + alreadyAppliedToDb = mdbcConn.getDBInterface().getCheckpointLocations(); closeConnection("init"); } catch (QueryException e) { - logger.error("Error syncrhonizing tables"); + logger.error("Error initializing sql database tables"); logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); } + + if (alreadyAppliedToDb==null) { + alreadyAppliedToDb = new ConcurrentHashMap<>(); + } + return alreadyAppliedToDb; } /** @@ -309,11 +325,9 @@ public class StateManager { } mdbcConnections.remove(connectionId); } - if(connectionRanges.containsKey(connectionId)){ - //We relinquish all locks obtained by a given - //relinquish(connectionRanges.get(connectionId)); - connectionRanges.remove(connectionId); - } + + connectionRanges.remove(connectionId); + } /** @@ -334,18 +348,12 @@ public class StateManager { ErrorTypes.QUERYERROR); sqlConnection = null; } - //check if a range was already created for this connection - //TODO: later we could try to match it to some more sticky client id - DatabasePartition ranges; - if(connectionRanges.containsKey(id)){ - ranges=connectionRanges.get(id); - } - else{ - //TODO: we don't need to create a partition for each connection - ranges=new DatabasePartition(musicInterface.generateUniqueKey()); - connectionRanges.put(id,ranges); - } - //Create MDBC connection + + //TODO: later we could try to match it to some more sticky client id + DatabasePartition ranges=new DatabasePartition(musicInterface.generateUniqueKey()); + connectionRanges.put(id,ranges); + + //Create MDBC connection try { newConnection = new MdbcConnection(id,this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface, transactionInfo,ranges, this); @@ -414,7 +422,7 @@ public class StateManager { * Close all connections for this server, relinquishing any locks/partitions owned by this server */ public void releaseAllPartitions() { - for(String connection: this.connectionRanges.keySet()) { + for(String connection: this.mdbcConnections.keySet()) { closeConnection(connection); } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java index 496f48d..3dcfaf0 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java @@ -34,6 +34,7 @@ import java.io.InputStream; import java.util.*; import org.onap.music.mdbc.mixins.MusicInterface; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.service.impl.MusicCassaCore; public class TestUtils { @@ -48,14 +49,14 @@ public class TestUtils { new MusicRangeInformationRow(dbPartition, new ArrayList<>(), true); MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), true); DatabasePartition partition=null; - partition = mixin.createLockedMRIRow(newRow); + partition = mixin.createLockedMRIRow(newRow, ""); return partition; } public static void unlockRow(String keyspace, String mriTableName, DatabasePartition partition) throws MusicLockingException { String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString(); - MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); + MusicLockState musicLockState = MusicCassaCore.getInstance().voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); } public static void createKeyspace(String keyspace, Session session) { @@ -142,7 +143,7 @@ public class TestUtils { } } - +/* public static void populateMusicUtilsWithProperties(Properties prop){ //TODO: Learn how to do this properly within music String[] propKeys = MusicUtil.getPropkeys(); @@ -207,6 +208,6 @@ public class TestUtils { } } - } +*/ } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java index 7a09dca..f4f4820 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; + +import org.onap.music.exceptions.MusicDeadlockException; import org.onap.music.logging.EELFLoggerDelegate; public class Utils { @@ -77,4 +79,12 @@ public class Utils { } } } + + public static MusicDeadlockException getDeadlockException(Throwable t) { + while (t!=null) { + if (t instanceof MusicDeadlockException) return (MusicDeadlockException)t; + t = t.getCause(); + } + return null; + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestMultiClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestMultiClient.java index 7320d34..02b7c7c 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestMultiClient.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestMultiClient.java @@ -49,6 +49,14 @@ public class MdbcTestMultiClient implements Runnable { private int selectInsteadOfUpdatePct = 25;
private int rollbackChancePct = 15;
private int maxTables = 0;
+ public static boolean[] threadsDone;
+
+
+ private boolean sequentialIds = false;
+ private static Integer currentId = -1;
+
+ private boolean sequentialFirsts = false;
+ private static Integer currentFirstFirst = 0, currentFirstSecond = 0;
private Long randomSeed = null;
@@ -56,6 +64,7 @@ public class MdbcTestMultiClient implements Runnable { private static final List<String> defaultTableNames = Arrays.asList(new String[] {"persons", "persons2"});
private boolean explainConnection = true;
+ private boolean endInSelect = true;
public static class Employee {
public final int empid;
@@ -164,12 +173,12 @@ public class MdbcTestMultiClient implements Runnable { break;
case "-u":
case "--update":
- currState = 'u';
+ doUpdate = false;
break;
case "-x":
case "--delete":
- currState = 'x';
- break;
+ doDelete = false;
+ break;
case "-l":
case "--closeChance":
currState = 'l';
@@ -192,6 +201,12 @@ public class MdbcTestMultiClient implements Runnable { case "--randomSeed":
currState = '?';
break;
+ case "--sequentialId":
+ sequentialIds = true;
+ break;
+ case "--sequentialFirst":
+ sequentialFirsts = true;
+ break;
default:
System.out.println("Didn't understand switch " + arg);
}
@@ -225,12 +240,6 @@ public class MdbcTestMultiClient implements Runnable { case 'a':
additionalDelayBetweenTestsMs = Integer.parseInt(arg);
break;
- case 'u':
- doUpdate = arg.toUpperCase().startsWith("Y");
- break;
- case 'x':
- doDelete = arg.toUpperCase().startsWith("Y");
- break;
case 'l':
connectionCloseChancePct = Integer.parseInt(arg);
break;
@@ -265,23 +274,25 @@ public class MdbcTestMultiClient implements Runnable { private void showHelp() {
System.out.println(
"-?; --help: Show help\n" +
- "-c; --connection: MDBC connection string, may appear multiple times\n" +
- "-e; --tableName: Table name, may appear multiple times\n" +
- "-n; --name: Last name in persons table, default \"Lastname\"\n" +
- "-b; --baseId: Base ID, default 700\n" +
- "-r; --baseRange: Range of ID, default 50\n" +
- "-m; --maxCalls: Max number of commits (each may be 1+ updates), default 50\n" +
- "-t; --maxTime: Max time in ms test will run, default 60000\n" +
- "-d; --minDelay: Min delay between tests in ms, default 1000\n" +
- "-a; --addDelay: Max randomized additional delay between tests in ms, default 1000\n" +
- "-u; --update: Generate update statements; default Y\n" +
- "-x; --delete: Generate delete statements; default Y\n" +
- "-l; --closeChance: Percent chance of closing connection after each commit, default 50\n" +
- "-s; --skipInitialSelect: Percent chance of skipping each initial select in a transaction, default 25\n" +
- "-i; --selectNotUpdate: Percent chance of each action in a transaction being a select instead of an update, default 25\n" +
- "-o; --rollbackChance: Percent chance of rolling back each transaction instead of committing, default 15\n" +
- " --maxTables: Maximum number of tables per transaction, default 0 (no limit)\n" +
- " --randomSeed: Seed for the initial random number generator, default none\n" +
+ "-c; --connection [string]: MDBC connection string, may appear multiple times\n" +
+ "-e; --tableName [string]: Table name, may appear multiple times\n" +
+ "-n; --name [string]: Last name in persons table, default \"Lastname\"\n" +
+ "-b; --baseId [int]: Base ID, default 700\n" +
+ "-r; --baseRange [int]: Range of ID, default 50\n" +
+ "-m; --maxCalls [int]: Max number of commits (each may be 1+ updates), default 50\n" +
+ "-t; --maxTime [int]: Max time in ms test will run, default 60000\n" +
+ "-d; --minDelay [int]: Min delay between tests in ms, default 1000\n" +
+ "-a; --addDelay [int]: Max randomized additional delay between tests in ms, default 1000\n" +
+ "-u; --update: Don't generate update statements; default do\n" +
+ "-x; --delete: Don't generate delete statements; default do\n" +
+ "-l; --closeChance [int]: Percent chance of closing connection after each commit, default 50\n" +
+ "-s; --skipInitialSelect [int]: Percent chance of skipping each initial select in a transaction, default 25\n" +
+ "-i; --selectNotUpdate [int]: Percent chance of each action in a transaction being a select instead of an update, default 25\n" +
+ "-o; --rollbackChance [int]: Percent chance of rolling back each transaction instead of committing, default 15\n" +
+ " --maxTables [int]: Maximum number of tables per transaction, default 0 (no limit)\n" +
+ " --randomSeed [long]: Seed for the initial random number generator, default none (generate random random seed)\n" +
+ " --sequentialId: Generate sequential IDs instead of random ones (default random)\n" +
+ " --sequentialFirst: Generate alphabetically sequential first names (default completely random) \n" +
""
);
@@ -307,6 +318,8 @@ public class MdbcTestMultiClient implements Runnable { this.selectInsteadOfUpdatePct = that.selectInsteadOfUpdatePct;
this.rollbackChancePct = that.rollbackChancePct;
this.maxTables = that.maxTables;
+ this.sequentialIds = that.sequentialIds;
+ this.sequentialFirsts = that.sequentialFirsts;
}
private void setRandomSeed(Long randomSeed) {
@@ -350,6 +363,8 @@ public class MdbcTestMultiClient implements Runnable { // doLog("PersonId = " + rs.getInt("personId") + ", lastname = " + rs.getString("lastname") + ", firstname = " + rs.getString("firstname"));
Employee emp = new Employee(rs.getInt("personId"), rs.getString("lastname"), rs.getString("firstname"), rs.getString("address"), rs.getString("city"));
employeeMap.put(rs.getInt("personId"), emp);
+ if (sequentialIds) updateId(rs.getInt("personId"));
+ if (sequentialFirsts) updateFirst(rs.getString("firstname"));
doLog("Found: " + emp);
}
querySt.close();
@@ -388,7 +403,32 @@ public class MdbcTestMultiClient implements Runnable { insertStmt.close();
}
- private List<String> chooseTableNames(Random r) {
+ private void updateFirst(String firstName) {
+ if (firstName==null || firstName.length()<2) return;
+ synchronized(currentFirstFirst) {
+// return (char)(65+currentFirstFirst) + "" + (char)(97+currentFirstSecond) + generateLetters(r, 4+r.nextInt(4));
+ int ff = ((int)firstName.charAt(0))-65;
+ int fs = ((int)firstName.charAt(1))-97;
+ if (ff>=26 || ff<0 || fs>=26 || fs<0) return;
+ if ( (ff>currentFirstFirst) || (ff==currentFirstFirst && fs>currentFirstSecond) ) {
+ currentFirstFirst = ff;
+ currentFirstSecond = fs;
+ doLog("Saw " + firstName + ", updating currentFirstName to " + currentFirstFirst + ", " + currentFirstSecond);
+ }
+ }
+
+ }
+
+ private void updateId(int id) {
+ synchronized(currentId) {
+ if (currentId<=id) {
+ currentId = id+1;
+ doLog ("Saw " + id + ", updating current id");
+ }
+ }
+ }
+
+ private List<String> chooseTableNames(Random r) {
if (maxTables<=0 || maxTables>=tableNames.size()) return tableNames;
boolean[] useTable = new boolean[tableNames.size()];
for (int i=0; i<tableNames.size(); i++) useTable[i] = false;
@@ -462,33 +502,63 @@ public class MdbcTestMultiClient implements Runnable { private String generateInsert(HashMap<Integer, Employee> employeeMap, Random r, String tableName) {
String toRet = null;
- Integer id = null;
- int range = baseIdRange;
- while (id==null) {
- id = baseId + r.nextInt(range);
- if (employeeMap!=null && employeeMap.containsKey(id)) id = null;
- if (employeeMap==null) id+=baseIdRange;
- range+=(baseIdRange/5);
- }
- Employee newEmp = new Employee(id, lastName, Character.toUpperCase(randomLetter(r)) + generateLetters(r, 4+r.nextInt(4)), generateLetters(r, 4).toUpperCase(), generateLetters(r, 4).toUpperCase());
+ Integer id = generateId(employeeMap, r);
+ Employee newEmp = new Employee(id, lastName, generateFirstName(r), generateLetters(r, 4).toUpperCase(), generateLetters(r, 4).toUpperCase());
+// Employee newEmp = new Employee(id, lastName, Character.toUpperCase(randomLetter(r)) + generateLetters(r, 4+r.nextInt(4)), generateLetters(r, 4).toUpperCase(), generateLetters(r, 4).toUpperCase());
toRet = "insert into " + tableName + " values (" + id + ", '" + newEmp.getLastname() + "', '" + newEmp.getFirstname() + "', '" + newEmp.getAddress() + "', '" + newEmp.getCity() + "')";
if (employeeMap!=null) employeeMap.put(id, newEmp);
return toRet;
}
+ private String generateFirstName(Random r) {
+ if (sequentialFirsts) {
+ synchronized(currentFirstFirst) {
+ currentFirstSecond++;
+ if (currentFirstSecond==26) {
+ currentFirstSecond = 0;
+ currentFirstFirst++;
+ if (currentFirstFirst==26) currentFirstFirst=0;
+ }
+ return (char)(65+currentFirstFirst) + "" + (char)(97+currentFirstSecond) + generateLetters(r, 4+r.nextInt(4));
+ }
+ } else {
+ return Character.toUpperCase(randomLetter(r)) + generateLetters(r, 4+r.nextInt(4));
+ }
+ }
+
+ private Integer generateId(HashMap<Integer, Employee> employeeMap, Random r) {
+ Integer toRet = null;
+ if (currentId<0 && baseId>0) currentId = baseId; // setup, only matters if sequentialIds is true
+ int range = baseIdRange; // setup, only matters if sequentialIds is false
+ while (toRet==null) {
+ if (sequentialIds) {
+ synchronized(currentId) {
+ toRet = currentId++;
+ }
+ } else {
+ toRet = baseId + r.nextInt(range);
+ if (employeeMap==null) toRet+=baseIdRange;
+ range+=(baseIdRange/5);
+ }
+ if (employeeMap!=null && employeeMap.containsKey(toRet)) toRet = null;
+ }
+ return toRet;
+ }
+
private String generateUpdate(HashMap<Integer, Employee> employeeMap, Random r, String tableName) {
String toRet = null;
Employee toUpd = chooseTarget(employeeMap, r);
if (toUpd!=null) {
String newFirst = null;
- if (toUpd.getFirstname().length()<=3 || r.nextBoolean()) {
+ if (sequentialFirsts) {
+ newFirst = generateFirstName(r);
+ } else if (toUpd.getFirstname().length()<=3 || r.nextBoolean()) {
newFirst = toUpd.getFirstname() + randomLetter(r);
} else {
newFirst = toUpd.getFirstname().substring(0, toUpd.getFirstname().length()-1);
}
-// toRet = "update " + tableName + " set firstname = '" + newFirst + "' where personid = " + toUpd.getEmpid();
toRet = "update " + tableName + " set firstname = '" + newFirst + "' where personid = " + toUpd.getEmpid() + " and lastname = '" + toUpd.getLastname() + "'";
toUpd.setFirstname(newFirst);
}
@@ -649,7 +719,44 @@ public class MdbcTestMultiClient implements Runnable { doLog("");
}
+ threadsDone[threadId] = true;
+ if (endInSelect) {
+ doLog("Ending in select to ensure all db's are in sync");
+ while (!allThreadsDone()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ continue;
+ }
+ }
+ doLog("All threads are done. Ending in select");
+ if (connection==null) {
+ try {
+ doLog("Opening new connection");
+ connection = DriverManager.getConnection(connectionString);
+ connection.setAutoCommit(false);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ for (String tableName : tableNames) {
+ try {
+ Statement querySt = connection.createStatement();
+ ResultSet rs = executeQueryTimed("select * from " + tableName, querySt, false);
+ while (rs.next()) {
+ // doLog("PersonId = " + rs.getInt("personId") + ", lastname = " + rs.getString("lastname") + ", firstname = " + rs.getString("firstname"));
+ Employee emp = new Employee(rs.getInt("personId"), rs.getString("lastname"), rs.getString("firstname"), rs.getString("address"), rs.getString("city"));
+ doLog("Found: " + emp);
+ }
+ querySt.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
if (connection!=null) {
try {
doLog("Closing connection at end");
@@ -662,27 +769,39 @@ public class MdbcTestMultiClient implements Runnable { doLog("All done.");
}
- private void doLog(String string) {
+ private boolean allThreadsDone() {
+ for (Boolean b: this.threadsDone) {
+ if (!b) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void doLog(String string) {
System.out.println(">> Thread " + threadId + " " + sdf.format(new java.util.Date()) + " >> " + string);
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws InterruptedException {
MdbcTestMultiClient mtc = new MdbcTestMultiClient(args);
mtc.runTests();
}
- private void runTests() {
+ private void runTests() throws InterruptedException {
if (randomSeed==null) {
randomSeed = new Random().nextLong();
}
doLog("Using random seed = " + randomSeed);
Random seedRandom = new Random(randomSeed);
+ this.threadsDone = new boolean[connectionStrings.size()];
+
for (int i=0; i<connectionStrings.size(); i++) {
MdbcTestMultiClient mt = new MdbcTestMultiClient(this, i);
mt.setRandomSeed(seedRandom.nextLong());
Thread t = new Thread(mt);
t.start();
}
+
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index 745307c..cba699f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -31,6 +31,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; +import org.onap.music.mdbc.tables.MriReference; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; /** @@ -134,13 +136,13 @@ public interface DBInterface { * @throws SQLException if replay cannot occur correctly * @throws MDBCServiceException */ - void replayTransaction(StagingTable digest, Set<Range> ranges) throws SQLException, MDBCServiceException; + void replayTransaction(StagingTable digest) throws SQLException, MDBCServiceException; void disableForeignKeyChecks() throws SQLException; void enableForeignKeyChecks() throws SQLException; - void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException; + void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException; Connection getSQLConnection(); @@ -151,7 +153,12 @@ public interface DBInterface { * @param r * @param playbackPointer */ - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer); + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer); + /** + * Get current locations of this database's already applied locations + * @return + */ + public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations(); /** * Initialize the SQL database by creating any tables necessary diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 637cb15..b8ac563 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -45,19 +45,19 @@ import org.onap.music.mdbc.tables.*; */ public interface MusicInterface { class OwnershipReturn{ - private final UUID ownershipId; + private final UUID ownershipOpId; private final String lockId; private final UUID rangeId; private final Set<Range> ranges; private final Dag dag; - public OwnershipReturn(UUID ownershipId, String ownerId, UUID rangeId, Set<Range> ranges, Dag dag){ - this.ownershipId=ownershipId; - this.lockId=ownerId; + public OwnershipReturn(UUID ownershipOpId, String lockId, UUID rangeId, Set<Range> ranges, Dag dag){ + this.ownershipOpId=ownershipOpId; + this.lockId=lockId; this.rangeId=rangeId; this.ranges=ranges; this.dag=dag; } - public String getOwnerId(){ + public String getLockId(){ return lockId; } public UUID getRangeId(){ @@ -65,7 +65,7 @@ public interface MusicInterface { } public Set<Range> getRanges(){ return ranges; } public Dag getDag(){return dag;} - public UUID getOwnershipId() { return ownershipId; } + public UUID getOwnershipId() { return ownershipOpId; } } /** * Get the name of this MusicInterface mixin object. @@ -181,15 +181,21 @@ public interface MusicInterface { /** * Commits the corresponding REDO-log into MUSIC + * Transaction is committed -- add all the updates into the REDO-Log in MUSIC + * + * This officially commits the transaction globally + * + * * * @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx * @param eventualRanges * @param transactionDigest digest of the transaction that is being committed into the Redo log in music. * @param txId id associated with the log being send * @param progressKeeper data structure that is used to handle to detect failures, and know what to do + * @return digest that was created for this transaction commit * @throws MDBCServiceException */ - void commitLog(DatabasePartition partition, Set<Range> eventualRanges, StagingTable transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; + public MusicTxDigestId commitLog(DatabasePartition partition, Set<Range> eventualRanges, StagingTable transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; /** @@ -213,10 +219,11 @@ public interface MusicInterface { /** * This function is used to create a new locked row in the MRI table * @param info the information used to create the row + * @param owner owner of the lock for deadlock detection * @return the new partition object that contain the new information used to create the row * @throws MDBCServiceException */ - DatabasePartition createLockedMRIRow(MusicRangeInformationRow info) throws MDBCServiceException; + DatabasePartition createLockedMRIRow(MusicRangeInformationRow info, String owner) throws MDBCServiceException; /** * This function is used to create all the required music dependencies @@ -320,10 +327,13 @@ public interface MusicInterface { void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException; String createLock(LockRequest request) throws MDBCServiceException; + String createLock(LockRequest request, String ownerId) throws MDBCServiceException; LockResult acquireLock(LockRequest request, String lockId) throws MDBCServiceException; void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException; + public void releaseAllLocksForOwner(String owner, String keyspace, String table) throws MDBCServiceException; + /** * Combine previous musicrangeinformation rows for new partition, if necessary * @@ -331,13 +341,26 @@ public interface MusicInterface { * * @param currentlyOwned * @param locksForOwnership - * @param ownershipId + * @param ownershipOpId + * @param ownerId * @return * @throws MDBCServiceException */ - OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, UUID ownershipId) - throws MDBCServiceException; - + OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, + UUID ownershipOpId, String ownerId) throws MDBCServiceException; + + /** + * If this connection is using fewer ranges than what is owned in the current partition, split + * the partition to avoid a universal partition being passed around. + * + * This will follow "most recently used" policy + * @param partition2 partition that this transaction currently owns + * @param rangesUsed set of ranges that is the minimal required for this transaction + * @throws MDBCServiceException + */ + public DatabasePartition splitPartitionIfNecessary(DatabasePartition partition, Set<Range> rangesUsed, + String ownerId) throws MDBCServiceException; + /** * Create ranges in MRI table, if not already present * @param range to add into mri table @@ -349,8 +372,9 @@ public interface MusicInterface { * This is an eventual operation for minimal performance hits * @param r * @param playbackPointer + * @throws MDBCServiceException */ - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer); + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 5581573..a24ada2 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -51,12 +51,15 @@ import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.lockingservice.cassandra.LockType; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.main.MusicCore; +import org.onap.music.main.CorePropertiesLoader; import org.onap.music.main.ResultType; import org.onap.music.main.ReturnType; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.MDBCUtils; +import org.onap.music.mdbc.MdbcConnection; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.StateManager; import org.onap.music.mdbc.TableInfo; @@ -69,6 +72,7 @@ import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.RangeDependency; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; +import org.onap.music.service.impl.MusicCassaCore; /** * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence @@ -104,6 +108,8 @@ public class MusicMixin implements MusicInterface { public static final String KEY_TIMEOUT = "mdbc_timeout"; /** The property name to use to provide a flag indicating if compression is required */ public static final String KEY_COMPRESSION = "mdbc_compression"; + /** The property name to use to provide a flag indicating if mri row splits is allowable */ + public static final String KEY_SPLIT = "partition_splitting"; /** Namespace for the tables in MUSIC (Cassandra) */ public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; /** The default property value to use for the Cassandra IP address. */ @@ -197,9 +203,9 @@ public class MusicMixin implements MusicInterface { private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>()); private StateManager stateManager; private boolean useCompression; + private boolean splitAllowed; public MusicMixin() { - //this.logger = null; this.musicAddress = null; this.music_ns = null; @@ -209,6 +215,8 @@ public class MusicMixin implements MusicInterface { } public MusicMixin(StateManager stateManager, String mdbcServerName, Properties info) throws MDBCServiceException { + CorePropertiesLoader.loadProperties(info); + // Default values -- should be overridden in the Properties // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred) this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS); @@ -237,6 +245,9 @@ public class MusicMixin implements MusicInterface { String s = info.getProperty(KEY_MUSIC_RFACTOR); this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s); + String split = info.getProperty(KEY_SPLIT); + this.splitAllowed = (split == null) ? true: Boolean.parseBoolean(split); + initializeMetricTables(); commitExecutorThreads = Executors.newFixedThreadPool(4); } @@ -283,6 +294,8 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException("Error creating namespace: "+keyspace+". Internal error:"+e.getErrorMessage(), e); } + } catch (MusicQueryException e) { + throw new MDBCServiceException(e); } } @@ -1118,22 +1131,19 @@ public class MusicMixin implements MusicInterface { * Build a preparedQueryObject that appends a transaction to the mriTable * @param mriTable * @param uuid - * @param table * @param redoUuid * @return */ - private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){ + private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, UUID redoUuid){ PreparedQueryObject query = new PreparedQueryObject(); StringBuilder appendBuilder = new StringBuilder(); appendBuilder.append("UPDATE ") .append(music_ns) .append(".") .append(mriTable) - .append(" SET txredolog = txredolog +[('") - .append(table) - .append("',") + .append(" SET txredolog = txredolog +[") .append(redoUuid) - .append(")] WHERE rangeid = ") + .append("] WHERE rangeid = ") .append(uuid) .append(";"); query.appendQueryString(appendBuilder.toString()); @@ -1198,44 +1208,19 @@ public class MusicMixin implements MusicInterface { return pendingRows; } - private List<Range> lockRow(LockRequest request,Map.Entry<UUID, Set<Range>> pending,Map<UUID, String> currentLockRef, - String fullyQualifiedKey, String lockId, List<Range> pendingToLock, - Map<UUID, LockResult> alreadyHeldLocks) - throws MDBCServiceException{ - List<Range> newRanges = new ArrayList<>(); - String newFullyQualifiedKey = music_ns + "." + musicRangeInformationTableName + "." + pending.getKey().toString(); - String newLockId; - boolean success; - if (currentLockRef.containsKey(pending.getKey())) { - newLockId = currentLockRef.get(pending.getKey()); - success = (MusicCore.whoseTurnIsIt(newFullyQualifiedKey) == newLockId); - } else { - newLockId = MusicCore.createLockReference(newFullyQualifiedKey); - ReturnType newLockReturn = acquireLock(fullyQualifiedKey, lockId); - success = newLockReturn.getResult().compareTo(ResultType.SUCCESS) == 0; - } - if (!success) { - pendingToLock.addAll(pending.getValue()); - currentLockRef.put(pending.getKey(), newLockId); - } else { - if(alreadyHeldLocks.containsKey(pending.getKey())){ - throw new MDBCServiceException("Adding key that already exist"); - } - alreadyHeldLocks.put(pending.getKey(),new LockResult(pending.getKey(), newLockId, true, - pending.getValue())); - newRanges.addAll(pending.getValue()); - } - return newRanges; - } - private boolean isDifferent(NavigableMap<UUID, List<Range>> previous, NavigableMap<UUID, List<Range>> current){ return previous.keySet().equals(current.keySet()); } - protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { + protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition, String ownerId) + throws MDBCServiceException { UUID mriIndex = partition.getMRIIndex(); String lockId; - lockId = MusicCore.createLockReference(fullyQualifiedKey); + try { + lockId = MusicCore.createLockReference(fullyQualifiedKey, ownerId); + } catch (MusicLockingException e1) { + throw new MDBCServiceException(e1); + } if(lockId==null) { throw new MDBCServiceException("lock reference is null"); } @@ -1263,15 +1248,15 @@ public class MusicMixin implements MusicInterface { return lockId; } - protected void changeIsLatestToMRI(MusicRangeInformationRow row, boolean isLatest, LockResult lock) throws MDBCServiceException{ + protected void changeIsLatestToMRI(UUID mrirow, boolean isLatest, String lockref) throws MDBCServiceException{ - if(lock == null) + if(lockref == null) return; - PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, row.getPartitionIndex(), + PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, mrirow, musicTxDigestTableName, isLatest); - ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(), + ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mrirow.toString(), appendQuery, - lock.getLockId() + lockref , null); if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing change isLatest operation with return type: "+returnType.getMessage()); @@ -1289,12 +1274,8 @@ public class MusicMixin implements MusicInterface { addTxDigest(digestId, serializedTransactionDigest); } - /** - * Writes the transaction information to metric's txDigest and musicRangeInformation table - * This officially commits the transaction globally - */ @Override - public void commitLog(DatabasePartition partition,Set<Range> eventualRanges, StagingTable transactionDigest, + public MusicTxDigestId commitLog(DatabasePartition partition,Set<Range> eventualRanges, StagingTable transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException { // first deal with commit for eventually consistent tables @@ -1302,18 +1283,18 @@ public class MusicMixin implements MusicInterface { if(partition==null){ logger.warn("Trying tcommit log with null partition"); - return; + return null; } Set<Range> snapshot = partition.getSnapshot(); if(snapshot==null || snapshot.isEmpty()){ logger.warn("Trying to commit log with empty ranges"); - return; + return null; } //Add creation type of transaction digest if(transactionDigest == null || transactionDigest.isEmpty()) { - return; + return null; } UUID mriIndex = partition.getMRIIndex(); @@ -1325,7 +1306,7 @@ public class MusicMixin implements MusicInterface { } - final MusicTxDigestId digestId = new MusicTxDigestId(MDBCUtils.generateUniqueKey(), -1); + final MusicTxDigestId digestId = new MusicTxDigestId(mriIndex, MDBCUtils.generateUniqueKey(), -1); Callable<Boolean> insertDigestCallable =()-> { try { createAndAddTxDigest(transactionDigest,digestId.transactionId); @@ -1337,8 +1318,7 @@ public class MusicMixin implements MusicInterface { }; Callable<Boolean> appendCallable=()-> { try { - appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicTxDigestTableName, - musicRangeInformationTableName); + appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicRangeInformationTableName); return true; } catch (MDBCServiceException e) { logger.error(EELFLoggerDelegate.errorLogger, "Error creating and pushing tx digest to music",e); @@ -1349,9 +1329,7 @@ public class MusicMixin implements MusicInterface { Future<Boolean> appendResultFuture = commitExecutorThreads.submit(appendCallable); Future<Boolean> digestFuture = commitExecutorThreads.submit(insertDigestCallable); try { - //Boolean appendResult = appendResultFuture.get(); - Boolean digestResult = digestFuture.get(); - if(/*!appendResult ||*/ !digestResult){ + if(!appendResultFuture.get() || !digestFuture.get()){ logger.error(EELFLoggerDelegate.errorLogger, "Error appending to log or adding tx digest"); throw new MDBCServiceException("Error appending to log or adding tx digest"); } @@ -1364,21 +1342,8 @@ public class MusicMixin implements MusicInterface { if (progressKeeper != null) { progressKeeper.setRecordId(txId, digestId); } - Set<Range> ranges = partition.getSnapshot(); - for(Range r : ranges) { - Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); - if(!alreadyApplied.containsKey(r)){ - throw new MDBCServiceException("already applied data structure was not updated correctly and range " - +r+" is not contained"); - } - Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r); - MriReference key = rowAndIndex.getKey(); - if(!mriIndex.equals(key.index)){ - throw new MDBCServiceException("already applied data structure was not updated correctly and range "+ - r+" is not pointing to row: "+mriIndex.toString()); - } - alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1)); - } + + return digestId; } private void filterAndAddEventualTxDigest(Set<Range> eventualRanges, @@ -1477,20 +1442,18 @@ public class MusicMixin implements MusicInterface { static public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){ UUID partitionIndex = newRow.getUUID("rangeid"); - List<TupleValue> log = newRow.getList("txredolog",TupleValue.class); + List<UUID> log = newRow.getList("txredolog",UUID.class); List<MusicTxDigestId> digestIds = new ArrayList<>(); int index=0; - for(TupleValue t: log){ - //final String tableName = t.getString(0); - final UUID id = t.getUUID(1); - digestIds.add(new MusicTxDigestId(partitionIndex,id,index++)); + for(UUID u: log){ + digestIds.add(new MusicTxDigestId(partitionIndex,u,index++)); } Set<Range> partitions = new HashSet<>(); Set<String> tables = newRow.getSet("keys",String.class); for (String table:tables){ partitions.add(new Range(table)); } - return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""), + return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex), digestIds, newRow.getBool("islatest"), newRow.getSet("prevmrirows", UUID.class)); } @@ -1564,7 +1527,7 @@ public class MusicMixin implements MusicInterface { fields.append("prevmrirows set<uuid>, "); fields.append("islatest boolean, "); //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly - fields.append("txredolog list<frozen<tuple<text,uuid>>> "); + fields.append("txredolog list<uuid> "); String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", namespace, tableName, fields, priKey); try { @@ -1577,14 +1540,15 @@ public class MusicMixin implements MusicInterface { @Override - public DatabasePartition createLockedMRIRow(MusicRangeInformationRow info) throws MDBCServiceException { + public DatabasePartition createLockedMRIRow(MusicRangeInformationRow info, String ownerId) + throws MDBCServiceException { DatabasePartition newPartition = info.getDBPartition(); String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMRIIndex().toString(); String lockId; int counter=0; do { - lockId = createAndAssignLock(fullyQualifiedMriKey, newPartition); + lockId = createAndAssignLock(fullyQualifiedMriKey, newPartition, ownerId); //TODO: fix this retry logic } while ((lockId ==null||lockId.isEmpty())&&(counter++<3)); if (lockId == null || lockId.isEmpty()) { @@ -1698,15 +1662,12 @@ public class MusicMixin implements MusicInterface { @Override public void appendToRedoLog(UUID MRIIndex, String lockId, MusicTxDigestId newRecord) throws MDBCServiceException { logger.debug("Appending to redo log for partition " + MRIIndex + " txId=" + newRecord.transactionId); - appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicTxDigestTableName, - musicRangeInformationTableName); + appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicRangeInformationTableName); } - public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId, - String musicTxDigestTableName, String musicRangeInformationTableName) + public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId, String musicRangeInformationTableName) throws MDBCServiceException{ - PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex, - musicTxDigestTableName, transactionId); + PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex, transactionId); ReturnType returnType = MusicCore.criticalPut(musicNamespace, musicRangeInformationTableName, MRIIndex.toString(), appendQuery, lockId, null); //returnType.getExecutionInfo() @@ -1805,13 +1766,9 @@ public class MusicMixin implements MusicInterface { } public static void createMusicMdbcCheckpointTable(String namespace, String checkpointTable) throws MDBCServiceException { - String priKey = "txid"; - StringBuilder fields = new StringBuilder(); - fields.append("txid uuid, "); - fields.append("compressed boolean, "); - fields.append("transactiondigest blob ");//notice lack of ',' String cql = - String.format("CREATE TABLE IF NOT EXISTS %s.%s (mdbcnode UUID, mridigest UUID, digestindex int, PRIMARY KEY (mdbcnode));", + String.format("CREATE TABLE IF NOT EXISTS %s.%s (mdbcnode text, range text, mridigest UUID," + + "digestid UUID, PRIMARY KEY (mdbcnode, range));", namespace, checkpointTable); try { executeMusicWriteQuery(namespace,checkpointTable,cql); @@ -1844,6 +1801,8 @@ public class MusicMixin implements MusicInterface { } catch (MusicServiceException e) { logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for digest id "+digestId.toString()+ "with error "+e.getErrorMessage()); throw new MDBCServiceException("Transaction Digest serialization for digest id "+digestId.toString(), e); + } catch (MusicQueryException e) { + throw new MDBCServiceException(e); } } @@ -1870,6 +1829,8 @@ public class MusicMixin implements MusicInterface { } catch (MusicServiceException e) { logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.transactionId.toString()+ "with error "+e.getErrorMessage()); throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.transactionId.toString(), e); + } catch (MusicQueryException e) { + throw new MDBCServiceException(e); } } @@ -2027,7 +1988,7 @@ public class MusicMixin implements MusicInterface { private void unlockKeyInMusic(String table, String key, String lockref) throws MDBCServiceException { String fullyQualifiedKey= music_ns+"."+ table+"."+key; try { - MusicCore.voluntaryReleaseLock(fullyQualifiedKey,lockref); + MusicCassaCore.getInstance().voluntaryReleaseLock(fullyQualifiedKey,lockref); } catch (MusicLockingException e) { throw new MDBCServiceException(e.getMessage(), e); } @@ -2066,6 +2027,15 @@ public class MusicMixin implements MusicInterface { } } + @Override + public void releaseAllLocksForOwner(String ownerId, String keyspace, String table) throws MDBCServiceException { + try { + MusicCore.releaseAllLocksForOwner(ownerId, keyspace, table); + } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { + throw new MDBCServiceException(e); + } + } + /** * Get a list of ranges and their range dependencies * @param range @@ -2087,9 +2057,19 @@ public class MusicMixin implements MusicInterface { @Override public String createLock(LockRequest request) throws MDBCServiceException{ + return createLock(request, null); + } + + @Override + public String createLock(LockRequest request, String ownerId) throws MDBCServiceException{ String fullyQualifiedKey= music_ns+"."+ musicRangeInformationTableName + "." + request.getId(); boolean isWrite = (request.getLockType()==SQLOperationType.WRITE); - String lockId = MusicCore.createLockReference(fullyQualifiedKey, isWrite); + String lockId; + try { + lockId = MusicCore.createLockReference(fullyQualifiedKey, isWrite?LockType.WRITE:LockType.READ, ownerId); + } catch (MusicLockingException e) { + throw new MDBCServiceException(e); + } return lockId; } @@ -2117,7 +2097,8 @@ public class MusicMixin implements MusicInterface { * @param locks * @throws MDBCServiceException */ - private void recoverFromFailureAndUpdateDag(Dag latestDag, Map<UUID,LockResult> locks) throws MDBCServiceException { + private void recoverFromFailureAndUpdateDag(Dag latestDag, Map<UUID, LockResult> locks, String ownerId) + throws MDBCServiceException { Pair<Set<Range>, Set<DagNode>> rangesAndDependents = latestDag.getIncompleteRangesAndDependents(); if(rangesAndDependents.getKey()==null || rangesAndDependents.getKey().size()==0 || rangesAndDependents.getValue()==null || rangesAndDependents.getValue().size() == 0){ @@ -2129,8 +2110,9 @@ public class MusicMixin implements MusicInterface { prevPartitions.add(dagnode.getId()); } - MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey(), prevPartitions); - locks.put(r.getPartitionIndex(),new LockResult(r.getPartitionIndex(),r.getDBPartition().getLockId(),true,rangesAndDependents.getKey())); + MusicRangeInformationRow r = createAndAssignLock(rangesAndDependents.getKey(), prevPartitions, ownerId); + locks.put(r.getPartitionIndex(), new LockResult(true, r.getPartitionIndex(), r.getDBPartition().getLockId(), + true, rangesAndDependents.getKey())); latestDag.addNewNode(r,new ArrayList<>(rangesAndDependents.getValue())); } @@ -2140,7 +2122,10 @@ public class MusicMixin implements MusicInterface { List<MusicRangeInformationRow> returnInfo = new ArrayList<>(); List<DagNode> toDisable = latestDag.getOldestDoubles(); for(DagNode node : toDisable){ - changeIsLatestToMRI(node.getRow(),false,locks.get(node.getId())); + LockResult lockToDisable = locks.get(node.getId()); + if (lockToDisable!=null) { + changeIsLatestToMRI(node.getRow().getPartitionIndex(),false,lockToDisable.getLockId()); + } latestDag.setIsLatest(node.getId(),false); returnInfo.add(node.getRow()); } @@ -2161,13 +2146,14 @@ public class MusicMixin implements MusicInterface { } @Override - public OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, UUID ownershipId) throws MDBCServiceException { - recoverFromFailureAndUpdateDag(currentlyOwned,locksForOwnership); + public OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, + UUID ownershipOpId, String ownerId) throws MDBCServiceException { + recoverFromFailureAndUpdateDag(currentlyOwned,locksForOwnership, ownerId); if (locksForOwnership.keySet().size()==1) { //reuse if overlapping single partition, no merge necessary for (UUID uuid: locksForOwnership.keySet()) { - return new OwnershipReturn(ownershipId, locksForOwnership.get(uuid).getLockId(), uuid, + return new OwnershipReturn(ownershipOpId, locksForOwnership.get(uuid).getLockId(), uuid, currentlyOwned.getNode(uuid).getRangeSet(), currentlyOwned); } } @@ -2178,19 +2164,70 @@ public class MusicMixin implements MusicInterface { Set<Range> ranges = extractRangesToOwn(currentlyOwned, locksForOwnership.keySet()); - MusicRangeInformationRow createdRow = createAndAssignLock(ranges, locksForOwnership.keySet()); + MusicRangeInformationRow createdRow = createAndAssignLock(ranges, locksForOwnership.keySet(), ownerId); currentlyOwned.addNewNodeWithSearch(createdRow, ranges); changed = setReadOnlyAnyDoubleRow(currentlyOwned, locksForOwnership); releaseLocks(locksForOwnership); - return new OwnershipReturn(ownershipId, createdRow.getDBPartition().getLockId(), createdRow.getPartitionIndex(), + return new OwnershipReturn(ownershipOpId, createdRow.getDBPartition().getLockId(), createdRow.getPartitionIndex(), createdRow.getDBPartition().getSnapshot(), currentlyOwned); } + + + @Override + public DatabasePartition splitPartitionIfNecessary(DatabasePartition partition, Set<Range> rangesUsed, + String ownerId) throws MDBCServiceException { + if (!this.splitAllowed) { + return partition; + } + Set<Range> rangesOwned = partition.getSnapshot(); + if (rangesOwned==null || rangesUsed==null) { + return partition; + } + if (!rangesOwned.containsAll(rangesUsed)) { + throw new MDBCServiceException("Transaction was unable to acquire all necessary ranges."); + } + + if (rangesUsed.containsAll(rangesOwned)) { + //using all ranges in this partition + return partition; + } + + //split partition + logger.info(EELFLoggerDelegate.applicationLogger, "Full partition not being used need (" + rangesUsed + +") and own (" + rangesOwned + ", splitting the partition"); + Set<UUID> prevPartitions = new HashSet<>(); + prevPartitions.add(partition.getMRIIndex()); + MusicRangeInformationRow usedRow = createAndAssignLock(rangesUsed, prevPartitions, ownerId); + rangesOwned.removeAll(rangesUsed); + Set<Range> rangesNotUsed = rangesOwned; + MusicRangeInformationRow unusedRow = createAndAssignLock(rangesNotUsed, prevPartitions, ownerId); + + changeIsLatestToMRI(partition.getMRIIndex(), false, partition.getLockId()); + + /* + Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); + for (Range range: rangesUsed) { + alreadyApplied.put(range, Pair.of(new MriReference(usedRow.getPartitionIndex()), -1)); + } + for (Range range: rangesNotUsed) { + alreadyApplied.put(range, Pair.of(new MriReference(unusedRow.getPartitionIndex()), -1)); + } + */ - private MusicRangeInformationRow createAndAssignLock(Set<Range> ranges, Set<UUID> prevPartitions) throws MDBCServiceException { + //release/update old partition info + relinquish(unusedRow.getDBPartition()); + relinquish(partition); + + return usedRow.getDBPartition(); + } + + + private MusicRangeInformationRow createAndAssignLock(Set<Range> ranges, Set<UUID> prevPartitions, String ownerId) + throws MDBCServiceException { UUID newUUID = MDBCUtils.generateTimebasedUniqueKey(); DatabasePartition newPartition = new DatabasePartition(ranges,newUUID,null); MusicRangeInformationRow row = new MusicRangeInformationRow(newPartition, true, prevPartitions); - createLockedMRIRow(row); + createLockedMRIRow(row, ownerId); return row; } @@ -2269,7 +2306,7 @@ public class MusicMixin implements MusicInterface { } catch (MDBCServiceException e) { logger.error("Error relinquishing lock, will use timeout to solve"); } - partition.setLockId(""); + partition.setLockId(null); } } @@ -2509,25 +2546,27 @@ public class MusicMixin implements MusicInterface { } MusicRangeInformationRow mriRow = - createAndAssignLock(new HashSet<Range>(Arrays.asList(rangeToCreate)), new HashSet<UUID>()); + createAndAssignLock(new HashSet<Range>(Arrays.asList(rangeToCreate)), new HashSet<UUID>(), ""); //TODO: should make sure we didn't create 2 new rows simultaneously, while we still own the lock unlockKeyInMusic(musicRangeInformationTableName, mriRow.getPartitionIndex().toString(), mriRow.getDBPartition().getLockId()); } @Override - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) { - String cql = String.format("INSERT INTO %s.%s (mdbcnode, mridigest, digestindex) VALUES (" - + this.myId + ", " + playbackPointer.getLeft() + ", " + playbackPointer.getRight() + ");", - music_ns, this.musicMdbcCheckpointsTableName); + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) { + String cql = String.format("INSERT INTO %s.%s (mdbcnode, range, mridigest, digestid) VALUES ('%s', '%s', %s, %s);", + music_ns, this.musicMdbcCheckpointsTableName, this.stateManager.getMdbcServerName(), r.getTable(), + playbackPointer.getLeft().getIndex(), playbackPointer.getRight().transactionId); PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); try { MusicCore.nonKeyRelatedPut(pQueryObject,"eventual"); } catch (MusicServiceException e) { logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location", e); + } catch (MusicQueryException e) { + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to update the checkpoint location with query", e); } } - + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index ec91ceb..b544b94 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -35,9 +35,9 @@ import java.util.Properties; import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; -import org.json.JSONTokener; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.Configuration; @@ -45,7 +45,8 @@ import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.query.SQLOperation; -import org.onap.music.mdbc.query.SQLOperationType; +import org.onap.music.mdbc.tables.MriReference; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.Operation; import org.onap.music.mdbc.tables.StagingTable; import net.sf.jsqlparser.JSQLParserException; @@ -87,7 +88,7 @@ public class MySQLMixin implements DBInterface { + "CONNECTION_ID INT, PRIMARY KEY (IX));"; private static final String CKPT_TBL = "MDBC_CHECKPOINT"; private static final String CREATE_CKPT_SQL = - "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTINDEX INT);"; + "CREATE TABLE IF NOT EXISTS " + CKPT_TBL + " (RANGENAME VARCHAR(64) PRIMARY KEY, MRIROW VARCHAR(36), DIGESTID VARCHAR(36));"; private final MusicInterface mi; private final int connId; @@ -187,7 +188,7 @@ public class MySQLMixin implements DBInterface { String dbname = "mdbc"; // default name try { Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT DATABASE() AS DB"); + ResultSet rs = stmt.executeQuery("SELECT UPPER(DATABASE()) AS DB"); if (rs.next()) { dbname = rs.getString("DB"); } @@ -214,7 +215,7 @@ public class MySQLMixin implements DBInterface { public Set<String> getSQLTableSet() { Set<String> set = new TreeSet<String>(); String sql = - "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + "SELECT CONCAT(UPPER(TABLE_SCHEMA), '.', UPPER(TABLE_NAME)) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE';"; try { Statement stmt = jdbcConn.createStatement(); ResultSet rs = stmt.executeQuery(sql); @@ -234,7 +235,7 @@ public class MySQLMixin implements DBInterface { public Set<Range> getSQLRangeSet() { Set<String> set = new TreeSet<String>(); String sql = - "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + "SELECT CONCAT(UPPER(TABLE_SCHEMA), '.', UPPER(TABLE_NAME)) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE';"; try { Statement stmt = jdbcConn.createStatement(); ResultSet rs = stmt.executeQuery(sql); @@ -249,7 +250,10 @@ public class MySQLMixin implements DBInterface { logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set); Set<Range> rangeSet = new HashSet<>(); for (String table : set) { - rangeSet.add(new Range(table)); + if (!getReservedTblNames().contains(table)) { + // Don't create triggers for the table the triggers write into!!! + rangeSet.add(new Range(table)); + } } return rangeSet; } @@ -814,15 +818,31 @@ public class MySQLMixin implements DBInterface { private ArrayList<String> getMusicKey(String tbl, String cmd, String sql) { ArrayList<String> musicKeys = new ArrayList<String>(); /* - * if (cmd.equalsIgnoreCase("insert")) { //create key, return key musicKeys.add(msm.generatePrimaryKey()); } - * else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) { try { - * net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); String where; if (stmt instanceof - * Update) { where = ((Update) stmt).getWhere().toString(); } else if (stmt instanceof Delete) { where = - * ((Delete) stmt).getWhere().toString(); } else { System.err.println("Unknown type: " +stmt.getClass()); where - * = ""; } ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where); musicKeys = - * msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs)); } catch (JSQLParserException e) { - * - * e.printStackTrace(); } catch (SQLException e) { //Not a valid sql query e.printStackTrace(); } } + if (cmd.equalsIgnoreCase("insert")) { + //create key, return key + musicKeys.add(msm.generatePrimaryKey()); + } else if (cmd.equalsIgnoreCase("update") || cmd.equalsIgnoreCase("delete")) { + try { + net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql); + String where; + if (stmt instanceof Update) { + where = ((Update) stmt).getWhere().toString(); + } else if (stmt instanceof Delete) { + where = ((Delete) stmt).getWhere().toString(); + } else { + System.err.println("Unknown type: " +stmt.getClass()); + where = ""; + } + ResultSet rs = executeSQLRead("SELECT * FROM " + tbl + " WHERE " + where); + musicKeys = msm.getMusicKeysWhere(tbl, Utils.parseResults(getTableInfo(tbl), rs)); + } catch (JSQLParserException e) { + + e.printStackTrace(); + } catch (SQLException e) { + //Not a valid sql query + e.printStackTrace(); + } + } */ return musicKeys; } @@ -877,7 +897,7 @@ public class MySQLMixin implements DBInterface { * @param transaction - base 64 encoded, serialized digest * @throws MDBCServiceException */ - public void replayTransaction(StagingTable transaction, Set<Range> ranges) + public void replayTransaction(StagingTable transaction) throws SQLException, MDBCServiceException { boolean autocommit = jdbcConn.getAutoCommit(); jdbcConn.setAutoCommit(false); @@ -885,7 +905,6 @@ public class MySQLMixin implements DBInterface { ArrayList<Operation> opList = transaction.getOperationList(); for (Operation op : opList) { - if (Range.overlaps(ranges, op.getTable())) { try { replayOperationIntoDB(jdbcStmt, op); } catch (SQLException | MDBCServiceException e) { @@ -895,7 +914,6 @@ public class MySQLMixin implements DBInterface { jdbcConn.rollback(); throw e; } - } } clearReplayedOperations(jdbcStmt); @@ -920,8 +938,8 @@ public class MySQLMixin implements DBInterface { } @Override - public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException { - replayTransaction(txDigest, ranges); + public void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException { + replayTransaction(txDigest); } /** @@ -939,17 +957,7 @@ public class MySQLMixin implements DBInterface { ArrayList<String> cols = new ArrayList<String>(); ArrayList<Object> vals = new ArrayList<Object>(); - Iterator<String> colIterator = jsonOp.keys(); - while (colIterator.hasNext()) { - String col = colIterator.next(); - // FIXME: should not explicitly refer to cassandramixin - if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) { - // reserved name - continue; - } - cols.add(col); - vals.add(jsonOp.get(col)); - } + constructColValues(jsonOp, cols, vals); // build and replay the queries StringBuilder sql = constructSQL(op, cols, vals); @@ -965,7 +973,11 @@ public class MySQLMixin implements DBInterface { logger.warn("Error Replaying operation: " + sql.toString() + "; Replacing insert/replace/viceversa and replaying "); - buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals); + try { + buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals); + } catch (Exception e) { + logger.warn(" Error replaying inverse operation; " + sql + "Ignore the exception"); + } } } catch (SQLException sqlE) { // This applies for replaying transactions involving Eventually Consistent tables @@ -977,6 +989,20 @@ public class MySQLMixin implements DBInterface { } } + public void constructColValues(JSONObject jsonOp, ArrayList<String> cols, + ArrayList<Object> vals) { + Iterator<String> colIterator = jsonOp.keys(); + while(colIterator.hasNext()) { + String col = colIterator.next(); + //FIXME: should not explicitly refer to cassandramixin + if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) { + //reserved name + continue; + } + cols.add(col); + vals.add(jsonOp.get(col)); + } + } protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, ArrayList<String> cols, ArrayList<Object> vals) throws SQLException, MDBCServiceException { @@ -999,7 +1025,7 @@ public class MySQLMixin implements DBInterface { * @throws MDBCServiceException */ - protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, ArrayList<Object> vals) + public StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols, ArrayList<Object> vals) throws MDBCServiceException { StringBuilder sqlInverse = null; switch (op.getOperationType()) { @@ -1015,7 +1041,7 @@ public class MySQLMixin implements DBInterface { return sqlInverse; } - protected StringBuilder constructSQL(Operation op, ArrayList<String> cols, ArrayList<Object> vals) + public StringBuilder constructSQL(Operation op, ArrayList<String> cols, ArrayList<Object> vals) throws MDBCServiceException { StringBuilder sql = null; switch (op.getOperationType()) { @@ -1059,7 +1085,7 @@ public class MySQLMixin implements DBInterface { sql.append(") VALUES ("); sep = ""; for (Object val : vals) { - sql.append(sep + "\"" + val + "\""); + sql.append(sep + (val!=JSONObject.NULL?"\"" + val +"\"":"null")); sep = ", "; } sql.append(");"); @@ -1074,7 +1100,7 @@ public class MySQLMixin implements DBInterface { sql.append(r + " SET "); sep = ""; for (int i = 0; i < cols.size(); i++) { - sql.append(sep + cols.get(i) + "=\"" + vals.get(i) + "\""); + sql.append(sep + cols.get(i) + (vals.get(i)!=JSONObject.NULL?"=\"" + vals.get(i) +"\"":"=null")); sep = ", "; } sql.append(" WHERE "); @@ -1095,7 +1121,7 @@ public class MySQLMixin implements DBInterface { String and = ""; for (String key : primaryKeys.keySet()) { // We cannot use the default primary key for the sql table and operations - if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) { + if(!key.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) { Object val = primaryKeys.get(key); keyCondStmt.append(and + key + "=\"" + val + "\""); and = " AND "; @@ -1122,12 +1148,12 @@ public class MySQLMixin implements DBInterface { } @Override - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) { - String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTINDEX=? where RANGENAME=?;"; + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) { + String query = "UPDATE " + CKPT_TBL + " SET MRIROW=?, DIGESTID=? where RANGENAME=?;"; try { PreparedStatement stmt = jdbcConn.prepareStatement(query); - stmt.setString(1, playbackPointer.getLeft().toString()); - stmt.setInt(2, playbackPointer.getRight()); + stmt.setString(1, playbackPointer.getLeft().getIndex().toString()); + stmt.setString(2, playbackPointer.getRight().transactionId.toString()); stmt.setString(3, r.getTable()); stmt.execute(); stmt.close(); @@ -1137,6 +1163,30 @@ public class MySQLMixin implements DBInterface { } @Override + public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations() { + Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = new ConcurrentHashMap<>(); + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + CKPT_TBL + ";"); + while (rs.next()) { + Range r = new Range(rs.getString("RANGENAME")); + String mrirow = rs.getString("MRIROW"); + String txId = rs.getString("DIGESTID"); + if (mrirow!=null) { + logger.info(EELFLoggerDelegate.applicationLogger, + "Previously checkpointed: " + r.getTable() + " at (" + mrirow + ", " + txId + ")"); + alreadyApplied.put(r, Pair.of(new MriReference(mrirow), new MusicTxDigestId(mrirow, txId, -1))); + } + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Unable to get replay checkpoint location", e); + } + + return alreadyApplied; + } + + @Override public void initTables() { try { Statement stmt = jdbcConn.createStatement(); @@ -1161,5 +1211,4 @@ public class MySQLMixin implements DBInterface { logger.error(EELFLoggerDelegate.errorLogger, "initTables: problem creating th mdbc tables!"); } } - -} +}
\ No newline at end of file diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java index 4afaa71..15c7620 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java @@ -33,7 +33,7 @@ import net.sf.jsqlparser.statement.delete.Delete; import net.sf.jsqlparser.statement.insert.Insert; import net.sf.jsqlparser.statement.update.Update; import org.apache.commons.lang3.tuple.Pair; -import org.apache.zookeeper.KeeperException.UnimplementedException; +//import org.apache.zookeeper.KeeperException.UnimplementedException; import org.json.JSONObject; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; @@ -42,6 +42,8 @@ import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.mixins.MySQLMixin.StagingTableUpdateRunnable; +import org.onap.music.mdbc.tables.MriReference; +import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.Operation; import org.onap.music.mdbc.query.SQLOperation; import org.onap.music.mdbc.tables.StagingTable; @@ -817,7 +819,7 @@ public class PostgresMixin implements DBInterface { * @param transaction - base 64 encoded, serialized digest */ @Override - public void replayTransaction(StagingTable transaction, Set<Range> ranges) + public void replayTransaction(StagingTable transaction) throws SQLException, MDBCServiceException { boolean autocommit = jdbcConn.getAutoCommit(); jdbcConn.setAutoCommit(false); @@ -825,7 +827,6 @@ public class PostgresMixin implements DBInterface { final ArrayList<Operation> opList = transaction.getOperationList(); for (Operation op : opList) { - if (Range.overlaps(ranges, op.getTable())) { try { replayOperationIntoDB(jdbcStmt, op); } catch (SQLException | MDBCServiceException e) { @@ -835,7 +836,6 @@ public class PostgresMixin implements DBInterface { jdbcConn.rollback(); throw e; } - } } clearReplayedOperations(jdbcStmt); @@ -859,8 +859,8 @@ public class PostgresMixin implements DBInterface { } @Override - public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException { - replayTransaction(txDigest, ranges); + public void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException { + replayTransaction(txDigest); } /** @@ -1067,7 +1067,12 @@ public class PostgresMixin implements DBInterface { } @Override - public void updateCheckpointLocations(Range r, Pair<UUID, Integer> playbackPointer) { + public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) { + throw new org.apache.commons.lang.NotImplementedException(); + } + + @Override + public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations() { throw new org.apache.commons.lang.NotImplementedException(); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java index 9d1685c..142cb34 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java @@ -32,6 +32,7 @@ import org.onap.music.mdbc.Range; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MriRowComparator; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; public class Dag { @@ -145,7 +146,6 @@ public class Dag { if(!readyInit){ initApplyDatastructures(); } - Set<Range> rangesSet = new HashSet<>(ranges); while(!toApplyNodes.isEmpty()){ DagNode nextNode = toApplyNodes.poll(); List<DagNode> outgoing = nextNode.getOutgoingEdges(); @@ -155,7 +155,7 @@ public class Dag { toApplyNodes.add(out); } } - if(!nextNode.wasApplied(rangesSet)){ + if(!nextNode.wasApplied(ranges)){ return nextNode; } } @@ -233,23 +233,23 @@ public class Dag { return toApplyNodes.isEmpty(); } - public void setAlreadyApplied(Map<Range, Pair<MriReference,Integer>> alreadyApplied, Set<Range> ranges) + public void setAlreadyApplied(Map<Range, Pair<MriReference,MusicTxDigestId>> alreadyApplied, Set<Range> ranges) throws MDBCServiceException { - for(Map.Entry<UUID,DagNode> node : nodes.entrySet()){ + for (DagNode node: nodes.values()) { Set<Range> intersection = new HashSet<>(ranges); - intersection.retainAll(node.getValue().getRangeSet()); + intersection.retainAll(node.getRangeSet()); for(Range r : intersection){ if(alreadyApplied.containsKey(r)){ - final Pair<MriReference, Integer> appliedPair = alreadyApplied.get(r); + final Pair<MriReference, MusicTxDigestId> appliedPair = alreadyApplied.get(r); final MriReference appliedRow = appliedPair.getKey(); - final int index = appliedPair.getValue(); + final int index = appliedPair.getValue().index; final long appliedTimestamp = appliedRow.getTimestamp(); - final long nodeTimestamp = node.getValue().getTimestamp(); + final long nodeTimestamp = node.getTimestamp(); if(appliedTimestamp > nodeTimestamp){ - setReady(node.getValue(),r); + setReady(node,r); } else if(appliedTimestamp == nodeTimestamp){ - setPartiallyReady(node.getValue(),r,index); + setPartiallyReady(node,r,index); } } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java index 78c68e1..5e4c899 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java @@ -30,6 +30,7 @@ import java.util.UUID; import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; @@ -73,6 +74,10 @@ public class DagNode { return owned; } + /** + * + * @return the row's MRI Index represented by this dagnode + */ public UUID getId(){ return row.getPartitionIndex(); } @@ -149,20 +154,25 @@ public class DagNode { currentIndex = currentIndex+1; } - public synchronized Pair<MusicTxDigestId, List<Range>> nextNotAppliedTransaction(Set<Range> ranges){ + /** + * + * @param ranges + * @return the index of the next transaction to replay and the ranges needed for this transaction + */ + public synchronized Pair<MusicTxDigestId, Set<Range>> nextNotAppliedTransaction(Set<Range> ranges){ if(row.getRedoLog().isEmpty()) return null; if(!applyInit){ initializeApply(ranges); } final List<MusicTxDigestId> redoLog = row.getRedoLog(); if(currentIndex < redoLog.size()){ - List<Range> responseRanges= new ArrayList<>(); + Set<Range> responseRanges= new HashSet<>(); startIndex.forEach((r, index) -> { if(index < currentIndex){ responseRanges.add(r); } }); - return Pair.of(redoLog.get(currentIndex++),responseRanges); + return Pair.of(row.getRedoLog().get(currentIndex++),responseRanges); } return null; } @@ -179,7 +189,7 @@ public class DagNode { if(row.getRedoLog().isEmpty()) return true; if(!applyInit){ initializeApply(ranges); - } + } return currentIndex >= row.getRedoLog().size(); } @@ -194,11 +204,13 @@ public class DagNode { if(o == null) return false; if(!(o instanceof DagNode)) return false; DagNode other = (DagNode) o; - return other.row.getPartitionIndex().equals(this.row.getPartitionIndex()); + return other.row.equals(this.row); } @Override public int hashCode(){ return row.getPartitionIndex().hashCode(); } + + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index 933e000..0898e5d 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -27,9 +27,11 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicDeadlockException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.Utils; import org.onap.music.mdbc.mixins.DBInterface; import org.onap.music.mdbc.mixins.LockRequest; import org.onap.music.mdbc.mixins.LockResult; @@ -45,8 +47,7 @@ public class OwnershipAndCheckpoint{ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class); private Lock checkpointLock; - private AtomicBoolean change; - private Map<Range, Pair<MriReference, Integer>> alreadyApplied; + private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied; private Map<UUID,Long> ownershipBeginTime; private long timeoutInMs; @@ -54,8 +55,7 @@ public class OwnershipAndCheckpoint{ this(new HashMap<>(),Long.MAX_VALUE); } - public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, Integer>> alreadyApplied, long timeoutInMs){ - change = new AtomicBoolean(true); + public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied, long timeoutInMs){ checkpointLock = new ReentrantLock(); this.alreadyApplied = alreadyApplied; ownershipBeginTime = new HashMap<>(); @@ -130,20 +130,17 @@ public class OwnershipAndCheckpoint{ * @param di * @param extendedDag * @param ranges - * @param locks * @param ownOpId * @throws MDBCServiceException */ - public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges, - Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException { + public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges, UUID ownOpId) + throws MDBCServiceException { if(ranges.isEmpty()){ return; } try { checkpointLock.lock(); - change.set(true); - Set<Range> rangesSet = new HashSet<>(ranges); - extendedDag.setAlreadyApplied(alreadyApplied, rangesSet); + extendedDag.setAlreadyApplied(alreadyApplied, ranges); applyRequiredChanges(mi, di, extendedDag, ranges, ownOpId); } catch(MDBCServiceException e){ @@ -163,18 +160,18 @@ public class OwnershipAndCheckpoint{ } } - private void disableForeignKeys(DBInterface di) throws MDBCServiceException { + private void disableForeignKeys(DBInterface dbi) throws MDBCServiceException { try { - di.disableForeignKeyChecks(); + dbi.disableForeignKeyChecks(); } catch (SQLException e) { throw new MDBCServiceException("Error disable foreign keys checks",e); } } - private void applyTxDigest(Set<Range> ranges, DBInterface di, StagingTable txDigest) + private void applyTxDigest(DBInterface dbi, StagingTable txDigest) throws MDBCServiceException { try { - di.applyTxDigest(txDigest,ranges); + dbi.applyTxDigest(txDigest); } catch (SQLException e) { throw new MDBCServiceException("Error applying tx digest in local SQL",e); } @@ -191,69 +188,82 @@ public class OwnershipAndCheckpoint{ if(rangesToWarmup.isEmpty()){ return; } - boolean ready = false; - change.set(true); - Set<Range> rangeSet = new HashSet<Range>(rangesToWarmup); Dag dag = new Dag(false); - while(!ready){ - if(change.get()){ - change.set(false); - final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false); - dag = Dag.getDag(rows,rangesToWarmup); - } - else if(!dag.applied()){ - DagNode node = dag.nextToApply(rangesToWarmup); - if(node!=null) { - Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet); - while (pair != null) { + final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false); + dag = Dag.getDag(rows,rangesToWarmup); + dag.setAlreadyApplied(alreadyApplied, rangesToWarmup); + while(!dag.applied()){ + DagNode node = dag.nextToApply(rangesToWarmup); + if(node!=null) { + Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesToWarmup); + while (pair != null) { + checkpointLock.lock(); + try { disableForeignKeys(di); - checkpointLock.lock(); - if (change.get()) { - enableForeignKeys(di); - checkpointLock.unlock(); - break; - } else { - applyDigestAndUpdateDataStructures(mi, di, rangesToWarmup, node, pair); - } - pair = node.nextNotAppliedTransaction(rangeSet); + applyDigestAndUpdateDataStructures(mi, di, node, pair.getLeft(), pair.getRight()); + pair = node.nextNotAppliedTransaction(rangesToWarmup); enableForeignKeys(di); + } catch (MDBCServiceException e) { checkpointLock.unlock(); + throw e; } + checkpointLock.unlock(); } } - else{ - ready = true; - } } } /** - * Apply tx digest for ranges, update checkpoint location (alreadyApplied) + * Apply tx digest for dagnode update checkpoint location (alreadyApplied) * @param mi * @param di - * @param ranges * @param node * @param pair * @throws MDBCServiceException */ - private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, Set<Range> ranges, DagNode node, - Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException { + private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface dbi, DagNode node, + MusicTxDigestId digestId, Set<Range> ranges) throws MDBCServiceException { + if (alreadyReplayed(node, digestId)) { + return; + } + final StagingTable txDigest; try { - txDigest = mi.getTxDigest(pair.getKey()); + txDigest = mi.getTxDigest(digestId); } catch (MDBCServiceException e) { logger.warn("Transaction digest was not found, this could be caused by a failure of the previous owner" +"And would normally only happen as the last ID of the corresponding redo log. Please check that this is the" - +" case for txID "+pair.getKey().transactionId.toString()); + +" case for txID "+digestId.transactionId.toString()); return; } - applyTxDigest(ranges,di, txDigest); - for (Range r : pair.getValue()) { - MusicRangeInformationRow row = node.getRow(); - alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); - - updateCheckpointLocations(mi, di, r, row.getPartitionIndex(), pair.getKey().index); + applyTxDigest(dbi, txDigest); + MusicRangeInformationRow row = node.getRow(); + updateAlreadyApplied(mi, dbi, ranges, row.getPartitionIndex(), digestId); + } + + /** + * Determine if this musictxdigest id has already been replayed + * @param node + * @param redoLogIndex + * @return true if alreadyApplied is past this node/redolog, false if it hasn't been replayed + */ + public boolean alreadyReplayed(DagNode node, MusicTxDigestId txdigest) { + int index = node.getRow().getRedoLog().indexOf(txdigest); + for (Range range: node.getRangeSet()) { + Pair<MriReference, MusicTxDigestId> applied = alreadyApplied.get(range); + if (applied==null) { + return false; + } + MriReference appliedMriRef = applied.getLeft(); + MusicTxDigestId appliedDigest = applied.getRight(); + appliedDigest.index = node.getRow().getRedoLog().indexOf(appliedDigest); + if (appliedMriRef==null || appliedMriRef.getTimestamp() < node.getTimestamp() + || (appliedMriRef.getTimestamp() == node.getTimestamp() + && appliedDigest.index < index)) { + return false; + } } + return true; } /** @@ -261,12 +271,13 @@ public class OwnershipAndCheckpoint{ * @param mi * @param di * @param r - * @param partitionIndex + * @param mriRef * @param index + * @throws MDBCServiceException */ - private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, int index) { - dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, index)); - mi.updateCheckpointLocations(r, Pair.of(partitionIndex, index)); + private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, MriReference mriRef, MusicTxDigestId txdigest) { + dbi.updateCheckpointLocations(r, Pair.of(mriRef, txdigest)); + mi.updateCheckpointLocations(r, Pair.of(mriRef, txdigest)); } /** @@ -280,15 +291,14 @@ public class OwnershipAndCheckpoint{ */ private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, Set<Range> ranges, UUID ownOpId) throws MDBCServiceException { - Set<Range> rangeSet = new HashSet<Range>(ranges); disableForeignKeys(db); while(!extendedDag.applied()){ DagNode node = extendedDag.nextToApply(ranges); if(node!=null) { - Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet); + Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(ranges); while (pair != null) { - applyDigestAndUpdateDataStructures(mi, db, ranges, node, pair); - pair = node.nextNotAppliedTransaction(rangeSet); + applyDigestAndUpdateDataStructures(mi, db, node, pair.getLeft(), pair.getRight()); + pair = node.nextNotAppliedTransaction(ranges); if (timeout(ownOpId)) { enableForeignKeys(db); throw new MDBCServiceException("Timeout apply changes to local dbi"); @@ -311,7 +321,11 @@ public class OwnershipAndCheckpoint{ */ public OwnershipReturn own(MusicInterface mi, Set<Range> ranges, DatabasePartition currPartition, UUID opId, SQLOperationType lockType) throws MDBCServiceException { - + return own(mi, ranges, currPartition, opId, lockType, null); + } + + public OwnershipReturn own(MusicInterface mi, Set<Range> ranges, + DatabasePartition currPartition, UUID opId, SQLOperationType lockType, String ownerId) throws MDBCServiceException { if (ranges == null || ranges.isEmpty()) { return null; } @@ -331,7 +345,18 @@ public class OwnershipAndCheckpoint{ while ( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) && !timeout(opId) ) { - takeOwnershipOfDag(mi, currPartition, opId, locksForOwnership, toOwn, lockType); + try { + takeOwnershipOfDag(mi, currPartition, opId, locksForOwnership, toOwn, lockType, ownerId); + } catch (MDBCServiceException e) { + MusicDeadlockException de = Utils.getDeadlockException(e); + if (de!=null) { + locksForOwnership.remove(currPartition.getMRIIndex()); + mi.releaseLocks(locksForOwnership); + stopOwnershipTimeoutClock(opId); + logger.error("Error when owning a range: Deadlock detected"); + } + throw e; + } currentlyOwn=toOwn; //TODO instead of comparing dags, compare rows rangesToOwnRows = extractRowsForRange(mi, rangesToOwn, false); @@ -347,9 +372,9 @@ public class OwnershipAndCheckpoint{ } Set<Range> allRanges = currentlyOwn.getAllRanges(); //TODO: we shouldn't need to go back to music at this point - List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, new HashSet<>(allRanges), true); + List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, allRanges, true); currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,latestRows)); - return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId); + return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId, ownerId); } /** @@ -362,7 +387,8 @@ public class OwnershipAndCheckpoint{ * @throws MDBCServiceException */ private void takeOwnershipOfDag(MusicInterface mi, DatabasePartition partition, UUID opId, - Map<UUID, LockResult> ownershipLocks, Dag toOwn, SQLOperationType lockType) throws MDBCServiceException { + Map<UUID, LockResult> ownershipLocks, Dag toOwn, SQLOperationType lockType, String ownerId) + throws MDBCServiceException { while(toOwn.hasNextToOwn()){ DagNode node = toOwn.nextToOwn(); @@ -374,10 +400,16 @@ public class OwnershipAndCheckpoint{ false, partition.getSnapshot())); } else if ( ownershipLocks.containsKey(uuidToOwn) || !row.getIsLatest() ) { toOwn.setOwn(node); + if (ownershipLocks.containsKey(uuidToOwn) && !row.getIsLatest()) { + //previously owned partition that is no longer latest, don't need anymore + LockResult result = ownershipLocks.get(uuidToOwn); + ownershipLocks.remove(uuidToOwn); + mi.relinquish(result.getLockId(), uuidToOwn.toString()); + } } else { LockRequest request = new LockRequest(uuidToOwn, new ArrayList<>(node.getRangeSet()), lockType); - String lockId = mi.createLock(request); + String lockId = mi.createLock(request, ownerId); LockResult result = null; boolean owned = false; while(!owned && !timeout(opId)){ @@ -457,15 +489,6 @@ public class OwnershipAndCheckpoint{ } - - public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException { - Set<Range> snapshot = partition.getSnapshot(); - UUID row = partition.getMRIIndex(); - for(Range r : snapshot){ - alreadyApplied.put(r,Pair.of(new MriReference(row),-1)); - } - } - // \TODO merge with dag code private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException { Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>(); @@ -490,8 +513,20 @@ public class OwnershipAndCheckpoint{ } - public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() { + public Map<Range, Pair<MriReference, MusicTxDigestId>> getAlreadyApplied() { return this.alreadyApplied; - } + } + public void updateAlreadyApplied(MusicInterface mi, DBInterface dbi, Set<Range> ranges, UUID mriIndex, MusicTxDigestId digestId) { + for (Range r: ranges) { + updateAlreadyApplied(mi, dbi, r, mriIndex, digestId); + } + } + + public void updateAlreadyApplied(MusicInterface mi, DBInterface dbi, Range r, UUID mriIndex, MusicTxDigestId digestId) { + MriReference mriRef = new MriReference(mriIndex); + alreadyApplied.put(r, Pair.of(mriRef, digestId)); + updateCheckpointLocations(mi, dbi, r, mriRef, digestId); + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java index 6d6c661..27ea6ea 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java @@ -30,6 +30,7 @@ import org.apache.calcite.avatica.util.Casing; import org.apache.calcite.avatica.util.Quoting; import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlDelete; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlJoin; @@ -120,12 +121,15 @@ public class QueryProcessor { case UPDATE: parseUpdate((SqlUpdate) sqlNode, tableOpsMap); break; + case DELETE: + parseDelete((SqlDelete) sqlNode, tableOpsMap); + break; case SELECT: parseSelect((SqlSelect) sqlNode, tableOpsMap); break; case ORDER_BY: parseSelect((SqlSelect)((SqlOrderBy) sqlNode).query, tableOpsMap); - break; + break; default: logger.error("Unhandled sql query type " + sqlNode.getKind() +" for query " + query); } @@ -144,7 +148,7 @@ public class QueryProcessor { Ops.add(SQLOperation.INSERT); tableOpsMap.put(tableName, Ops); } - + private static void parseUpdate(SqlUpdate sqlUpdate, Map<String, List<SQLOperation>> tableOpsMap) { SqlNode targetTable = sqlUpdate.getTargetTable(); switch (targetTable.getKind()) { @@ -155,7 +159,18 @@ public class QueryProcessor { logger.error("Unable to process: " + targetTable.getKind() + " query"); } } - + + private static void parseDelete(SqlDelete sqlDelete, Map<String, List<SQLOperation>> tableOpsMap) { + SqlNode targetTable = sqlDelete.getTargetTable(); + switch (targetTable.getKind()) { + case IDENTIFIER: + addIdentifierToMap(tableOpsMap, (SqlIdentifier) targetTable, SQLOperation.DELETE); + break; + default: + logger.error("Unable to process: " + targetTable.getKind() + " query"); + } + } + private static void parseSelect(SqlSelect sqlSelect, Map<String, List<SQLOperation>> tableOpsMap ) { SqlNode from = sqlSelect.getFrom(); switch (from.getKind()) { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java index 8aad335..3c15487 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java @@ -28,6 +28,19 @@ public final class MriReference { this.index= index; } - public long getTimestamp() { return index.timestamp();} + public MriReference(String mrirow) { + index = UUID.fromString(mrirow); + } + public long getTimestamp() { + return index.timestamp(); + } + + public UUID getIndex() { + return this.index; + } + + public String toString() { + return index.toString(); + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java index de711ef..8c95047 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java @@ -96,7 +96,7 @@ public final class MusicRangeInformationRow implements Comparable<MusicRangeInfo if(o == null) return false; if(!(o instanceof MusicRangeInformationRow)) return false; MusicRangeInformationRow other = (MusicRangeInformationRow) o; - return other.getPartitionIndex().equals(this.getPartitionIndex()); + return other.getPartitionIndex().equals(this.getPartitionIndex()) && other.getRedoLog().equals(this.getRedoLog()); } @Override diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java index 6f95d3c..a1ef346 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java @@ -60,8 +60,7 @@ public class MusicTxDigestDaemon implements Runnable { for (UUID txTimeID : keys) { transaction = ecDigestInformation.get(txTimeID); try { - dbi.replayTransaction(transaction, - ranges); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??) + dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??) } catch (SQLException e) { logger.error("EC:Rolling back the entire digest replay."); return; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java index db9e455..59eb97e 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java @@ -24,13 +24,19 @@ import java.util.UUID; public final class MusicTxDigestId { public final UUID mriId; public final UUID transactionId; - public final int index; + public int index; public MusicTxDigestId(UUID mriRowId, UUID digestId, int index) { this.mriId=mriRowId; this.transactionId= digestId; this.index=index; } + + public MusicTxDigestId(String mriRowId, String digestId, int index) { + this.mriId = UUID.fromString(mriRowId); + this.transactionId = UUID.fromString(digestId); + this.index = index; + } public MusicTxDigestId(UUID digestId, int index) { this.mriId = null; @@ -55,4 +61,8 @@ public final class MusicTxDigestId { public int hashCode(){ return transactionId.hashCode(); } + + public String toString() { + return this.transactionId.toString(); + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/TxDigestDecompression.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/TxDigestDecompression.java index 0b422fa..6b7b7be 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/TxDigestDecompression.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/TxDigestDecompression.java @@ -21,13 +21,15 @@ package org.onap.music.mdbc.tools; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.UUID; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; -import org.onap.music.mdbc.StateManager; +import org.onap.music.mdbc.mixins.MusicInterface; import org.onap.music.mdbc.mixins.MusicMixin; +import org.onap.music.mdbc.mixins.MySQLMixin; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.Operation; @@ -42,6 +44,12 @@ import org.onap.music.mdbc.tables.StagingTable; public class TxDigestDecompression { public static final EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TxDigestDecompression.class); MusicMixin mi; + MySQLMixin ms; + + public TxDigestDecompression(MusicInterface _mi) { + mi = (MusicMixin) _mi; + ms = new MySQLMixin(); + } public TxDigestDecompression() { Properties prop = new Properties(); @@ -52,6 +60,7 @@ public class TxDigestDecompression { } try { mi = new MusicMixin(null, "mdbcservername", prop); + ms = new MySQLMixin(); } catch (MDBCServiceException e) { e.printStackTrace(); return; @@ -64,16 +73,7 @@ public class TxDigestDecompression { List<MusicRangeInformationRow> rows = mi.getAllMriRows(); for (MusicRangeInformationRow row: rows) { UUID mriId = row.getPartitionIndex(); - for (MusicTxDigestId id: row.getRedoLog()) { - StagingTable st = mi.getTxDigest(id); - System.out.print(id.transactionId + ": ["); - String sep = ""; - for (Operation op: st.getOperationList()) { - System.out.print(sep + op.getOperationType() + "-" + op.getTable() + "->" + op.getVal()); - sep =", "; - } - System.out.println("]"); - } + extractedRedoLog(row); } } catch (MDBCServiceException e) { e.printStackTrace(); @@ -81,6 +81,25 @@ public class TxDigestDecompression { } System.exit(0); } + + public void extractedRedoLog(MusicRangeInformationRow row) throws MDBCServiceException { + for (MusicTxDigestId id: row.getRedoLog()) { + StagingTable st = mi.getTxDigest(id); + System.out.print(id.transactionId + ": ["); + String sep = ", "; + for (Operation op: st.getOperationList()) { + + ArrayList<String> cols = new ArrayList<String>(); + ArrayList<Object> vals = new ArrayList<Object>(); + ms.constructColValues(op.getVal(), cols, vals); + StringBuilder sql = ms.constructSQL(op, cols, vals); + + System.out.print(sql + sep); + + } + System.out.println("]"); + } + } public static void main(String[] args) { TxDigestDecompression txDecompress = new TxDigestDecompression(); diff --git a/mdbc-server/src/main/resources/key.properties b/mdbc-server/src/main/resources/key.properties new file mode 100644 index 0000000..bd5d472 --- /dev/null +++ b/mdbc-server/src/main/resources/key.properties @@ -0,0 +1 @@ +cipher.enc.key=AAECAwQFBgcICQoLDA0ODw== diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties index 49fdfd2..60adfae 100755 --- a/mdbc-server/src/main/resources/mdbc.properties +++ b/mdbc-server/src/main/resources/mdbc.properties @@ -15,4 +15,10 @@ DEFAULT_DRIVERS=\ org.mariadb.jdbc.Driver \ org.postgresql.Driver -txdaemonsleeps=15 +# whether or not to split the partitions +partition_splitting=true + +write_locks_only=true + +#time, in seconds, between when the daemon catches up +txdaemonsleeps=15
\ No newline at end of file diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties index 1aaf7fd..23908ad 100755 --- a/mdbc-server/src/main/resources/music.properties +++ b/mdbc-server/src/main/resources/music.properties @@ -3,6 +3,7 @@ cassandra.host =\ cassandra.user =\ cassandra cassandra.password =\ - cassandra + OB06GaQG8BJOts8diB1jXS+LZrNUkplCt1XW5XwMAes= +# password "cassandra" encrypted with trivial key found in key.properties music_namespace =\ mdbc_namespace diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java index 72ec8d3..626b6ca 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java @@ -40,9 +40,11 @@ import org.junit.rules.TemporaryFolder; import org.onap.music.datastore.MusicDataStore; import org.onap.music.datastore.MusicDataStoreHandle; import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicServiceException; import org.onap.music.lockingservice.cassandra.CassaLockStore; import org.onap.music.mdbc.mixins.MusicMixin; import org.onap.music.mdbc.mixins.PostgresMixin; +import org.powermock.reflect.Whitebox; public class MdbcTestUtils { @@ -68,7 +70,7 @@ public class MdbcTestUtils { final private static String nodeInfoTableName = "nodeinfo"; //Mariadb variables static DB db=null; - final public static String mariaDBDatabaseName="test"; + final public static String mariaDBDatabaseName="TEST"; final static Integer mariaDbPort=13306; @@ -197,13 +199,14 @@ public class MdbcTestUtils { static void stopMySql(){ try { db.stop(); + db=null; } catch (ManagedProcessException e) { e.printStackTrace(); fail("Error closing mysql"); } } - public static void cleanDatabase(DBType type){ + public static void stopDatabase(DBType type){ switch(type) { case MySQL: stopMySql(); @@ -216,7 +219,7 @@ public class MdbcTestUtils { } } - public static void initCassandra(){ + public static void initCassandra() throws MDBCServiceException { try { EmbeddedCassandraServerHelper.startEmbeddedCassandra(EmbeddedCassandraServerHelper.CASSANDRA_RNDPORT_YML_FILE); } catch (Exception e) { @@ -230,8 +233,11 @@ public class MdbcTestUtils { session = EmbeddedCassandraServerHelper.getSession(); assertNotNull("Invalid configuration for cassandra", session); - MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session); - CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle); + + MusicDataStore mds = new MusicDataStore(cluster, session); + Whitebox.setInternalState(MusicDataStoreHandle.class, "mDstoreHandle", mds); + CassaLockStore store = new CassaLockStore(mds); + assertNotNull("Invalid configuration for music", store); } @@ -249,7 +255,6 @@ public class MdbcTestUtils { public static MusicMixin getMusicMixin() throws MDBCServiceException { initNamespaces(); - initTables(); MusicMixin mixin=null; try { Properties properties = new Properties(); @@ -268,13 +273,4 @@ public class MdbcTestUtils { MusicMixin.createKeyspace("music_internal",1); MusicMixin.createKeyspace(keyspace,1); } - - public static void initTables() throws MDBCServiceException{ - MusicMixin.createMusicRangeInformationTable(keyspace, mriTableName); - MusicMixin.createMusicTxDigest(mtdTableName,keyspace, -1); - MusicMixin.createMusicEventualTxDigest(eventualMtxdTableName,keyspace, -1); - MusicMixin.createMusicNodeInfoTable(nodeInfoTableName,keyspace,-1); - MusicMixin.createMusicRangeDependencyTable(keyspace,rangeDependencyTableName); - } - } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java index ef26cb6..bf27ea8 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java @@ -67,6 +67,8 @@ import org.onap.music.mdbc.query.SQLOperationType; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; +import org.onap.music.service.impl.MusicCassaCore; + import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Session; import com.google.protobuf.InvalidProtocolBufferException; @@ -77,6 +79,7 @@ import org.onap.music.mdbc.ownership.Dag; import org.onap.music.mdbc.ownership.DagNode; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +@Ignore public class MusicMixinTest { @@ -107,7 +110,7 @@ public class MusicMixinTest { public void initTest() throws MDBCServiceException { session = MdbcTestUtils.getSession(); session.execute("DROP KEYSPACE IF EXISTS "+ MdbcTestUtils.getKeyspace()); - mixin=MdbcTestUtils.getMusicMixin(); + mixin = MdbcTestUtils.getMusicMixin(); } //@Test(timeout=10000) @@ -141,17 +144,17 @@ public class MusicMixinTest { private DatabasePartition addRow(Set<Range> ranges,boolean isLatest){ final UUID uuid = MDBCUtils.generateTimebasedUniqueKey(); - DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); + DatabasePartition dbPartition = new DatabasePartition(ranges,uuid); MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), isLatest); DatabasePartition partition=null; try { - partition = mixin.createLockedMRIRow(newRow); + partition = mixin.createLockedMRIRow(newRow, ""); } catch (MDBCServiceException e) { fail("failure when creating new row"); } String fullyQualifiedMriKey = MdbcTestUtils.getKeyspace()+"."+ MdbcTestUtils.getMriTableName()+"."+partition.getMRIIndex().toString(); try { - MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); + MusicLockState musicLockState = MusicCassaCore.getInstance().voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); } catch (MusicLockingException e) { fail("failure when releasing lock"); } @@ -253,21 +256,7 @@ public class MusicMixinTest { mixin.addEventualTxDigest(digestId, compressed); LinkedHashMap<UUID, StagingTable> digest = mixin.getEveTxDigest("n1"); - - Consumer<Map.Entry<UUID,StagingTable>> consumer = new Consumer<Map.Entry<UUID,StagingTable>>() { - - @Override - public void accept(Entry<UUID, StagingTable> mapEntry) { - assertNotNull(mapEntry.getValue()); - } - - }; - - digest.entrySet().forEach(consumer); - - - - + digest.entrySet().forEach(e -> assertNotNull(e.getValue())); } protected ByteBuffer mockCompressedProtoByteBuff() throws MDBCServiceException, InvalidProtocolBufferException { diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MySQLMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MySQLMixinTest.java index 1f2c1dd..cf23305 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MySQLMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MySQLMixinTest.java @@ -20,16 +20,25 @@ package org.onap.music.mdbc.mixins; +import java.util.HashSet; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import org.apache.commons.lang3.tuple.Pair; import org.junit.*; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.sql.Statement; +import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.MdbcTestUtils; import org.onap.music.mdbc.MdbcTestUtils.DBType; +import org.onap.music.mdbc.Range; import org.onap.music.mdbc.mixins.MySQLMixin; - +import org.onap.music.mdbc.tables.MriReference; +import org.onap.music.mdbc.tables.MusicTxDigestId; import ch.vorburger.mariadb4j.DB; public class MySQLMixinTest { @@ -60,7 +69,7 @@ public class MySQLMixinTest { @AfterClass public static void close() throws Exception { - + MdbcTestUtils.stopDatabase(DBType.MySQL); } @Before @@ -70,9 +79,81 @@ public class MySQLMixinTest { this.mysqlMixin = new MySQLMixin(null, null, conn, info); } - @Test - public void testGetDataBaseName() throws SQLException { - Assert.assertEquals(MdbcTestUtils.getMariaDBDBName(), mysqlMixin.getDatabaseName()); - } + + @After + public void afterEachTest() throws SQLException { + clearTables(); + } + + @Test + public void testGetDataBaseName() throws SQLException { + assertEquals(MdbcTestUtils.getMariaDBDBName(), mysqlMixin.getDatabaseName()); + } + + @Test + public void testGetTableSet() throws SQLException { + Set<Range> rangesAdded = new HashSet<>(); + rangesAdded.add(new Range("TEST.RANGER")); + rangesAdded.add(new Range("TEST.RANGES")); + Statement st = conn.createStatement(); + for (Range r: rangesAdded) { + st.execute("CREATE TABLE " + r + " (name VARCHAR(20));"); + } + st.close(); + Set<Range> ranges = mysqlMixin.getSQLRangeSet(); + + assertTrue(ranges.containsAll(rangesAdded)); + assertTrue(rangesAdded.containsAll(ranges)); + } + + @Test + public void testCkpt() throws SQLException { + createTables(); + + Range r1 = new Range(MdbcTestUtils.mariaDBDatabaseName + ".RANGER"); + MriReference mri1 = new MriReference(MDBCUtils.generateUniqueKey()); + MusicTxDigestId i1 = new MusicTxDigestId(mri1.index, MDBCUtils.generateUniqueKey(), 1); + Pair<MriReference, MusicTxDigestId> digestId = Pair.of(mri1, i1); + mysqlMixin.updateCheckpointLocations(r1, digestId); + + Range r2 = new Range(MdbcTestUtils.mariaDBDatabaseName + ".RANGES"); + MriReference mri2 = new MriReference(MDBCUtils.generateUniqueKey()); + MusicTxDigestId i2 = new MusicTxDigestId(mri2.index, MDBCUtils.generateUniqueKey(), 2); + Pair<MriReference, MusicTxDigestId> p2 = Pair.of(mri2, i2); + mysqlMixin.updateCheckpointLocations(r2, p2); + + Map<Range, Pair<MriReference, MusicTxDigestId>> ckptmap = mysqlMixin.getCheckpointLocations(); + assertTrue(ckptmap.containsKey(r1)); + assertEquals(mri1.getIndex(), ckptmap.get(r1).getLeft().getIndex()); + assertEquals(i1.transactionId, ckptmap.get(r1).getRight().transactionId); + + assertTrue(ckptmap.containsKey(r2)); + assertEquals(mri2.getIndex(), ckptmap.get(r2).getLeft().getIndex()); + assertEquals(i2.transactionId, ckptmap.get(r2).getRight().transactionId); + } + + private void createTables() throws SQLException { + Statement st = conn.createStatement(); + st.execute("CREATE TABLE RANGER (name VARCHAR(20));"); + st.execute("CREATE TABLE RANGES (name VARCHAR(20));"); + st.close(); + //need to re-initiate the tables + this.mysqlMixin.initTables(); + } + + + private void clearTables() throws SQLException { + Set<Range> ranges = mysqlMixin.getSQLRangeSet(); + Statement st = conn.createStatement(); + for (Range r: ranges) { + try { + st.execute("DROP TABLE " + r + ";"); + } catch (SQLException e) { + System.out.println("Trouble dropping: " + r); + e.printStackTrace(); + } + } + st.close(); + } } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java index a1cf2b1..1ee8de7 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java @@ -30,6 +30,7 @@ import java.util.Properties; import java.util.Set; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.MdbcTestUtils; @@ -38,6 +39,7 @@ import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.tables.StagingTable; +@Ignore public class PostgresMixinTest { final private static String keyspace="metricmusictest"; final private static String mdbcServerName = "name"; @@ -65,7 +67,7 @@ public class PostgresMixinTest { public static void close(){ //TODO: shutdown cassandra mixin=null; - MdbcTestUtils.cleanDatabase(DBType.POSTGRES); + MdbcTestUtils.stopDatabase(DBType.POSTGRES); MdbcTestUtils.stopCassandra(); } @@ -211,7 +213,7 @@ public class PostgresMixinTest { Set<Range> ranges = new HashSet<>(); ranges.add(new Range("public.testtable")); try { - mixin.applyTxDigest(st,ranges); + mixin.applyTxDigest(st); } catch (SQLException|MDBCServiceException e) { e.printStackTrace(); fail(); diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java index ee50dca..afe378e 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java @@ -170,7 +170,7 @@ public class DagTest { HashSet<Range> rangesSet = new HashSet<>(ranges); while(!dag.applied()){ DagNode node = dag.nextToApply(ranges); - Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangesSet); + Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesSet); int transactionCounter = 0; while(pair!=null) { assertNotEquals(1,transactionCounter); @@ -178,9 +178,10 @@ public class DagTest { MusicTxDigestId id = row.getRedoLog().get(transactionCounter); assertEquals(id,pair.getKey()); assertEquals(0,pair.getKey().index); - List<Range> value = pair.getValue(); + Set<Range> value = pair.getValue(); assertEquals(1,value.size()); - assertEquals(new Range("schema.range1"),value.get(0)); + assertTrue(value.contains(new Range("schema.range1"))); + //assertEquals(new Range("schema.range1"),value.get(0)); pair = node.nextNotAppliedTransaction(rangesSet); transactionCounter++; } @@ -192,7 +193,7 @@ public class DagTest { @Test public void nextToApply2() throws InterruptedException, MDBCServiceException { - Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>(); + Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = new HashMap<>(); List<MusicRangeInformationRow> rows = new ArrayList<>(); Set<Range> ranges = new HashSet<>( Arrays.asList( new Range("schema.range1") @@ -207,7 +208,7 @@ public class DagTest { new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1) )); MusicRangeInformationRow newRow = createNewRow(new HashSet<>(ranges), "", false, redo2); - alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0)); + alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), redo1.get(0))); rows.add(newRow); MILLISECONDS.sleep(10); List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList( @@ -220,7 +221,7 @@ public class DagTest { int nodeCounter = 1; while(!dag.applied()){ DagNode node = dag.nextToApply(ranges); - Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangesSet); + Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesSet); int transactionCounter = 0; while(pair!=null) { assertNotEquals(1,transactionCounter); @@ -228,9 +229,10 @@ public class DagTest { MusicTxDigestId id = row.getRedoLog().get(2-nodeCounter); assertEquals(id,pair.getKey()); assertEquals(2-nodeCounter,pair.getKey().index); - List<Range> value = pair.getValue(); + Set<Range> value = pair.getValue(); assertEquals(1,value.size()); - assertEquals(new Range("schema.range1"),value.get(0)); + assertTrue(value.contains(new Range("schema.range1"))); + //assertEquals(new Range("schema.range1"),value.get(0)); pair = node.nextNotAppliedTransaction(rangesSet); transactionCounter++; } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java index 2443d1e..c0e7c50 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java @@ -34,6 +34,7 @@ import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mock; import org.mockito.Mockito; @@ -60,6 +61,7 @@ import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; +@Ignore public class OwnershipAndCheckpointTest { public static final String DATABASE = MdbcTestUtils.mariaDBDatabaseName; public static final String TABLE= MdbcTestUtils.mariaDBDatabaseName+".PERSONS"; @@ -83,7 +85,7 @@ public class OwnershipAndCheckpointTest { @BeforeClass - public static void init() throws MusicServiceException, ClassNotFoundException, ManagedProcessException { + public static void init() throws MusicServiceException, ClassNotFoundException, ManagedProcessException, MDBCServiceException { MdbcTestUtils.initCassandra(); Class.forName("org.mariadb.jdbc.Driver"); //start embedded mariadb @@ -94,7 +96,7 @@ public class OwnershipAndCheckpointTest { public static void close() throws MusicServiceException, MusicQueryException, ManagedProcessException { //TODO: shutdown cassandra musicMixin=null; - MdbcTestUtils.cleanDatabase(DBType.MySQL); + MdbcTestUtils.stopDatabase(DBType.MySQL); MdbcTestUtils.stopCassandra(); } @@ -151,7 +153,6 @@ public class OwnershipAndCheckpointTest { String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+ "(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');"; StagingTable stagingTable = new StagingTable(); - ownAndCheck.reloadAlreadyApplied(partition); final Statement executeStatement = this.conn.createStatement(); executeStatement.execute(sqlOperation); this.conn.commit(); @@ -222,9 +223,9 @@ public class OwnershipAndCheckpointTest { Map<MusicRangeInformationRow, LockResult> locks = new HashMap<>(); if(own.getDag()!=null) { locks.put(own.getDag().getNode(own.getRangeId()).getRow(), - new LockResult(own.getRangeId(), own.getOwnerId(), true, + new LockResult(own.getRangeId(), own.getLockId(), true, ranges)); - ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, locks, ownOpId); + ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, ownOpId); } checkData(); @@ -247,7 +248,7 @@ public class OwnershipAndCheckpointTest { Map<MusicRangeInformationRow, LockResult> locks = new HashMap<>(); if(own.getDag()!=null) { locks.put(own.getDag().getNode(own.getRangeId()).getRow(), - new LockResult(own.getRangeId(), own.getOwnerId(), true, + new LockResult(own.getRangeId(), own.getLockId(), true, ranges)); } ownAndCheck.warmup(musicMixin,mysqlMixin,ranges); diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java index 8d851c7..99e8244 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java @@ -98,6 +98,16 @@ public class QueryProcessorTest { } @Test + public void deleteQuery() throws SQLException { + String sqlQuery = "delete from db.employees where personid = 721 and lastname = 'Lastname'"; + HashMap<String, List<SQLOperation>> expectedOut = new HashMap<>(); + List<SQLOperation> t1op = new ArrayList<>(); + t1op.add(SQLOperation.DELETE); + expectedOut.put("DB.EMPLOYEES", t1op); + assertEquals(expectedOut, QueryProcessor.parseSqlQuery(sqlQuery, null)); + } + + @Test public void insertSelect() throws SQLException { String sqlQuery = "INSERT INTO table1 (CustomerName, City, Country) SELECT SupplierName, City, Country FROM table2"; diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/test/CrossSiteTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/test/CrossSiteTest.java index d4a7a27..57cbcd6 100755 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/test/CrossSiteTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/test/CrossSiteTest.java @@ -31,12 +31,11 @@ import java.sql.Statement; import java.sql.Timestamp; import java.util.Random; -import org.apache.log4j.Logger; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; - +import org.onap.music.logging.EELFLoggerDelegate; /** * This test tests a copy of data from DB1 to DB2. It tests the following H2 data types: @@ -46,7 +45,7 @@ public class CrossSiteTest extends TestCommon { private static final String DB_CONNECTION1 = "avatica://" + "mem:db1"; private static final String DB_CONNECTION2 = "avatica://" + "mem:db2"; private static final String KEYSPACE = "CrossSite_Test"; - private final static Logger logger = Logger.getLogger(CrossSiteTest.class); + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CrossSiteTest.class); private Connection db1, db2; @@ -96,7 +95,7 @@ public class CrossSiteTest extends TestCommon { fail("SELECT COUNT(*) produced no result"); } } catch (Exception e) { - logger.error(e); + logger.error(EELFLoggerDelegate.errorLogger, "Error", e); e.printStackTrace(); fail("2: " + e.toString()); } @@ -121,7 +120,7 @@ public class CrossSiteTest extends TestCommon { fail("SELECT COUNT(*) produced no result"); } } catch (Exception e) { - logger.error(e); + logger.error(EELFLoggerDelegate.errorLogger, "Error", e); e.printStackTrace(); fail("2: " + e.toString()); } @@ -158,7 +157,7 @@ public class CrossSiteTest extends TestCommon { fail("SELECT COUNT(*) produced no result"); } } catch (Exception e) { - logger.error(e); + logger.error(EELFLoggerDelegate.errorLogger, "Error", e); e.printStackTrace(); fail("2: " + e.toString()); } @@ -214,7 +213,7 @@ public class CrossSiteTest extends TestCommon { fail("SELECT * FROM DATATYPES"); } } catch (Exception ex) { - logger.error(ex); + logger.error(EELFLoggerDelegate.errorLogger, "Error", ex); ex.printStackTrace(); fail("2: " + ex.toString()); } @@ -252,7 +251,7 @@ public class CrossSiteTest extends TestCommon { fail("SELECT * FROM DATATYPES"); } } catch (Exception ex) { - logger.error(ex); + logger.error(EELFLoggerDelegate.errorLogger, "Error", ex); ex.printStackTrace(); fail("testIdentity 2: " + ex.toString()); } @@ -320,7 +319,7 @@ public class CrossSiteTest extends TestCommon { fail("SELECT * FROM BLOBTEST"); } } catch (Exception ex) { - logger.error(ex); + logger.error(EELFLoggerDelegate.errorLogger, "Error", ex); ex.printStackTrace(); fail("testBLOBColumn 2: " + ex.toString()); } @@ -358,7 +357,7 @@ public class CrossSiteTest extends TestCommon { fail("SELECT COUNT(*) produced no result"); } } catch (Exception e) { - logger.error(e); + logger.error(EELFLoggerDelegate.errorLogger, "Error", e); e.printStackTrace(); fail("2: " + e.toString()); } @@ -383,7 +382,7 @@ public class CrossSiteTest extends TestCommon { fail("SELECT COUNT(*) produced no result"); } } catch (Exception e) { - logger.error(e); + logger.error(EELFLoggerDelegate.errorLogger, "Error", e); e.printStackTrace(); fail("2: " + e.toString()); } @@ -417,7 +416,7 @@ public class CrossSiteTest extends TestCommon { fail("SELECT COUNT(*) produced no result"); } } catch (Exception e) { - logger.error(e); + logger.error(EELFLoggerDelegate.errorLogger, "Error", e); e.printStackTrace(); fail("2: " + e.toString()); } @@ -442,7 +441,7 @@ public class CrossSiteTest extends TestCommon { fail("SELECT OTHER produced no result"); } } catch (Exception e) { - logger.error(e); + logger.error(EELFLoggerDelegate.errorLogger, "Error", e); e.printStackTrace(); fail("2: " + e.toString()); } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/test/TransactionTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/test/TransactionTest.java index 55b0f09..6a2f954 100755 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/test/TransactionTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/test/TransactionTest.java @@ -29,15 +29,15 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; -import org.apache.log4j.Logger; import org.junit.Test; +import org.onap.music.logging.EELFLoggerDelegate; public class TransactionTest extends TestCommon { private static final String DB_CONNECTION1 = "avatica://" + "mem:db1"; private static final String DB_CONNECTION2 = "avatica://" + "mem:db2"; private static final String KEYSPACE = "CrossSite_Test"; - private final static Logger logger = Logger.getLogger(CrossSiteTest.class); + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TransactionTest.class); //@Test public void testWithAutocommitTrue() { @@ -175,7 +175,7 @@ public class TransactionTest extends TestCommon { fail("missing element: "+t); } } catch (Exception e) { - logger.error(e); + logger.error(EELFLoggerDelegate.errorLogger, "Error", e); e.printStackTrace(); fail("2: " + e.toString()); } @@ -228,17 +228,6 @@ <version>1.1.7</version> </dependency> <dependency> - <groupId>org.onap.music</groupId> - <artifactId>dev-MUSIC-cassandra</artifactId> - <version>3.2.1</version> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>io.netty</groupId> <artifactId>netty-handler</artifactId> <version>4.1.30.Final</version> |