diff options
6 files changed, 597 insertions, 143 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClientNew.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClientNew.java new file mode 100644 index 0000000..16619be --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClientNew.java @@ -0,0 +1,379 @@ +/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.examples;
+
+import java.sql.*;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.calcite.avatica.remote.Driver;
+
+public class MdbcTestClientNew {
+ private String lastName = "Lastname";
+ private int baseId = 700;
+ private int baseIdRange = 50;
+ private int maxCalls = 50;
+ private int maxTimeMs = 60000;
+ private int minDelayBetweenTestsMs = 1000;
+ private int additionalDelayBetweenTestsMs = 1000;
+ private boolean doDelete = true;
+ private boolean doUpdate = true;
+ private int connectionCloseChancePct = 50;
+
+ private boolean explainConnection = true;
+
+ public static class Employee {
+ public final int empid;
+ public String lastname;
+ public String firstname;
+ public String address;
+ public String city;
+
+ public Employee(int empid, String lastname, String firstname, String address, String city) {
+ super();
+ this.empid = empid;
+ this.lastname = lastname;
+ this.firstname = firstname;
+ this.address = address;
+ this.city = city;
+ }
+
+ public String getLastname() {
+ return lastname;
+ }
+
+ public void setLastname(String lastname) {
+ this.lastname = lastname;
+ }
+
+ public String getFirstname() {
+ return firstname;
+ }
+
+ public void setFirstname(String firstname) {
+ this.firstname = firstname;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ public String getCity() {
+ return city;
+ }
+
+ public void setCity(String city) {
+ this.city = city;
+ }
+
+ public int getEmpid() {
+ return empid;
+ }
+
+ @Override
+ public String toString() {
+ return "Employee: " + empid + ", " + lastname + ", " + firstname + ", " + address + ", " + city;
+ }
+
+
+ }
+
+ public MdbcTestClientNew(String[] args) {
+ lastName = args[0];
+ baseId = Integer.parseInt(args[1]);
+ baseIdRange = Integer.parseInt(args[2]);
+ maxCalls = Integer.parseInt(args[3]);
+ maxTimeMs = Integer.parseInt(args[4]);
+ minDelayBetweenTestsMs = Integer.parseInt(args[5]);
+ additionalDelayBetweenTestsMs = Integer.parseInt(args[6]);
+ doDelete = (args[7].toUpperCase().startsWith("Y"));
+ doUpdate = (args[8].toUpperCase().startsWith("Y"));
+ connectionCloseChancePct = Integer.parseInt(args[9]);
+ }
+
+ public MdbcTestClientNew() {
+ // Use default values
+ }
+
+ private void doTest(Connection connection, Random r) throws SQLException {
+ HashMap<Integer, Employee> employeeMap = new HashMap<Integer, Employee>();
+
+ Statement querySt = connection.createStatement();
+ doLog("Before select");
+
+ if (explainConnection) {
+ doLog("querySt is a: ");
+ Class<?> qsClass = querySt.getClass();
+ while (qsClass!=null) {
+ doLog(">>> " + qsClass.getName());
+ qsClass = qsClass.getSuperclass();
+ }
+ doLog("connection is a ");
+ qsClass = connection.getClass();
+ while (qsClass!=null) {
+ doLog(">>> " + qsClass.getName());
+ qsClass = qsClass.getSuperclass();
+ }
+ explainConnection = false;
+ }
+
+ ResultSet rs = querySt.executeQuery("select * from persons");
+ doLog("After select");
+ while (rs.next()) {
+// doLog("PersonId = " + rs.getInt("personId") + ", lastname = " + rs.getString("lastname") + ", firstname = " + rs.getString("firstname"));
+ Employee emp = new Employee(rs.getInt("personId"), rs.getString("lastname"), rs.getString("firstname"), rs.getString("address"), rs.getString("city"));
+ employeeMap.put(rs.getInt("personId"), emp);
+ doLog("Found: " + emp);
+ }
+ querySt.close();
+
+ Statement insertStmt = connection.createStatement();
+
+ insertStmt.execute(generateStatement(employeeMap, r));
+ while (r.nextBoolean()) {
+ insertStmt.execute(generateStatement(employeeMap, r));
+ }
+
+ connection.commit();
+
+ insertStmt.close();
+ }
+
+ private String generateStatement(HashMap<Integer, Employee> employeeMap, Random r) {
+ String toRet = null;
+
+ int which = r.nextInt(3);
+ if (which==0 && doDelete) {
+ toRet = generateDelete(employeeMap, r);
+ } else if (which==1 && doUpdate) {
+ toRet = generateUpdate(employeeMap, r);
+ }
+ if (toRet==null) {
+ toRet = generateInsert(employeeMap, r);
+ }
+
+ doLog("Generated statement: " + toRet);
+
+ return toRet;
+ }
+
+ private String generateInsert(HashMap<Integer, Employee> employeeMap, Random r) {
+ String toRet = null;
+
+ Integer id = null;
+ int range = baseIdRange;
+ while (id==null) {
+ id = baseId + r.nextInt(range);
+ if (employeeMap.containsKey(id)) id = null;
+ range+=(baseIdRange/5);
+ }
+ Employee newEmp = new Employee(id, lastName, Character.toUpperCase(randomLetter(r)) + generateLetters(r, 4+r.nextInt(4)), generateLetters(r, 4).toUpperCase(), generateLetters(r, 4).toUpperCase());
+ toRet = "insert into persons values (" + id + ", '" + newEmp.getLastname() + "', '" + newEmp.getFirstname() + "', '" + newEmp.getAddress() + "', '" + newEmp.getCity() + "')";
+ employeeMap.put(id, newEmp);
+
+ return toRet;
+ }
+
+ private String generateUpdate(HashMap<Integer, Employee> employeeMap, Random r) {
+ String toRet = null;
+
+ Employee toUpd = chooseTarget(employeeMap, r);
+ if (toUpd!=null) {
+ String newFirst = null;
+ if (toUpd.getFirstname().length()<=3 || r.nextBoolean()) {
+ newFirst = toUpd.getFirstname() + randomLetter(r);
+ } else {
+ newFirst = toUpd.getFirstname().substring(0, toUpd.getFirstname().length()-1);
+ }
+// toRet = "update persons set firstname = '" + newFirst + "' where personid = " + toUpd.getEmpid();
+ toRet = "update persons set firstname = '" + newFirst + "' where personid = " + toUpd.getEmpid() + " and lastname = '" + toUpd.getLastname() + "'";
+ toUpd.setFirstname(newFirst);
+ }
+
+ return toRet;
+ }
+
+ private String generateLetters(Random r, int count) {
+ StringBuffer toRet = new StringBuffer();
+ for (int i=0; i<count; i++) {
+ Character c = null;
+ while (c==null) {
+ c = randomLetter(r);
+ char cc = c.charValue();
+ if ( (cc=='a' || cc=='e' || cc=='i' || cc=='o' || cc=='u') ^ (i%2==0) ) c = null;
+ }
+ toRet.append(c);
+ }
+ return toRet.toString();
+ }
+
+ private char randomLetter(Random r) {
+ int a = (int)'a';
+ return (char)(a+r.nextInt(26));
+ }
+
+ private String generateDelete(HashMap<Integer, Employee> employeeMap, Random r) {
+ String toRet = null;
+
+ Employee toDel = chooseTarget(employeeMap, r);
+ if (toDel!=null) {
+ toRet = "delete from persons where personid = " + toDel.getEmpid() + " and lastname = '" + toDel.getLastname() + "'";
+ employeeMap.remove(toDel.getEmpid());
+ }
+
+ return toRet;
+ }
+
+
+
+ private Employee chooseTarget(HashMap<Integer, Employee> employeeMap, Random r) {
+ Employee toPick = null;
+ int count = 0;
+ for (int id : employeeMap.keySet()) {
+ Employee emp = employeeMap.get(id);
+ if (!emp.getLastname().equals(lastName)) continue;
+ count++;
+ if (r.nextInt(count)==0) toPick = emp;
+ }
+ return toPick;
+ }
+
+ public void runTests() {
+ try {
+ Class.forName("org.apache.calcite.avatica.remote.Driver");
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ Connection connection = null;
+
+ Random r = new Random();
+ boolean done = false;
+ int calls = 0;
+ long startTime = new java.util.Date().getTime();
+ while (!done) {
+ if (connection==null) {
+ try {
+ doLog("Opening new connection");
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");
+ connection.setAutoCommit(false);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
+ }
+ } else {
+ doLog("Keeping open connection");
+ }
+
+ try {
+ doLog("Running test");
+ doTest(connection, r);
+ doLog("Test complete");
+ } catch (SQLException e1) {
+ e1.printStackTrace();
+ done = true;
+ if (connection!=null) {
+ try {
+ doLog("Closing connection in catch block");
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ done = true;
+ } finally {
+ connection = null;
+ }
+ }
+ }
+
+ if (!done && connection!=null && r.nextInt(100)<connectionCloseChancePct) {
+ try {
+ doLog("Closing connection");
+ connection.close();
+ connection = null;
+ } catch (SQLException e) {
+ e.printStackTrace();
+ done = true;
+ }
+ } else {
+ doLog("Not closing connection");
+ }
+
+ calls++;
+ long msElapsed = (new java.util.Date().getTime()) - startTime;
+ if (calls>maxCalls || msElapsed > maxTimeMs) done = true;
+
+ if (!done) {
+ long delay = r.nextInt(minDelayBetweenTestsMs);
+ while (r.nextBoolean()) delay += r.nextInt(additionalDelayBetweenTestsMs);
+ synchronized(r) {
+ try {
+ r.wait(delay);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ done = true;
+ }
+ }
+ }
+
+ doLog("");
+ }
+
+ if (connection!=null) {
+ try {
+ doLog("Closing connection at end");
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ doLog("All done.");
+ }
+
+ private void doLog(String string) {
+ System.out.println(">>> " + string);
+ }
+
+ public static void main(String[] args) {
+ MdbcTestClientNew mtc = null;
+ if (args.length==0) {
+ mtc = new MdbcTestClientNew();
+ } else if (args.length==10) {
+ mtc = new MdbcTestClientNew(args);
+ } else {
+ System.out.println("Usage: [this] lastname baseId baseIdRange maxCalls maxTimeMs minDelayBetweenTestsMs additionalDelayBetweenTestsMs doDelete doUpdate connectionCloseChancePct");
+ System.out.println(" lastname: Lastname for all inserts/updates/deletes");
+ System.out.println(" baseId/baseRange: Id for all inserts will be between baseId and (baseId+baseRange). In case of collision baseRange will be increased until available Id is found.");
+ System.out.println(" maxCalls/maxTimeMs: Maximum number of commits (each of which may be 1+ updates) or time (in ms) that the test will run, whichever comes first");
+ System.out.println(" minDelayBetweenTestsMs/additionalDelayBetweenTestsMs: After each test, delay for minDelayBetweenTestsMs ms plus (0 or more) times additionalDelayBetweenTestsMs ms");
+ System.out.println(" doUpdate/doDelete: If \"Y\", will try to generate updates and deletes in addition to inserts. Any failures to generate an update/delete will be replaced with an insert.");
+ System.out.println(" connectionCloseChancePct: after each commit, percent chance of closing connection and opening a new one.");
+ System.out.println("Default settings: Lastname 700 50 50 60000 1000 1000 Y Y 50");
+ }
+
+ mtc.runTests();
+ }
+
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 8b91b28..71f1b8b 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -313,7 +313,20 @@ public interface MusicInterface { void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException; - OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, + /** + * Combine previous musicrangeinformation rows for new partition, if necessary + * + * Does not merge rows if a single previous row is sufficient to match new partition needed + * + * @param extendedDag + * @param latestRows + * @param ranges + * @param locks + * @param ownershipId + * @return + * @throws MDBCServiceException + */ + OwnershipReturn mergeLatestRowsIfNecessary(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index c176ad9..e41b7c0 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -156,7 +156,7 @@ public class MusicMixin implements MusicInterface { static { // We only support the following type mappings currently (from DB -> Cassandra). // Anything else will likely cause a NullPointerException - typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY + typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY typemap.put(Types.BLOB, "VARCHAR"); typemap.put(Types.BOOLEAN, "BOOLEAN"); typemap.put(Types.CLOB, "BLOB"); @@ -169,10 +169,10 @@ public class MusicMixin implements MusicInterface { typemap.put(Types.TIMESTAMP, "VARCHAR"); typemap.put(Types.VARBINARY, "BLOB"); typemap.put(Types.VARCHAR, "VARCHAR"); - typemap.put(Types.CHAR, "VARCHAR"); + typemap.put(Types.CHAR, "VARCHAR"); //The "Hacks", these don't have a direct mapping - //typemap.put(Types.DATE, "VARCHAR"); - //typemap.put(Types.DATE, "TIMESTAMP"); + //typemap.put(Types.DATE, "VARCHAR"); + //typemap.put(Types.DATE, "TIMESTAMP"); } @@ -389,12 +389,12 @@ public class MusicMixin implements MusicInterface { @Override public void createDirtyRowTable(TableInfo ti, String tableName) { // create dirtybitsTable at all replicas -// for (String repl : allReplicaIds) { -//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i]; -//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);"; -// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl); -// executeMusicWriteQuery(cql); -// } +// for (String repl : allReplicaIds) { +//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i]; +//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);"; +// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl); +// executeMusicWriteQuery(cql); +// } StringBuilder ddl = new StringBuilder("REPLICA__ TEXT"); StringBuilder cols = new StringBuilder("REPLICA__"); for (int i = 0; i < ti.columns.size(); i++) { @@ -451,8 +451,8 @@ public class MusicMixin implements MusicInterface { System.err.println("markDIrtyRow need to fix primary key"); } String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString()); - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql);*/ + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql);*/ String primaryKey; if(ti.hasKey()) { primaryKey = getMusicKeyFromRow(ti,tableName, keys); @@ -476,13 +476,13 @@ public class MusicMixin implements MusicInterface { pQueryObject.addValue(pkObj); updateMusicDB(tableName, primaryKey, pQueryObject); //if (!repl.equals(myId)) { - /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); - vallist.set(0, repl); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - sess.execute(bound); - }*/ + /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); + vallist.set(0, repl); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + sess.execute(bound); + }*/ //} } @@ -514,13 +514,13 @@ public class MusicMixin implements MusicInterface { if(rt.getResult().getResult().toLowerCase().equals("failure")) { System.out.println("Failure while cleanDirtyRow..."+rt.getMessage()); } - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - sess.execute(bound); - }*/ + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + sess.execute(bound); + }*/ } /** * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica, @@ -533,14 +533,14 @@ public class MusicMixin implements MusicInterface { String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName); ResultSet results = null; logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); - - /*Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(new Object[] { myId }); - bound.setReadTimeoutMillis(60000); - synchronized (sess) { - results = sess.execute(bound); - }*/ + + /*Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(new Object[] { myId }); + bound.setReadTimeoutMillis(60000); + synchronized (sess) { + results = sess.execute(bound); + }*/ PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); try { @@ -646,14 +646,14 @@ public class MusicMixin implements MusicInterface { String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString()); logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql); pQueryObject.appendQueryString(cql); - - /*PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - Session sess = getMusicSession(); - synchronized (sess) { - sess.execute(bound); - }*/ + + /*PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + Session sess = getMusicSession(); + synchronized (sess) { + sess.execute(bound); + }*/ String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow); updateMusicDB(tableName, primaryKey, pQueryObject); @@ -716,15 +716,15 @@ public class MusicMixin implements MusicInterface { e.printStackTrace(); } - /* - Session sess = getMusicSession(); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(vallist.toArray()); - bound.setReadTimeoutMillis(60000); - ResultSet dirtyRows = null; - synchronized (sess) { - dirtyRows = sess.execute(bound); - }*/ + /* + Session sess = getMusicSession(); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(vallist.toArray()); + bound.setReadTimeoutMillis(60000); + ResultSet dirtyRows = null; + synchronized (sess) { + dirtyRows = sess.execute(bound); + }*/ List<Row> rows = dirtyRows.all(); if (rows.isEmpty()) { // No rows, the row must have been deleted @@ -771,48 +771,48 @@ public class MusicMixin implements MusicInterface { } logger.debug("Blocking rowid: "+rowid); - in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE + in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE dbi.insertRowIntoSqlDb(tableName, map); logger.debug("Unblocking rowid: "+rowid); - in_progress.remove(rowid); // Unblock propagation - -// try { -// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); -// executeSQLWrite(sql); -// } catch (SQLException e) { -// logger.debug("Insert failed because row exists, do an update"); -// // TODO - rewrite this UPDATE command should not update key fields -// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString()); -// try { -// executeSQLWrite(sql); -// } catch (SQLException e1) { -// e1.printStackTrace(); -// } -// } + in_progress.remove(rowid); // Unblock propagation + +// try { +// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); +// executeSQLWrite(sql); +// } catch (SQLException e) { +// logger.debug("Insert failed because row exists, do an update"); +// // TODO - rewrite this UPDATE command should not update key fields +// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString()); +// try { +// executeSQLWrite(sql); +// } catch (SQLException e1) { +// e1.printStackTrace(); +// } +// } ti = dbi.getTableInfo(tableName); cleanDirtyRow(ti, tableName, new JSONObject(vallist)); -// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";"; -// java.sql.ResultSet rs = executeSQLRead(selectQuery); -// String dbWriteQuery=null; -// try { -// if(rs.next()){//this entry is there, do an update -// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";"; -// }else -// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";"; -// executeSQLWrite(dbWriteQuery); -// } catch (SQLException e) { -// // ZZTODO Auto-generated catch block -// e.printStackTrace(); -// } +// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";"; +// java.sql.ResultSet rs = executeSQLRead(selectQuery); +// String dbWriteQuery=null; +// try { +// if(rs.next()){//this entry is there, do an update +// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";"; +// }else +// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";"; +// executeSQLWrite(dbWriteQuery); +// } catch (SQLException e) { +// // ZZTODO Auto-generated catch block +// e.printStackTrace(); +// } //clean the music dirty bits table -// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId; -// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;"; -// executeMusicWriteQuery(deleteQuery); +// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId; +// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;"; +// executeMusicWriteQuery(deleteQuery); } private Object getValue(Row musicRow, String colname) { ColumnDefinitions cdef = musicRow.getColumnDefinitions(); @@ -914,14 +914,14 @@ public class MusicMixin implements MusicInterface { pQueryObject.appendQueryString(cql); String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow); updateMusicDB(tableName, primaryKey, pQueryObject); - - /*PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(newrow); - bound.setReadTimeoutMillis(60000); - Session sess = getMusicSession(); - synchronized (sess) { - sess.execute(bound); - }*/ + + /*PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(newrow); + bound.setReadTimeoutMillis(60000); + Session sess = getMusicSession(); + synchronized (sess) { + sess.execute(bound); + }*/ // Mark the dirty rows in music for all the replicas but us markDirtyRow(ti,tableName, changedRow); } @@ -978,7 +978,7 @@ public class MusicMixin implements MusicInterface { if(rt.getResult().getResult().toLowerCase().equals("failure")) { logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage()); } - + } /** @@ -1287,7 +1287,10 @@ public class MusicMixin implements MusicInterface { @Override public void commitLog(DatabasePartition partition,List<Range> eventualRanges, StagingTable transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException { - + + // first deal with commit for eventually consistent tables + filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); + if(partition==null){ logger.warn("Trying tcommit log with null partition"); return; @@ -1382,7 +1385,7 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException(); } - if(!transactionDigest.isEmpty()) { + if(!transactionDigest.isEventualEmpty()) { ByteBuffer serialized = transactionDigest.getSerializedEventuallyStagingAndClean(); if (serialized!=null && useCompression) { @@ -1513,7 +1516,7 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e); } - return getMRIRowFromCassandraRow(newRow); + return getMRIRowFromCassandraRow(newRow); } @Override @@ -1529,18 +1532,18 @@ public class MusicMixin implements MusicInterface { logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e); } - return getRangeDependenciesFromCassandraRow(newRow); + return getRangeDependenciesFromCassandraRow(newRow); } /** * This function creates the TransactionInformation table. It contain information related * to the transactions happening in a given partition. - * * The schema of the table is - * * Id, uiid. - * * Partition, uuid id of the partition - * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables - * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables - * * Redo: list of uiids associated to the Redo Records Table + * * The schema of the table is + * * Id, uiid. + * * Partition, uuid id of the partition + * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables + * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables + * * Redo: list of uiids associated to the Redo Records Table * */ public static void createMusicRangeInformationTable(String namespace, String tableName) throws MDBCServiceException { @@ -1727,8 +1730,8 @@ public class MusicMixin implements MusicInterface { /** * This function creates the MusicEveTxDigest table. It contain information related to each eventual transaction committed - * * LeaseId: id associated with the lease, text - * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later + * * LeaseId: id associated with the lease, text + * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later * * TransactionDigest: text that contains all the changes in the transaction */ public static void createMusicEventualTxDigest(String musicEventualTxDigestTableName, String musicNamespace, int musicTxDigestTableNumber) throws MDBCServiceException { @@ -1738,11 +1741,12 @@ public class MusicMixin implements MusicInterface { "-" + Integer.toString(musicTxDigestTableNumber); } - String priKey = "txTimeId"; + String priKey = "txTimeId, year"; StringBuilder fields = new StringBuilder(); fields.append("txid uuid, "); fields.append("transactiondigest blob, "); fields.append("compressed boolean, "); + fields.append("year int, "); fields.append("txTimeId TIMEUUID ");//notice lack of ',' String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); try { @@ -1832,19 +1836,17 @@ public class MusicMixin implements MusicInterface { public void addEventualTxDigest(MusicTxDigestId newId, ByteBuffer transactionDigest) throws MDBCServiceException { //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); PreparedQueryObject query = new PreparedQueryObject(); - String cqlQuery = "INSERT INTO " + - this.music_ns + - '.' + - this.musicEventualTxDigestTableName + - " (txid,transactiondigest,compressed,txTimeId) " + - "VALUES (" + - newId.transactionId+ ",'" + - transactionDigest + "'," + - useCompression + ","+ - // "toTimestamp(now())" + - "now()" + - ");"; - query.appendQueryString(cqlQuery); + int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR); + + + String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns, + this.musicEventualTxDigestTableName); + query.appendQueryString(cql); + query.addValue( newId.transactionId); + query.addValue(transactionDigest); + query.addValue(useCompression); + query.addValue(year); + // query.appendQueryString(cqlQuery); //\TODO check if I am not shooting on my own foot try { MusicCore.nonKeyRelatedPut(query,"critical"); @@ -1885,6 +1887,15 @@ public class MusicMixin implements MusicInterface { @Override public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException { + int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR); + StringBuffer yearSb = new StringBuffer(); + String sep = ""; + for (int y=2019; y<=year; y++) { + yearSb.append(sep); + yearSb.append(y); + sep = ","; + } + StagingTable changes; String cql; LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>(); @@ -1893,12 +1904,12 @@ public class MusicMixin implements MusicInterface { if (musicevetxdigestNodeinfoTimeID != null) { // this will fetch only few records based on the time-stamp condition. - cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName); + cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString()); pQueryObject.appendQueryString(cql); pQueryObject.addValue(musicevetxdigestNodeinfoTimeID); } else { // This is going to Fetch all the Transactiondigest records from the musicevetxdigest table. - cql = String.format("SELECT * FROM %s.%s LIMIT 10;", music_ns, this.musicEventualTxDigestTableName); + cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString()); pQueryObject.appendQueryString(cql); } @@ -2110,9 +2121,17 @@ public class MusicMixin implements MusicInterface { } @Override - public OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, - Map<UUID,LockResult> locks, UUID ownershipId) throws MDBCServiceException{ + public OwnershipReturn mergeLatestRowsIfNecessary(Dag extendedDag, List<MusicRangeInformationRow> latestRows, + List<Range> ranges, Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException { recoverFromFailureAndUpdateDag(extendedDag,latestRows,ranges,locks); + if (latestRows.size()==1) { + //reuse current row if possible + MusicRangeInformationRow row = latestRows.get(0); + LockResult lockresult = locks.get(row.getPartitionIndex()); + if (lockresult!=null) { + return new OwnershipReturn(ownershipId, lockresult.getLockId(), row.getPartitionIndex(), ranges, extendedDag); + } + } List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks); releaseLocks(changed, locks); MusicRangeInformationRow row = createAndAssignLock(ranges); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index ef98ebd..8823b81 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -316,7 +316,7 @@ public class OwnershipAndCheckpoint{ Set<Range> allRanges = currentlyOwn.getAllRanges(); List<MusicRangeInformationRow> isLatestRows = extractRowsForRange(mi, new ArrayList<>(allRanges), true); currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,isLatestRows)); - return mi.mergeLatestRows(currentlyOwn,rows,ranges,newLocks,opId); + return mi.mergeLatestRowsIfNecessary(currentlyOwn,rows,ranges,newLocks,opId); } /** diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java index dbed9e4..26ee73c 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java @@ -91,7 +91,7 @@ public class StagingTable { builderInitialized=true; digestBuilder = CompleteDigest.newBuilder(); this.eventuallyConsistentRanges=eventuallyConsistentRanges; - eventuallyBuilder = (!this.eventuallyConsistentRanges.isEmpty())?null:CompleteDigest.newBuilder(); + eventuallyBuilder = (this.eventuallyConsistentRanges.isEmpty())?null:CompleteDigest.newBuilder(); } public StagingTable(ByteBuffer serialized) throws MDBCServiceException { @@ -242,7 +242,15 @@ public class StagingTable { } synchronized public boolean isEmpty() { - return (digestBuilder.getRowsCount()==0); + return (digestBuilder.getRowsCount()==0 && eventuallyBuilder.getRowsCount()==0); + } + + synchronized public boolean isStrongEmpty() { + return (digestBuilder.getRowsCount()==0); + } + + synchronized public boolean isEventualEmpty() { + return (eventuallyBuilder.getRowsCount()==0); } synchronized public void clear() throws MDBCServiceException { diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java index aba8cb4..41a943e 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java @@ -20,24 +20,22 @@ package org.onap.music.mdbc.mixins; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.junit.Assert.*; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; - -import java.util.*; - +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.UUID; +import java.util.function.Consumer; import org.cassandraunit.utils.EmbeddedCassandraServerHelper; - - import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; -import org.junit.rules.Timeout; import org.onap.music.datastore.MusicDataStore; import org.onap.music.datastore.MusicDataStoreHandle; import org.onap.music.exceptions.MDBCServiceException; @@ -51,12 +49,13 @@ import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.StateManager; -import org.onap.music.mdbc.TestUtils; -import org.onap.music.mdbc.ownership.Dag; -import org.onap.music.mdbc.ownership.DagNode; +import org.onap.music.mdbc.proto.ProtoDigest.Digest.CompleteDigest; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; -import org.onap.music.service.impl.MusicCassaCore; +import org.onap.music.mdbc.tables.StagingTable; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.google.protobuf.InvalidProtocolBufferException; public class MusicMixinTest { @@ -109,6 +108,7 @@ public class MusicMixinTest { Properties properties = new Properties(); properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace); properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName); + properties.setProperty(MusicMixin.KEY_COMPRESSION, Boolean.toString(true)); mixin=new MusicMixin(stateManager, mdbcServerName,properties); } catch (MDBCServiceException e) { fail("error creating music mixin"); @@ -246,4 +246,39 @@ public class MusicMixinTest { @Test public void relinquishIfRequired() { } + + @Test + public void getEveTxDigest() throws Exception { + + mixin.createMusicEventualTxDigest(); + ByteBuffer compressed = mockCompressedProtoByteBuff(); + MusicTxDigestId digestId = new MusicTxDigestId(UUID.randomUUID(), 1); + mixin.addEventualTxDigest(digestId, compressed); + + LinkedHashMap<UUID, StagingTable> digest = mixin.getEveTxDigest("n1"); + + Consumer<Map.Entry<UUID,StagingTable>> consumer = new Consumer<Map.Entry<UUID,StagingTable>>() { + + @Override + public void accept(Entry<UUID, StagingTable> mapEntry) { + assertNotNull(mapEntry.getValue()); + } + + }; + + digest.entrySet().forEach(consumer); + + + + + } + + protected ByteBuffer mockCompressedProtoByteBuff() throws MDBCServiceException, InvalidProtocolBufferException { + CompleteDigest instance = CompleteDigest.getDefaultInstance(); + // CompleteDigest instance = CompleteDigest.parseFrom(ByteBuffer.wrap("Test".getBytes())); + byte[] bytes = instance.toByteArray(); + ByteBuffer serialized = ByteBuffer.wrap(bytes); + ByteBuffer compressed = StagingTable.Compress(serialized); + return compressed; + } } |