aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClientNew.java379
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java15
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java263
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java12
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java69
6 files changed, 597 insertions, 143 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClientNew.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClientNew.java
new file mode 100644
index 0000000..16619be
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClientNew.java
@@ -0,0 +1,379 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.examples;
+
+import java.sql.*;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.calcite.avatica.remote.Driver;
+
+public class MdbcTestClientNew {
+ private String lastName = "Lastname";
+ private int baseId = 700;
+ private int baseIdRange = 50;
+ private int maxCalls = 50;
+ private int maxTimeMs = 60000;
+ private int minDelayBetweenTestsMs = 1000;
+ private int additionalDelayBetweenTestsMs = 1000;
+ private boolean doDelete = true;
+ private boolean doUpdate = true;
+ private int connectionCloseChancePct = 50;
+
+ private boolean explainConnection = true;
+
+ public static class Employee {
+ public final int empid;
+ public String lastname;
+ public String firstname;
+ public String address;
+ public String city;
+
+ public Employee(int empid, String lastname, String firstname, String address, String city) {
+ super();
+ this.empid = empid;
+ this.lastname = lastname;
+ this.firstname = firstname;
+ this.address = address;
+ this.city = city;
+ }
+
+ public String getLastname() {
+ return lastname;
+ }
+
+ public void setLastname(String lastname) {
+ this.lastname = lastname;
+ }
+
+ public String getFirstname() {
+ return firstname;
+ }
+
+ public void setFirstname(String firstname) {
+ this.firstname = firstname;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ public String getCity() {
+ return city;
+ }
+
+ public void setCity(String city) {
+ this.city = city;
+ }
+
+ public int getEmpid() {
+ return empid;
+ }
+
+ @Override
+ public String toString() {
+ return "Employee: " + empid + ", " + lastname + ", " + firstname + ", " + address + ", " + city;
+ }
+
+
+ }
+
+ public MdbcTestClientNew(String[] args) {
+ lastName = args[0];
+ baseId = Integer.parseInt(args[1]);
+ baseIdRange = Integer.parseInt(args[2]);
+ maxCalls = Integer.parseInt(args[3]);
+ maxTimeMs = Integer.parseInt(args[4]);
+ minDelayBetweenTestsMs = Integer.parseInt(args[5]);
+ additionalDelayBetweenTestsMs = Integer.parseInt(args[6]);
+ doDelete = (args[7].toUpperCase().startsWith("Y"));
+ doUpdate = (args[8].toUpperCase().startsWith("Y"));
+ connectionCloseChancePct = Integer.parseInt(args[9]);
+ }
+
+ public MdbcTestClientNew() {
+ // Use default values
+ }
+
+ private void doTest(Connection connection, Random r) throws SQLException {
+ HashMap<Integer, Employee> employeeMap = new HashMap<Integer, Employee>();
+
+ Statement querySt = connection.createStatement();
+ doLog("Before select");
+
+ if (explainConnection) {
+ doLog("querySt is a: ");
+ Class<?> qsClass = querySt.getClass();
+ while (qsClass!=null) {
+ doLog(">>> " + qsClass.getName());
+ qsClass = qsClass.getSuperclass();
+ }
+ doLog("connection is a ");
+ qsClass = connection.getClass();
+ while (qsClass!=null) {
+ doLog(">>> " + qsClass.getName());
+ qsClass = qsClass.getSuperclass();
+ }
+ explainConnection = false;
+ }
+
+ ResultSet rs = querySt.executeQuery("select * from persons");
+ doLog("After select");
+ while (rs.next()) {
+// doLog("PersonId = " + rs.getInt("personId") + ", lastname = " + rs.getString("lastname") + ", firstname = " + rs.getString("firstname"));
+ Employee emp = new Employee(rs.getInt("personId"), rs.getString("lastname"), rs.getString("firstname"), rs.getString("address"), rs.getString("city"));
+ employeeMap.put(rs.getInt("personId"), emp);
+ doLog("Found: " + emp);
+ }
+ querySt.close();
+
+ Statement insertStmt = connection.createStatement();
+
+ insertStmt.execute(generateStatement(employeeMap, r));
+ while (r.nextBoolean()) {
+ insertStmt.execute(generateStatement(employeeMap, r));
+ }
+
+ connection.commit();
+
+ insertStmt.close();
+ }
+
+ private String generateStatement(HashMap<Integer, Employee> employeeMap, Random r) {
+ String toRet = null;
+
+ int which = r.nextInt(3);
+ if (which==0 && doDelete) {
+ toRet = generateDelete(employeeMap, r);
+ } else if (which==1 && doUpdate) {
+ toRet = generateUpdate(employeeMap, r);
+ }
+ if (toRet==null) {
+ toRet = generateInsert(employeeMap, r);
+ }
+
+ doLog("Generated statement: " + toRet);
+
+ return toRet;
+ }
+
+ private String generateInsert(HashMap<Integer, Employee> employeeMap, Random r) {
+ String toRet = null;
+
+ Integer id = null;
+ int range = baseIdRange;
+ while (id==null) {
+ id = baseId + r.nextInt(range);
+ if (employeeMap.containsKey(id)) id = null;
+ range+=(baseIdRange/5);
+ }
+ Employee newEmp = new Employee(id, lastName, Character.toUpperCase(randomLetter(r)) + generateLetters(r, 4+r.nextInt(4)), generateLetters(r, 4).toUpperCase(), generateLetters(r, 4).toUpperCase());
+ toRet = "insert into persons values (" + id + ", '" + newEmp.getLastname() + "', '" + newEmp.getFirstname() + "', '" + newEmp.getAddress() + "', '" + newEmp.getCity() + "')";
+ employeeMap.put(id, newEmp);
+
+ return toRet;
+ }
+
+ private String generateUpdate(HashMap<Integer, Employee> employeeMap, Random r) {
+ String toRet = null;
+
+ Employee toUpd = chooseTarget(employeeMap, r);
+ if (toUpd!=null) {
+ String newFirst = null;
+ if (toUpd.getFirstname().length()<=3 || r.nextBoolean()) {
+ newFirst = toUpd.getFirstname() + randomLetter(r);
+ } else {
+ newFirst = toUpd.getFirstname().substring(0, toUpd.getFirstname().length()-1);
+ }
+// toRet = "update persons set firstname = '" + newFirst + "' where personid = " + toUpd.getEmpid();
+ toRet = "update persons set firstname = '" + newFirst + "' where personid = " + toUpd.getEmpid() + " and lastname = '" + toUpd.getLastname() + "'";
+ toUpd.setFirstname(newFirst);
+ }
+
+ return toRet;
+ }
+
+ private String generateLetters(Random r, int count) {
+ StringBuffer toRet = new StringBuffer();
+ for (int i=0; i<count; i++) {
+ Character c = null;
+ while (c==null) {
+ c = randomLetter(r);
+ char cc = c.charValue();
+ if ( (cc=='a' || cc=='e' || cc=='i' || cc=='o' || cc=='u') ^ (i%2==0) ) c = null;
+ }
+ toRet.append(c);
+ }
+ return toRet.toString();
+ }
+
+ private char randomLetter(Random r) {
+ int a = (int)'a';
+ return (char)(a+r.nextInt(26));
+ }
+
+ private String generateDelete(HashMap<Integer, Employee> employeeMap, Random r) {
+ String toRet = null;
+
+ Employee toDel = chooseTarget(employeeMap, r);
+ if (toDel!=null) {
+ toRet = "delete from persons where personid = " + toDel.getEmpid() + " and lastname = '" + toDel.getLastname() + "'";
+ employeeMap.remove(toDel.getEmpid());
+ }
+
+ return toRet;
+ }
+
+
+
+ private Employee chooseTarget(HashMap<Integer, Employee> employeeMap, Random r) {
+ Employee toPick = null;
+ int count = 0;
+ for (int id : employeeMap.keySet()) {
+ Employee emp = employeeMap.get(id);
+ if (!emp.getLastname().equals(lastName)) continue;
+ count++;
+ if (r.nextInt(count)==0) toPick = emp;
+ }
+ return toPick;
+ }
+
+ public void runTests() {
+ try {
+ Class.forName("org.apache.calcite.avatica.remote.Driver");
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ Connection connection = null;
+
+ Random r = new Random();
+ boolean done = false;
+ int calls = 0;
+ long startTime = new java.util.Date().getTime();
+ while (!done) {
+ if (connection==null) {
+ try {
+ doLog("Opening new connection");
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");
+ connection.setAutoCommit(false);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ return;
+ }
+ } else {
+ doLog("Keeping open connection");
+ }
+
+ try {
+ doLog("Running test");
+ doTest(connection, r);
+ doLog("Test complete");
+ } catch (SQLException e1) {
+ e1.printStackTrace();
+ done = true;
+ if (connection!=null) {
+ try {
+ doLog("Closing connection in catch block");
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ done = true;
+ } finally {
+ connection = null;
+ }
+ }
+ }
+
+ if (!done && connection!=null && r.nextInt(100)<connectionCloseChancePct) {
+ try {
+ doLog("Closing connection");
+ connection.close();
+ connection = null;
+ } catch (SQLException e) {
+ e.printStackTrace();
+ done = true;
+ }
+ } else {
+ doLog("Not closing connection");
+ }
+
+ calls++;
+ long msElapsed = (new java.util.Date().getTime()) - startTime;
+ if (calls>maxCalls || msElapsed > maxTimeMs) done = true;
+
+ if (!done) {
+ long delay = r.nextInt(minDelayBetweenTestsMs);
+ while (r.nextBoolean()) delay += r.nextInt(additionalDelayBetweenTestsMs);
+ synchronized(r) {
+ try {
+ r.wait(delay);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ done = true;
+ }
+ }
+ }
+
+ doLog("");
+ }
+
+ if (connection!=null) {
+ try {
+ doLog("Closing connection at end");
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ doLog("All done.");
+ }
+
+ private void doLog(String string) {
+ System.out.println(">>> " + string);
+ }
+
+ public static void main(String[] args) {
+ MdbcTestClientNew mtc = null;
+ if (args.length==0) {
+ mtc = new MdbcTestClientNew();
+ } else if (args.length==10) {
+ mtc = new MdbcTestClientNew(args);
+ } else {
+ System.out.println("Usage: [this] lastname baseId baseIdRange maxCalls maxTimeMs minDelayBetweenTestsMs additionalDelayBetweenTestsMs doDelete doUpdate connectionCloseChancePct");
+ System.out.println(" lastname: Lastname for all inserts/updates/deletes");
+ System.out.println(" baseId/baseRange: Id for all inserts will be between baseId and (baseId+baseRange). In case of collision baseRange will be increased until available Id is found.");
+ System.out.println(" maxCalls/maxTimeMs: Maximum number of commits (each of which may be 1+ updates) or time (in ms) that the test will run, whichever comes first");
+ System.out.println(" minDelayBetweenTestsMs/additionalDelayBetweenTestsMs: After each test, delay for minDelayBetweenTestsMs ms plus (0 or more) times additionalDelayBetweenTestsMs ms");
+ System.out.println(" doUpdate/doDelete: If \"Y\", will try to generate updates and deletes in addition to inserts. Any failures to generate an update/delete will be replaced with an insert.");
+ System.out.println(" connectionCloseChancePct: after each commit, percent chance of closing connection and opening a new one.");
+ System.out.println("Default settings: Lastname 700 50 50 60000 1000 1000 Y Y 50");
+ }
+
+ mtc.runTests();
+ }
+
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 8b91b28..71f1b8b 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -313,7 +313,20 @@ public interface MusicInterface {
void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException;
- OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
+ /**
+ * Combine previous musicrangeinformation rows for new partition, if necessary
+ *
+ * Does not merge rows if a single previous row is sufficient to match new partition needed
+ *
+ * @param extendedDag
+ * @param latestRows
+ * @param ranges
+ * @param locks
+ * @param ownershipId
+ * @return
+ * @throws MDBCServiceException
+ */
+ OwnershipReturn mergeLatestRowsIfNecessary(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index c176ad9..e41b7c0 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -156,7 +156,7 @@ public class MusicMixin implements MusicInterface {
static {
// We only support the following type mappings currently (from DB -> Cassandra).
// Anything else will likely cause a NullPointerException
- typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY
+ typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY
typemap.put(Types.BLOB, "VARCHAR");
typemap.put(Types.BOOLEAN, "BOOLEAN");
typemap.put(Types.CLOB, "BLOB");
@@ -169,10 +169,10 @@ public class MusicMixin implements MusicInterface {
typemap.put(Types.TIMESTAMP, "VARCHAR");
typemap.put(Types.VARBINARY, "BLOB");
typemap.put(Types.VARCHAR, "VARCHAR");
- typemap.put(Types.CHAR, "VARCHAR");
+ typemap.put(Types.CHAR, "VARCHAR");
//The "Hacks", these don't have a direct mapping
- //typemap.put(Types.DATE, "VARCHAR");
- //typemap.put(Types.DATE, "TIMESTAMP");
+ //typemap.put(Types.DATE, "VARCHAR");
+ //typemap.put(Types.DATE, "TIMESTAMP");
}
@@ -389,12 +389,12 @@ public class MusicMixin implements MusicInterface {
@Override
public void createDirtyRowTable(TableInfo ti, String tableName) {
// create dirtybitsTable at all replicas
-// for (String repl : allReplicaIds) {
-//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
-//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
-// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
-// executeMusicWriteQuery(cql);
-// }
+// for (String repl : allReplicaIds) {
+//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
+//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
+// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
+// executeMusicWriteQuery(cql);
+// }
StringBuilder ddl = new StringBuilder("REPLICA__ TEXT");
StringBuilder cols = new StringBuilder("REPLICA__");
for (int i = 0; i < ti.columns.size(); i++) {
@@ -451,8 +451,8 @@ public class MusicMixin implements MusicInterface {
System.err.println("markDIrtyRow need to fix primary key");
}
String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString());
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);*/
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);*/
String primaryKey;
if(ti.hasKey()) {
primaryKey = getMusicKeyFromRow(ti,tableName, keys);
@@ -476,13 +476,13 @@ public class MusicMixin implements MusicInterface {
pQueryObject.addValue(pkObj);
updateMusicDB(tableName, primaryKey, pQueryObject);
//if (!repl.equals(myId)) {
- /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
- vallist.set(0, repl);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(bound);
- }*/
+ /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+ vallist.set(0, repl);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
//}
}
@@ -514,13 +514,13 @@ public class MusicMixin implements MusicInterface {
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
System.out.println("Failure while cleanDirtyRow..."+rt.getMessage());
}
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- sess.execute(bound);
- }*/
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
}
/**
* Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica,
@@ -533,14 +533,14 @@ public class MusicMixin implements MusicInterface {
String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName);
ResultSet results = null;
logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
-
- /*Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(new Object[] { myId });
- bound.setReadTimeoutMillis(60000);
- synchronized (sess) {
- results = sess.execute(bound);
- }*/
+
+ /*Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(new Object[] { myId });
+ bound.setReadTimeoutMillis(60000);
+ synchronized (sess) {
+ results = sess.execute(bound);
+ }*/
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
try {
@@ -646,14 +646,14 @@ public class MusicMixin implements MusicInterface {
String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString());
logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql);
pQueryObject.appendQueryString(cql);
-
- /*PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- synchronized (sess) {
- sess.execute(bound);
- }*/
+
+ /*PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow);
updateMusicDB(tableName, primaryKey, pQueryObject);
@@ -716,15 +716,15 @@ public class MusicMixin implements MusicInterface {
e.printStackTrace();
}
- /*
- Session sess = getMusicSession();
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(vallist.toArray());
- bound.setReadTimeoutMillis(60000);
- ResultSet dirtyRows = null;
- synchronized (sess) {
- dirtyRows = sess.execute(bound);
- }*/
+ /*
+ Session sess = getMusicSession();
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(vallist.toArray());
+ bound.setReadTimeoutMillis(60000);
+ ResultSet dirtyRows = null;
+ synchronized (sess) {
+ dirtyRows = sess.execute(bound);
+ }*/
List<Row> rows = dirtyRows.all();
if (rows.isEmpty()) {
// No rows, the row must have been deleted
@@ -771,48 +771,48 @@ public class MusicMixin implements MusicInterface {
}
logger.debug("Blocking rowid: "+rowid);
- in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE
+ in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE
dbi.insertRowIntoSqlDb(tableName, map);
logger.debug("Unblocking rowid: "+rowid);
- in_progress.remove(rowid); // Unblock propagation
-
-// try {
-// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
-// executeSQLWrite(sql);
-// } catch (SQLException e) {
-// logger.debug("Insert failed because row exists, do an update");
-// // TODO - rewrite this UPDATE command should not update key fields
-// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString());
-// try {
-// executeSQLWrite(sql);
-// } catch (SQLException e1) {
-// e1.printStackTrace();
-// }
-// }
+ in_progress.remove(rowid); // Unblock propagation
+
+// try {
+// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
+// executeSQLWrite(sql);
+// } catch (SQLException e) {
+// logger.debug("Insert failed because row exists, do an update");
+// // TODO - rewrite this UPDATE command should not update key fields
+// String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString());
+// try {
+// executeSQLWrite(sql);
+// } catch (SQLException e1) {
+// e1.printStackTrace();
+// }
+// }
ti = dbi.getTableInfo(tableName);
cleanDirtyRow(ti, tableName, new JSONObject(vallist));
-// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
-// java.sql.ResultSet rs = executeSQLRead(selectQuery);
-// String dbWriteQuery=null;
-// try {
-// if(rs.next()){//this entry is there, do an update
-// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";";
-// }else
-// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";";
-// executeSQLWrite(dbWriteQuery);
-// } catch (SQLException e) {
-// // ZZTODO Auto-generated catch block
-// e.printStackTrace();
-// }
+// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
+// java.sql.ResultSet rs = executeSQLRead(selectQuery);
+// String dbWriteQuery=null;
+// try {
+// if(rs.next()){//this entry is there, do an update
+// dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";";
+// }else
+// dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";";
+// executeSQLWrite(dbWriteQuery);
+// } catch (SQLException e) {
+// // ZZTODO Auto-generated catch block
+// e.printStackTrace();
+// }
//clean the music dirty bits table
-// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
-// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
-// executeMusicWriteQuery(deleteQuery);
+// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
+// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
+// executeMusicWriteQuery(deleteQuery);
}
private Object getValue(Row musicRow, String colname) {
ColumnDefinitions cdef = musicRow.getColumnDefinitions();
@@ -914,14 +914,14 @@ public class MusicMixin implements MusicInterface {
pQueryObject.appendQueryString(cql);
String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow);
updateMusicDB(tableName, primaryKey, pQueryObject);
-
- /*PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind(newrow);
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- synchronized (sess) {
- sess.execute(bound);
- }*/
+
+ /*PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind(newrow);
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ synchronized (sess) {
+ sess.execute(bound);
+ }*/
// Mark the dirty rows in music for all the replicas but us
markDirtyRow(ti,tableName, changedRow);
}
@@ -978,7 +978,7 @@ public class MusicMixin implements MusicInterface {
if(rt.getResult().getResult().toLowerCase().equals("failure")) {
logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
}
-
+
}
/**
@@ -1287,7 +1287,10 @@ public class MusicMixin implements MusicInterface {
@Override
public void commitLog(DatabasePartition partition,List<Range> eventualRanges, StagingTable transactionDigest,
String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException {
-
+
+ // first deal with commit for eventually consistent tables
+ filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
+
if(partition==null){
logger.warn("Trying tcommit log with null partition");
return;
@@ -1382,7 +1385,7 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException();
}
- if(!transactionDigest.isEmpty()) {
+ if(!transactionDigest.isEventualEmpty()) {
ByteBuffer serialized = transactionDigest.getSerializedEventuallyStagingAndClean();
if (serialized!=null && useCompression) {
@@ -1513,7 +1516,7 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
}
- return getMRIRowFromCassandraRow(newRow);
+ return getMRIRowFromCassandraRow(newRow);
}
@Override
@@ -1529,18 +1532,18 @@ public class MusicMixin implements MusicInterface {
logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
}
- return getRangeDependenciesFromCassandraRow(newRow);
+ return getRangeDependenciesFromCassandraRow(newRow);
}
/**
* This function creates the TransactionInformation table. It contain information related
* to the transactions happening in a given partition.
- * * The schema of the table is
- * * Id, uiid.
- * * Partition, uuid id of the partition
- * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
- * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
- * * Redo: list of uiids associated to the Redo Records Table
+ * * The schema of the table is
+ * * Id, uiid.
+ * * Partition, uuid id of the partition
+ * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
+ * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
+ * * Redo: list of uiids associated to the Redo Records Table
*
*/
public static void createMusicRangeInformationTable(String namespace, String tableName) throws MDBCServiceException {
@@ -1727,8 +1730,8 @@ public class MusicMixin implements MusicInterface {
/**
* This function creates the MusicEveTxDigest table. It contain information related to each eventual transaction committed
- * * LeaseId: id associated with the lease, text
- * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+ * * LeaseId: id associated with the lease, text
+ * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
* * TransactionDigest: text that contains all the changes in the transaction
*/
public static void createMusicEventualTxDigest(String musicEventualTxDigestTableName, String musicNamespace, int musicTxDigestTableNumber) throws MDBCServiceException {
@@ -1738,11 +1741,12 @@ public class MusicMixin implements MusicInterface {
"-" +
Integer.toString(musicTxDigestTableNumber);
}
- String priKey = "txTimeId";
+ String priKey = "txTimeId, year";
StringBuilder fields = new StringBuilder();
fields.append("txid uuid, ");
fields.append("transactiondigest blob, ");
fields.append("compressed boolean, ");
+ fields.append("year int, ");
fields.append("txTimeId TIMEUUID ");//notice lack of ','
String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
try {
@@ -1832,19 +1836,17 @@ public class MusicMixin implements MusicInterface {
public void addEventualTxDigest(MusicTxDigestId newId, ByteBuffer transactionDigest) throws MDBCServiceException {
//createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
PreparedQueryObject query = new PreparedQueryObject();
- String cqlQuery = "INSERT INTO " +
- this.music_ns +
- '.' +
- this.musicEventualTxDigestTableName +
- " (txid,transactiondigest,compressed,txTimeId) " +
- "VALUES (" +
- newId.transactionId+ ",'" +
- transactionDigest + "'," +
- useCompression + ","+
- // "toTimestamp(now())" +
- "now()" +
- ");";
- query.appendQueryString(cqlQuery);
+ int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
+
+
+ String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns,
+ this.musicEventualTxDigestTableName);
+ query.appendQueryString(cql);
+ query.addValue( newId.transactionId);
+ query.addValue(transactionDigest);
+ query.addValue(useCompression);
+ query.addValue(year);
+ // query.appendQueryString(cqlQuery);
//\TODO check if I am not shooting on my own foot
try {
MusicCore.nonKeyRelatedPut(query,"critical");
@@ -1885,6 +1887,15 @@ public class MusicMixin implements MusicInterface {
@Override
public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException {
+ int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
+ StringBuffer yearSb = new StringBuffer();
+ String sep = "";
+ for (int y=2019; y<=year; y++) {
+ yearSb.append(sep);
+ yearSb.append(y);
+ sep = ",";
+ }
+
StagingTable changes;
String cql;
LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>();
@@ -1893,12 +1904,12 @@ public class MusicMixin implements MusicInterface {
if (musicevetxdigestNodeinfoTimeID != null) {
// this will fetch only few records based on the time-stamp condition.
- cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName);
+ cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
pQueryObject.appendQueryString(cql);
pQueryObject.addValue(musicevetxdigestNodeinfoTimeID);
} else {
// This is going to Fetch all the Transactiondigest records from the musicevetxdigest table.
- cql = String.format("SELECT * FROM %s.%s LIMIT 10;", music_ns, this.musicEventualTxDigestTableName);
+ cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
pQueryObject.appendQueryString(cql);
}
@@ -2110,9 +2121,17 @@ public class MusicMixin implements MusicInterface {
}
@Override
- public OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
- Map<UUID,LockResult> locks, UUID ownershipId) throws MDBCServiceException{
+ public OwnershipReturn mergeLatestRowsIfNecessary(Dag extendedDag, List<MusicRangeInformationRow> latestRows,
+ List<Range> ranges, Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException {
recoverFromFailureAndUpdateDag(extendedDag,latestRows,ranges,locks);
+ if (latestRows.size()==1) {
+ //reuse current row if possible
+ MusicRangeInformationRow row = latestRows.get(0);
+ LockResult lockresult = locks.get(row.getPartitionIndex());
+ if (lockresult!=null) {
+ return new OwnershipReturn(ownershipId, lockresult.getLockId(), row.getPartitionIndex(), ranges, extendedDag);
+ }
+ }
List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks);
releaseLocks(changed, locks);
MusicRangeInformationRow row = createAndAssignLock(ranges);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index ef98ebd..8823b81 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -316,7 +316,7 @@ public class OwnershipAndCheckpoint{
Set<Range> allRanges = currentlyOwn.getAllRanges();
List<MusicRangeInformationRow> isLatestRows = extractRowsForRange(mi, new ArrayList<>(allRanges), true);
currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,isLatestRows));
- return mi.mergeLatestRows(currentlyOwn,rows,ranges,newLocks,opId);
+ return mi.mergeLatestRowsIfNecessary(currentlyOwn,rows,ranges,newLocks,opId);
}
/**
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
index dbed9e4..26ee73c 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
@@ -91,7 +91,7 @@ public class StagingTable {
builderInitialized=true;
digestBuilder = CompleteDigest.newBuilder();
this.eventuallyConsistentRanges=eventuallyConsistentRanges;
- eventuallyBuilder = (!this.eventuallyConsistentRanges.isEmpty())?null:CompleteDigest.newBuilder();
+ eventuallyBuilder = (this.eventuallyConsistentRanges.isEmpty())?null:CompleteDigest.newBuilder();
}
public StagingTable(ByteBuffer serialized) throws MDBCServiceException {
@@ -242,7 +242,15 @@ public class StagingTable {
}
synchronized public boolean isEmpty() {
- return (digestBuilder.getRowsCount()==0);
+ return (digestBuilder.getRowsCount()==0 && eventuallyBuilder.getRowsCount()==0);
+ }
+
+ synchronized public boolean isStrongEmpty() {
+ return (digestBuilder.getRowsCount()==0);
+ }
+
+ synchronized public boolean isEventualEmpty() {
+ return (eventuallyBuilder.getRowsCount()==0);
}
synchronized public void clear() throws MDBCServiceException {
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
index aba8cb4..41a943e 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
@@ -20,24 +20,22 @@
package org.onap.music.mdbc.mixins;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.junit.Assert.*;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-
-import java.util.*;
-
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Consumer;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
-
-
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
-import org.junit.rules.Timeout;
import org.onap.music.datastore.MusicDataStore;
import org.onap.music.datastore.MusicDataStoreHandle;
import org.onap.music.exceptions.MDBCServiceException;
@@ -51,12 +49,13 @@ import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.StateManager;
-import org.onap.music.mdbc.TestUtils;
-import org.onap.music.mdbc.ownership.Dag;
-import org.onap.music.mdbc.ownership.DagNode;
+import org.onap.music.mdbc.proto.ProtoDigest.Digest.CompleteDigest;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
-import org.onap.music.service.impl.MusicCassaCore;
+import org.onap.music.mdbc.tables.StagingTable;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.protobuf.InvalidProtocolBufferException;
public class MusicMixinTest {
@@ -109,6 +108,7 @@ public class MusicMixinTest {
Properties properties = new Properties();
properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace);
properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName);
+ properties.setProperty(MusicMixin.KEY_COMPRESSION, Boolean.toString(true));
mixin=new MusicMixin(stateManager, mdbcServerName,properties);
} catch (MDBCServiceException e) {
fail("error creating music mixin");
@@ -246,4 +246,39 @@ public class MusicMixinTest {
@Test
public void relinquishIfRequired() {
}
+
+ @Test
+ public void getEveTxDigest() throws Exception {
+
+ mixin.createMusicEventualTxDigest();
+ ByteBuffer compressed = mockCompressedProtoByteBuff();
+ MusicTxDigestId digestId = new MusicTxDigestId(UUID.randomUUID(), 1);
+ mixin.addEventualTxDigest(digestId, compressed);
+
+ LinkedHashMap<UUID, StagingTable> digest = mixin.getEveTxDigest("n1");
+
+ Consumer<Map.Entry<UUID,StagingTable>> consumer = new Consumer<Map.Entry<UUID,StagingTable>>() {
+
+ @Override
+ public void accept(Entry<UUID, StagingTable> mapEntry) {
+ assertNotNull(mapEntry.getValue());
+ }
+
+ };
+
+ digest.entrySet().forEach(consumer);
+
+
+
+
+ }
+
+ protected ByteBuffer mockCompressedProtoByteBuff() throws MDBCServiceException, InvalidProtocolBufferException {
+ CompleteDigest instance = CompleteDigest.getDefaultInstance();
+ // CompleteDigest instance = CompleteDigest.parseFrom(ByteBuffer.wrap("Test".getBytes()));
+ byte[] bytes = instance.toByteArray();
+ ByteBuffer serialized = ByteBuffer.wrap(bytes);
+ ByteBuffer compressed = StagingTable.Compress(serialized);
+ return compressed;
+ }
}