aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server
diff options
context:
space:
mode:
authorArthur Martella <arthur.martella.1@att.com>2019-04-29 15:03:07 -0400
committerArthur Martella <arthur.martella.1@att.com>2019-05-03 10:04:56 -0400
commit52159a510dc31ee1dacf612a1e05cc8612a117e5 (patch)
tree167fd4e5357ecab6b7a5fa8675dc7c3b63453b38 /mdbc-server
parent601050a535e916c0ff52826fbe2c61b798c74530 (diff)
Implement postgres, fixes to eventual, and many bug fixes
Streamline upper and lower cases for Ranges Fix initialization of system Merged version of commit https://gerrit.onap.org/r/#/c/86160/ Change-Id: I169ed56ff79ff0a2e14ab9bc5e0467d1c0b9f0a9 Issue-ID: MUSIC-387 Signed-off-by: Arthur Martella <arthur.martella.1@att.com>
Diffstat (limited to 'mdbc-server')
-rwxr-xr-xmdbc-server/pom.xml19
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java4
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java19
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java55
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java9
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Range.java21
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java61
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java (renamed from mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java)0
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java80
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java16
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java9
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json11
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json8
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java100
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java5
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java2
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java55
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java156
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java1066
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java26
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java35
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java8
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java11
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java6
-rwxr-xr-xmdbc-server/src/main/resources/mdbc.properties13
-rwxr-xr-xmdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java2
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java280
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java19
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java263
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java220
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java96
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java64
34 files changed, 2320 insertions, 431 deletions
diff --git a/mdbc-server/pom.xml b/mdbc-server/pom.xml
index cdf7cff..7a46120 100755
--- a/mdbc-server/pom.xml
+++ b/mdbc-server/pom.xml
@@ -89,9 +89,9 @@
<version>20160810</version>
</dependency>
<dependency>
- <groupId>org.mariadb.jdbc</groupId>
- <artifactId>mariadb-java-client</artifactId>
- <version>2.3.0</version>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
@@ -200,6 +200,19 @@
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>42.2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>com.opentable.components</groupId>
+ <artifactId>otj-pg-embedded</artifactId>
+ <version>0.13.1</version>
+ <scope>test</scope>
+ </dependency>
+
+
</dependencies>
<build>
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
index ced5745..91b13f3 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
@@ -28,6 +28,8 @@ public class Configuration {
public static final String KEY_DB_MIXIN_NAME = "MDBC_DB_MIXIN";
/** The property name to use to select the MUSIC 'mixin'. */
public static final String KEY_MUSIC_MIXIN_NAME = "MDBC_MUSIC_MIXIN";
+ /** The property name to select if async staging table update is used */
+ public static final String KEY_ASYNC_STAGING_TABLE_UPDATE = "ASYNC_STAGING_TABLE_UPDATE";
/** The name of the default mixin to use for the DBInterface. */
public static final String DB_MIXIN_DEFAULT = "mysql";//"h2";
/** The name of the default mixin to use for the MusicInterface. */
@@ -44,4 +46,6 @@ public class Configuration {
public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours
/** The property name to provide comma separated list of ranges to warmup */
public static final String KEY_WARMUPRANGES = "warmupranges";
+ /** Default async staging table update o ption*/
+ public static final String ASYNC_STAGING_TABLE_UPDATE = "false";
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
index 32d456e..ae2b869 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
@@ -98,10 +98,25 @@ public class MDBCUtils {
}
public static List<Range> getTables(Map<String,List<SQLOperation>> queryParsed){
+ return getTables(null, queryParsed);
+ }
+
+ public static List<Range> getTables(String defaultDatabaseName, Map<String,List<SQLOperation>> queryParsed){
List<Range> ranges = new ArrayList<>();
for(String table: queryParsed.keySet()){
- ranges.add(new Range(table));
- }
+ String[] parts = table.split("\\.");
+ if(parts.length==2){
+ ranges.add(new Range(table));
+ }
+ else if(parts.length==1 && defaultDatabaseName!=null){
+ ranges.add(new Range(defaultDatabaseName+"."+table));
+ }
+ else{
+ throw new IllegalArgumentException("Table should either have only one '.' or none at all, the table "
+ + "received is "+table);
+ }
+
+ }
return ranges;
}
} \ No newline at end of file
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 3db6c3f..f1ac851 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -160,18 +160,7 @@ public class MdbcConnection implements Connection {
if(progressKeeper!=null) progressKeeper.commitRequested(id);
logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
if (b) {
- // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
- if(id == null || id.isEmpty()) {
- logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- throw new SQLException("tx id is null");
- }
- try {
- mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper);
- } catch (MDBCServiceException e) {
- // TODO Auto-generated catch block
- logger.error("Cannot commit log to music" + e.getStackTrace());
- throw new SQLException(e.getMessage(), e);
- }
+ musicCommit();
}
if(progressKeeper!=null) {
progressKeeper.setMusicDone(id);
@@ -191,13 +180,7 @@ public class MdbcConnection implements Connection {
return jdbcConn.getAutoCommit();
}
- /**
- * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
- * they are performed now and copied into MUSIC.
- * @throws SQLException
- */
- @Override
- public void commit() throws SQLException {
+ private void musicCommit() throws SQLException {
if(progressKeeper.isComplete(id)) {
return;
}
@@ -205,6 +188,7 @@ public class MdbcConnection implements Connection {
progressKeeper.commitRequested(id);
}
+ dbi.preCommitHook();
try {
logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
// transaction was committed -- add all the updates into the REDO-Log in MUSIC
@@ -214,6 +198,16 @@ public class MdbcConnection implements Connection {
logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
throw new SQLException("Failure commiting to MUSIC", e);
}
+ }
+
+ /**
+ * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are performed now and copied into MUSIC.
+ * @throws SQLException
+ */
+ @Override
+ public void commit() throws SQLException {
+ musicCommit();
if(progressKeeper != null) {
progressKeeper.setMusicDone(id);
@@ -273,9 +267,10 @@ public class MdbcConnection implements Connection {
} catch (MDBCServiceException e) {
throw new SQLException("Failure during relinquish of partition",e);
}
- // Warning! Make sure this call remains AFTER the call to jdbcConn.close(),
- // otherwise you're going to get stuck in an infinite loop.
- statemanager.closeConnection(id);
+
+ // Warning! Make sure this call remains AFTER the call to jdbcConn.close(),
+ // otherwise you're going to get stuck in an infinite loop.
+ statemanager.closeConnection(id);
}
@Override
@@ -507,7 +502,8 @@ public class MdbcConnection implements Connection {
//Parse tables from the sql query
Map<String, List<SQLOperation>> tableToInstruction = QueryProcessor.parseSqlQuery(sql);
//Check ownership of keys
- List<Range> queryTables = MDBCUtils.getTables(tableToInstruction);
+ String defaultSchema = dbi.getSchema();
+ List<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToInstruction);
if (this.partition!=null) {
List<Range> snapshot = this.partition.getSnapshot();
if(snapshot!=null){
@@ -550,18 +546,15 @@ public class MdbcConnection implements Connection {
logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
for (String tableName : set1) {
// This map will be filled in if this table was previously discovered
- tableName = tableName.toUpperCase();
- if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
+ if (!table_set.contains(tableName.toUpperCase()) && !dbi.getReservedTblNames().contains(tableName.toUpperCase())) {
logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
try {
TableInfo ti = dbi.getTableInfo(tableName);
- mi.initializeMusicForTable(ti,tableName);
- //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
- ti = dbi.getTableInfo(tableName);
- mi.createDirtyRowTable(ti,tableName);
+ //mi.initializeMusicForTable(ti,tableName);
+ //mi.createDirtyRowTable(ti,tableName);
dbi.createSQLTriggers(tableName);
- table_set.add(tableName);
- dbi.synchronizeData(tableName);
+ table_set.add(tableName.toUpperCase());
+ //dbi.synchronizeData(tableName);
logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
table_set.size() + "/" + set1.size() + "tables uploaded");
} catch (Exception e) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
index 42b9710..500ed81 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
@@ -80,6 +80,15 @@ public class MdbcServer {
Properties connectionProps = new Properties();
connectionProps.put("user", user);
connectionProps.put("password", password);
+ String defaultMusicMixin = Utils.getDefaultMusicMixin();
+ if(defaultMusicMixin!=null){
+ connectionProps.put(Configuration.KEY_MUSIC_MIXIN_NAME,defaultMusicMixin);
+ }
+ String defaultDBMixin = Utils.getDefaultDBMixin();
+ if(defaultMusicMixin!=null){
+ connectionProps.put(Configuration.KEY_DB_MIXIN_NAME,defaultDBMixin);
+ }
+ Utils.registerDefaultDrivers();
meta = new MdbcServerLogic(url,connectionProps,config);
LocalService service = new LocalService(meta);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
index 16e7170..41aed26 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
@@ -23,6 +23,9 @@ import java.io.Serializable;
import java.util.List;
import java.util.Objects;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.mixins.MusicMixin;
+
/**
* This class represent a range of the whole database
@@ -32,13 +35,21 @@ import java.util.Objects;
*/
public class Range implements Cloneable{
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Range.class);
+
private String table;
public Range(String table) {
- this.table = table.toUpperCase();
+ final String[] split = table.split("\\.");
+ if(split.length!=2){
+ logger.debug("Table should contain schema, received in constructor: " + table);
+// throw new IllegalArgumentException("Table should always contain the schema, table received in "
+// + "constructor is "+table);
+ }
+ this.table = table;
}
- public String toString(){return table.toUpperCase();}
+ public String toString(){return table;}
/**
* Compares to Range types
@@ -55,7 +66,7 @@ public class Range implements Cloneable{
@Override
public int hashCode(){
- return table.hashCode();
+ return table.toUpperCase().hashCode();
}
@Override
@@ -74,11 +85,11 @@ public class Range implements Cloneable{
public static boolean overlaps(List<Range> ranges, String table){
//\TODO check if parallel stream makes sense here
- return ranges.stream().map((Range r) -> r.table.equals(table)).anyMatch((Boolean b) -> b);
+ return ranges.stream().map((Range r) -> r.table.toUpperCase().equals(table.toUpperCase())).anyMatch((Boolean b) -> b);
}
public boolean overlaps(Range other) {
- return table.equals(other.table);
+ return table.toUpperCase().equals(other.table.toUpperCase());
}
public String getTable() {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index e4c4a24..18bc0db 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -21,7 +21,6 @@ package org.onap.music.mdbc;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
@@ -47,6 +46,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -102,10 +102,10 @@ public class StateManager {
public StateManager() {
}
- public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
+ public StateManager(String sqlDBUrl, Properties newInfo, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
this.sqlDBName = sqlDBName;
- this.sqlDBUrl = sqlDBUrl;
- this.info = info;
+ this.sqlDBUrl = cleanSqlUrl(sqlDBUrl);
+ this.info = new Properties();
this.mdbcServerName = mdbcServerName;
this.connectionRanges = new ConcurrentHashMap<>();
@@ -117,6 +117,7 @@ public class StateManager {
} catch (IOException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
}
+ info.putAll(newInfo);
cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
@@ -129,6 +130,16 @@ public class StateManager {
ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
}
+ protected String cleanSqlUrl(String url){
+ if(url!=null) {
+ url = url.trim();
+ if (url.length() > 0 && url.charAt(url.length() - 1) == '/') {
+ url= url.substring(0, url.length() - 1);
+ }
+ }
+ return url;
+ }
+
protected void initTxDaemonThread(){
txDaemon = new Thread(
new MusicTxDigestDaemon(Integer.parseInt(
@@ -149,27 +160,21 @@ public class StateManager {
}
protected void initSqlDatabase() throws MDBCServiceException {
- try {
- //\TODO: pass the driver as a variable
- Class.forName("org.mariadb.jdbc.Driver");
- }
- catch (ClassNotFoundException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
- ErrorTypes.GENERALSERVICEERROR);
- return;
- }
- try {
- Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
- StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ")
+ if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) {
+ try {
+ Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
+ StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ")
.append(sqlDBName)
.append(";");
- Statement stmt = sqlConnection.createStatement();
- stmt.execute(sql.toString());
- sqlConnection.close();
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
+ Statement stmt = sqlConnection.createStatement();
+ stmt.execute(sql.toString());
+ sqlConnection.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR,
+ ErrorSeverity.CRITICAL,
ErrorTypes.GENERALSERVICEERROR);
- throw new MDBCServiceException(e.getMessage(), e);
+ throw new MDBCServiceException(e.getMessage(), e);
+ }
}
}
@@ -306,17 +311,9 @@ public class StateManager {
public Connection openConnection(String id) {
Connection sqlConnection;
MdbcConnection newConnection;
- try {
- //TODO: pass the driver as a variable
- Class.forName("org.mariadb.jdbc.Driver");
- }
- catch (ClassNotFoundException e) {
- // TODO Auto-generated catch block
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL,
- ErrorTypes.QUERYERROR);
- }
-
+ Utils.registerDefaultDrivers();
//Create connection to local SQL DB
+
try {
sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info);
} catch (SQLException e) {
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
index e5a3252..e5a3252 100755
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
new file mode 100644
index 0000000..7a09dca
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
@@ -0,0 +1,80 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.onap.music.logging.EELFLoggerDelegate;
+
+public class Utils {
+
+
+ public static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Utils.class);
+
+ static Properties retrieveMdbcProperties() {
+ Properties pr = null;
+ try {
+ pr = new Properties();
+ pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
+ } catch (IOException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Could not load property file: " + e.getMessage());
+ }
+ return pr;
+ }
+
+ public static String getDefaultMusicMixin() {
+ Properties pr = retrieveMdbcProperties();
+ if (pr == null)
+ return null;
+ String defaultMusicMixin = pr.getProperty("DEFAULT_MUSIC_MIXIN");
+ return defaultMusicMixin;
+ }
+
+ public static String getDefaultDBMixin() {
+ Properties pr = retrieveMdbcProperties();
+ if (pr == null)
+ return null;
+ String defaultMusicMixin = pr.getProperty("DEFAULT_DB_MIXIN");
+ return defaultMusicMixin;
+ }
+
+ public static void registerDefaultDrivers() {
+ Properties pr = null;
+ try {
+ pr = new Properties();
+ pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
+ } catch (IOException e) {
+ logger.error("Could not load property file > " + e.getMessage());
+ }
+
+ String drivers = pr.getProperty("DEFAULT_DRIVERS");
+ for (String driver : drivers.split("[ ,]")) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'");
+ try {
+ Class.forName(driver.trim());
+ } catch (ClassNotFoundException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Driver class " + driver + " not found.");
+ }
+ }
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
index 38309d5..391ee1a 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
@@ -41,7 +41,7 @@ public class NodeConfiguration {
public String nodeName;
public String sqlDatabaseName;
- public NodeConfiguration(String tables, String eventualTables, UUID mriIndex, String sqlDatabaseName, String node){
+ public NodeConfiguration(String tables, List<String> eventualTables, UUID mriIndex, String sqlDatabaseName, String node){
// public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) {
partition = new DatabasePartition(toRanges(tables), mriIndex, null) ;
eventual = new Eventual(toRanges(eventualTables));
@@ -49,16 +49,20 @@ public class NodeConfiguration {
this.sqlDatabaseName = sqlDatabaseName;
}
+ protected List<Range> toRanges(List<String> tables){
+ List<Range> newRange = new ArrayList<>();
+ for(String table: tables) {
+ newRange.add(new Range(table));
+ }
+ return newRange;
+ }
+
protected List<Range> toRanges(String tables){
if(tables.isEmpty()){
return new ArrayList<>();
}
- List<Range> newRange = new ArrayList<>();
String[] tablesArray=tables.split(",");
- for(String table: tablesArray) {
- newRange.add(new Range(table));
- }
- return newRange;
+ return toRanges(new ArrayList<>(Arrays.asList(tablesArray)));
}
public String toJson() {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
index 343a8b8..0598271 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
@@ -26,6 +26,7 @@ import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
import com.google.gson.Gson;
import org.onap.music.datastore.PreparedQueryObject;
@@ -46,6 +47,7 @@ public class TablesConfiguration {
private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class);
private List<PartitionInformation> partitions;
+ private List<String> eventual;
String tableToPartitionName;
private String musicNamespace;
private String partitionInformationTableName;
@@ -83,16 +85,17 @@ public class TablesConfiguration {
logger.info("Creating empty row with id "+partitionId);
MusicMixin.createEmptyMriRow(musicNamespace,partitionInfo.mriTableName,UUID.fromString(partitionId),
- partitionInfo.owner,null,partitionInfo.getTables(),true);
+ partitionInfo.owner,null,
+ partitionInfo.getTables(),true);
//3) Create config for this node
StringBuilder newStr = new StringBuilder();
for(Range r: partitionInfo.tables){
- newStr.append(r.toString().toUpperCase()).append(",");
+ newStr.append(r.toString()).append(",");
}
logger.info("Appending row to configuration "+partitionId);
- nodeConfigs.add(new NodeConfiguration(newStr.toString(),"",UUID.fromString(partitionId),
+ nodeConfigs.add(new NodeConfiguration(newStr.toString(),eventual,UUID.fromString(partitionId),
sqlDatabaseName, partitionInfo.owner));
}
return nodeConfigs;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json
new file mode 100644
index 0000000..430525f
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json
@@ -0,0 +1,11 @@
+{
+ "internalNamespace": "music_internal",
+ "internalReplicationFactor": 1,
+ "musicNamespace": "namespace",
+ "musicReplicationFactor": 1,
+ "mriTableName": "musicrangeinformation",
+ "mtxdTableName": "musictxdigest",
+ "eventualMtxdTableName":"musicevetxdigest",
+ "nodeInfoTableName":"nodeinfo",
+ "rangeDependencyTableName":"musicrangedependency"
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
index 8cbbfec..559d597 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
@@ -3,21 +3,19 @@
{
"tables": [
{
- "table": "PERSONS"
+ "table": "public.PERSONS"
}
],
"owner": "",
"mriTableName": "musicrangeinformation",
- "mtxdTableName": "musictxdigest",
"partitionId": ""
}
],
- "internalNamespace": "music_internal",
- "internalReplicationFactor": 1,
+ "eventual": ["public.eventualPERSONS"],
"musicNamespace": "namespace",
- "musicReplicationFactor": 1,
"tableToPartitionName": "tabletopartition",
"partitionInformationTableName": "partitioninfo",
"redoHistoryTableName": "redohistory",
"sqlDatabaseName": "test"
}
+
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java
new file mode 100644
index 0000000..543cbd0
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.mixins;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.websocket.RemoteEndpoint.Async;
+import org.onap.music.logging.EELFLoggerDelegate;
+
+public class AsyncUpdateHandler {
+
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(AsyncUpdateHandler.class);
+
+ Object finishMonitor;
+ Object taskMonitor;
+ AtomicBoolean pendingUpdate;
+ Runnable handler;
+
+ AsyncUpdateHandler(Runnable handlerToRun){
+ handler=handlerToRun;
+ finishMonitor=new Object();
+ taskMonitor = new Object();
+ pendingUpdate=new AtomicBoolean(false);
+ createUpdateExecutor();
+ }
+
+ void createUpdateExecutor(){
+ Runnable newRunnable = ()-> UpdateExecutor();
+ Thread newThread = new Thread(newRunnable);
+ newThread.setDaemon(true);
+ newThread.start();
+ }
+
+ public void processNewUpdate(){
+ pendingUpdate.set(true);
+ synchronized (taskMonitor) {
+ taskMonitor.notifyAll();
+ }
+ }
+
+ void UpdateExecutor(){
+ while(true) {
+ synchronized (taskMonitor) {
+ try {
+ if(!pendingUpdate.get()){
+ taskMonitor.wait();
+ }
+ } catch (InterruptedException e) {
+ logger.error("Update Executor received an interrup exception");
+ }
+ }
+ startUpdate();
+ }
+ }
+
+ void startUpdate(){
+ synchronized (finishMonitor) {
+ //Keep running until there are no more requests
+ while (pendingUpdate.getAndSet(false)) {
+ if(handler!=null){
+ handler.run();
+ }
+ }
+ finishMonitor.notifyAll();
+ }
+
+ }
+
+ public void waitForAllPendingUpdates(){
+ //Wait until there are no more requests and the thread is not longer running
+ //We could use a join, but given that the thread could be temporally null, then it is easier to
+ //separate the wait from the thread that is running
+ synchronized(finishMonitor){
+ while(pendingUpdate.get()) {
+ try {
+ finishMonitor.wait();
+ } catch (InterruptedException e) {
+ logger.error("waitForAllPendingUpdate received exception");
+ }
+ }
+ }
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index a594918..dd71d97 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
@@ -111,6 +112,13 @@ public interface DBInterface {
* @return a ResultSet containing the rows returned from the query
*/
ResultSet executeSQLRead(String sql);
+
+ /**
+ * This method is used to verify that all data structures needed for commit are ready for commit
+ * Either for the MusicMixin (e.g. staging table) or internally for the DBMixin
+ * Important this is not for actual commit operation, it should be treated as a signal.
+ */
+ void preCommitHook();
void synchronizeData(String tableName);
@@ -133,4 +141,6 @@ public interface DBInterface {
void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException;
Connection getSQLConnection();
+
+ String getSchema();
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
index d822615..324b0f6 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
@@ -61,12 +61,13 @@ public class MixinFactory {
con = cl.getConstructor(MusicInterface.class, String.class, Connection.class, Properties.class);
if (con != null) {
logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname);
- return (DBInterface) con.newInstance(mi, url, conn, info);
+ DBInterface newdbi = (DBInterface) con.newInstance(mi, url, conn, info);
+ return newdbi;
}
}
}
} catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface: "+e);
+ logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface error for "+cl.getName(),e);
}
}
return null;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 71f1b8b..fc8897f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -303,8 +303,8 @@ public interface MusicInterface {
void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException;
List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
-
+
void deleteMriRow(MusicRangeInformationRow row) throws MDBCServiceException;
void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index cb37a55..5a322f3 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -61,7 +61,6 @@ import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.DagNode;
-import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
@@ -99,6 +98,8 @@ public class MusicMixin implements MusicInterface {
public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
/** The property name to use to provide the replication factor for Cassandra. */
public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
+ /** The property name to use to provide a timeout to mdbc (ownership) */
+ public static final String KEY_TIMEOUT = "mdbc_timeout";
/** The property name to use to provide a flag indicating if compression is required */
public static final String KEY_COMPRESSION = "mdbc_compression";
/** Namespace for the tables in MUSIC (Cassandra) */
@@ -106,13 +107,15 @@ public class MusicMixin implements MusicInterface {
/** The default property value to use for the Cassandra IP address. */
public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
/** The default property value to use for the Cassandra replication factor. */
- public static final int DEFAULT_MUSIC_RFACTOR = 1;
+ public static final int DEFAULT_MUSIC_RFACTOR = 3;
+ /** The default property value to use for the MDBC timeout */
+ public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
/** The default primary string column, if none is provided. */
public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
/** Type of the primary key, if none is defined by the user */
public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
public static final boolean DEFAULT_COMPRESSION = true;
-
+ //TODO: Control network topology strategy with a configuration file entry
public static final boolean ENABLE_NETWORK_TOPOLOGY_STRATEGY = false;
//\TODO Add logic to change the names when required and create the tables when necessary
@@ -254,6 +257,7 @@ public class MusicMixin implements MusicInterface {
public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException {
Map<String,Object> replicationInfo = new HashMap<>();
replicationInfo.put("'class'", "'NetworkTopologyStrategy'");
+
if (ENABLE_NETWORK_TOPOLOGY_STRATEGY && replicationFactor==3) {
replicationInfo.put("'dc1'", 1);
replicationInfo.put("'dc2'", 1);
@@ -673,7 +677,7 @@ public class MusicMixin implements MusicInterface {
}
if(rs!=null) {
for (Row row : rs) {
- set.add(row.getString("TABLE_NAME").toUpperCase());
+ set.add(row.getString("TABLE_NAME"));
}
}
return set;
@@ -1295,15 +1299,13 @@ public class MusicMixin implements MusicInterface {
logger.warn("Trying tcommit log with null partition");
return;
}
+
List<Range> snapshot = partition.getSnapshot();
if(snapshot==null || snapshot.isEmpty()){
logger.warn("Trying to commit log with empty ranges");
return;
}
- // first deal with commit for eventually consistent tables
- filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
-
UUID mriIndex = partition.getMRIIndex();
String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
//0. See if reference to lock was already created
@@ -1475,7 +1477,7 @@ public class MusicMixin implements MusicInterface {
for(TupleValue t: log){
//final String tableName = t.getString(0);
final UUID id = t.getUUID(1);
- digestIds.add(new MusicTxDigestId(id,index++));
+ digestIds.add(new MusicTxDigestId(partitionIndex,id,index++));
}
List<Range> partitions = new ArrayList<>();
Set<String> tables = newRow.getSet("keys",String.class);
@@ -1526,11 +1528,14 @@ public class MusicMixin implements MusicInterface {
pQueryObject.appendQueryString(cql);
pQueryObject.addValue(baseRange.getTable());
Row newRow;
+ //TODO Change this when music fix the "." problem in the primary key
+ final String table = baseRange.getTable();
+ final String tableWithoutDot = table.replaceAll("\\.","");
try {
- newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.getTable(),null);
+ newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,tableWithoutDot,null);
} catch (MDBCServiceException e) {
- logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
+ logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName+" trying for table "+tableWithoutDot);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information for table "+tableWithoutDot, e);
}
return getRangeDependenciesFromCassandraRow(newRow);
}
@@ -1838,7 +1843,6 @@ public class MusicMixin implements MusicInterface {
PreparedQueryObject query = new PreparedQueryObject();
int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
-
String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns,
this.musicEventualTxDigestTableName);
query.appendQueryString(cql);
@@ -1856,6 +1860,7 @@ public class MusicMixin implements MusicInterface {
}
}
+
@Override
public StagingTable getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
@@ -1883,8 +1888,7 @@ public class MusicMixin implements MusicInterface {
}
return changes;
}
-
-
+
@Override
public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException {
int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
@@ -1901,7 +1905,7 @@ public class MusicMixin implements MusicInterface {
LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>();
UUID musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName);
PreparedQueryObject pQueryObject = new PreparedQueryObject();
-
+
if (musicevetxdigestNodeinfoTimeID != null) {
// this will fetch only few records based on the time-stamp condition.
cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
@@ -1912,7 +1916,7 @@ public class MusicMixin implements MusicInterface {
cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
pQueryObject.appendQueryString(cql);
}
-
+
// I need to get a ResultSet of all the records and give each row to the below HashMap.
ResultSet rs = executeMusicRead(pQueryObject);
while (!rs.isExhausted()) {
@@ -1920,8 +1924,8 @@ public class MusicMixin implements MusicInterface {
ByteBuffer digest = row.getBytes("transactiondigest");
Boolean compressed = row.getBool("compressed");
//String txTimeId = row.getString("txtimeid"); //???
- UUID txTimeId = row.getUUID("txtimeid");
-
+ UUID txTimeId = row.getUUID("txtimeid");
+
try {
if(compressed){
digest=StagingTable.Decompress(digest);
@@ -1932,7 +1936,7 @@ public class MusicMixin implements MusicInterface {
throw e;
}
ecDigestInformation.put(txTimeId, changes);
- }
+ }
return ecDigestInformation;
}
@@ -2218,7 +2222,7 @@ public class MusicMixin implements MusicInterface {
* @param cql the CQL to be sent to Cassandra
*/
private static void executeMusicWriteQuery(String keyspace, String table, String cql)
- throws MDBCServiceException {
+ throws MDBCServiceException {
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
ResultType rt = null;
@@ -2254,6 +2258,9 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException("Error executing atomic get", e);
}
}
+ if(result==null){
+ throw new MDBCServiceException("Error executing atomic get for primary key: "+primaryKey);
+ }
if(result.isExhausted()){
return null;
}
@@ -2418,11 +2425,17 @@ public class MusicMixin implements MusicInterface {
pQueryObject.addValue(row.getPartitionIndex());
ReturnType rt ;
try {
- rt = MusicCore.atomicPut(music_ns, musicRangeDependencyTableName, row.getPartitionIndex().toString(),
+ rt = MusicCore.atomicPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(),
pQueryObject, null);
} catch (MusicLockingException|MusicQueryException|MusicServiceException e) {
logger.error("Failure when deleting mri row");
new MDBCServiceException("Error deleting mri row",e);
}
}
+
+ public StateManager getStateManager() {
+ return stateManager;
+ }
+
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 6d6e691..15caf3f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -40,6 +40,7 @@ import org.json.JSONTokener;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Configuration;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
@@ -75,7 +76,7 @@ public class MySQLMixin implements DBInterface {
public static final String TRANS_TBL = "MDBC_TRANSLOG";
private static final String CREATE_TBL_SQL =
"CREATE TABLE IF NOT EXISTS "+TRANS_TBL+
- " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " +
+ " (IX INT AUTO_INCREMENT, OP CHAR(1), SCHEMANAME VARCHAR(255), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " +
"CONNECTION_ID INT, PRIMARY KEY (IX));";
private final MusicInterface mi;
@@ -85,6 +86,10 @@ public class MySQLMixin implements DBInterface {
private final Map<String, TableInfo> tables;
private PreparedStatement deleteStagingStatement;
private boolean server_tbl_created = false;
+ private boolean useAsyncStagingUpdate = false;
+ private Object stagingHandlerLock = new Object();
+ private AsyncUpdateHandler stagingHandler = null;
+ private StagingTable currentStaging=null;
public MySQLMixin() {
this.mi = null;
@@ -100,12 +105,35 @@ public class MySQLMixin implements DBInterface {
this.dbName = getDBName(conn);
this.jdbcConn = conn;
this.tables = new HashMap<String, TableInfo>();
+ useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
+ Configuration.ASYNC_STAGING_TABLE_UPDATE));
+ }
+
+ class StagingTableUpdateRunnable implements Runnable{
+
+ private MySQLMixin mixin;
+ private StagingTable staging;
+
+ StagingTableUpdateRunnable(MySQLMixin mixin, StagingTable staging){
+ this.mixin=mixin;
+ this.staging=staging;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.mixin.updateStagingTable(staging);
+ } catch (NoSuchFieldException|MDBCServiceException e) {
+ this.mixin.logger.error("Error when updating the staging table");
+ }
+ }
}
private PreparedStatement getStagingDeletePreparedStatement() throws SQLException {
return jdbcConn.prepareStatement("DELETE FROM "+TRANS_TBL+" WHERE (IX BETWEEN ? AND ? ) AND " +
"CONNECTION_ID = ?;");
}
+
// This is used to generate a unique connId for this connection to the DB.
private int generateConnID(Connection conn) {
int rv = (int) System.currentTimeMillis(); // random-ish
@@ -156,12 +184,20 @@ public class MySQLMixin implements DBInterface {
}
return dbname;
}
-
+
+ @Override
public String getDatabaseName() {
return this.dbName;
}
@Override
+ public String getSchema() {return this.dbName;}
+
+ /**
+ * Get a set of the table names in the database.
+ * @return the set
+ */
+ @Override
public Set<String> getSQLTableSet() {
Set<String> set = new TreeSet<String>();
String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
@@ -183,7 +219,7 @@ public class MySQLMixin implements DBInterface {
@Override
public Set<Range> getSQLRangeSet() {
Set<String> set = new TreeSet<String>();
- String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+ String sql = "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
try {
Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
@@ -244,9 +280,19 @@ mysql> describe tables;
TableInfo ti = tables.get(tableName);
if (ti == null) {
try {
- String tbl = tableName;//.toUpperCase();
- String sql = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME='"+tbl+"'";
- ResultSet rs = executeSQLRead(sql);
+ final String[] split = tableName.split("\\.");
+ String tbl = (split.length==2)?split[1]:tableName;
+ String localSchema = (split.length==2)?split[0]:getSchema();
+ StringBuilder sql=new StringBuilder();
+ sql.append("SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=");
+ if(localSchema==null) {
+ sql.append("DATABASE() AND TABLE_NAME='");
+ }
+ else {
+ sql.append("'").append(localSchema).append("' AND TABLE_NAME='");
+ }
+ sql.append(tbl).append("';");
+ ResultSet rs = executeSQLRead(sql.toString());
if (rs != null) {
ti = new TableInfo();
while (rs.next()) {
@@ -296,9 +342,13 @@ mysql> describe tables;
}
}
@Override
- public void createSQLTriggers(String tableName) {
- // Don't create triggers for the table the triggers write into!!!
+ public void createSQLTriggers(String table) {
+ final String[] split = table.split("\\.");
+ String schemaName = (split.length==2)?split[0]:getSchema();
+ String tableName = (split.length==2)?split[1]:table;
+
if (tableName.equals(TRANS_TBL))
+ // Don't create triggers for the table the triggers write into!!!
return;
try {
if (!server_tbl_created) {
@@ -321,12 +371,12 @@ mysql> describe tables;
//msm.register(name);
}
// No SELECT trigger
- executeSQLWrite(generateTrigger(tableName, "INSERT"));
- executeSQLWrite(generateTrigger(tableName, "UPDATE"));
+ executeSQLWrite(generateTrigger(schemaName,tableName, "INSERT"));
+ executeSQLWrite(generateTrigger(schemaName,tableName, "UPDATE"));
//\TODO: save key row instead of the whole row for delete
- executeSQLWrite(generateTrigger(tableName, "DELETE"));
+ executeSQLWrite(generateTrigger(schemaName,tableName, "DELETE"));
} catch (SQLException e) {
- if (e.getMessage().equals("Trigger already exists")) {
+ if (e.getMessage().equals("Trigger already exists") || e.getMessage().endsWith("already exists")){
//only warn if trigger already exists
logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e);
} else {
@@ -343,7 +393,7 @@ END;
OLD.field refers to the old value
NEW.field refers to the new value
*/
- private String generateTrigger(String tableName, String op) {
+ private String generateTrigger(String schema, String tableName, String op) {
boolean isdelete = op.equals("DELETE");
boolean isinsert = op.equals("INSERT");
boolean isupdate = op.equals("UPDATE");
@@ -371,25 +421,27 @@ NEW.field refers to the new value
//\TODO check if using mysql driver, so instead check the exception
//\TODO add conditional for update, if primary key is still the same, use null in the KEYDATA col
StringBuilder sb = new StringBuilder()
- .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL!
- .append(String.format("%s_%s", op.substring(0, 1), tableName))
- .append(" AFTER ")
- .append(op)
- .append(" ON ")
- .append(tableName.toUpperCase())
- .append(" FOR EACH ROW INSERT INTO ")
- .append(TRANS_TBL)
- .append(" (TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('")
- .append(tableName.toUpperCase())
- .append("', ")
- .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'"))
- .append(", ")
- .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL")
- .append(", ")
- .append(newJson.toString())
- .append(", ")
- .append("CONNECTION_ID()")
- .append(")");
+ .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL!
+ .append(String.format("%s_%s", op.substring(0, 1), tableName))
+ .append(" AFTER ")
+ .append(op)
+ .append(" ON ")
+ .append(tableName)
+ .append(" FOR EACH ROW INSERT INTO ")
+ .append(TRANS_TBL)
+ .append(" (SCHEMANAME, TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('")
+ .append( (schema==null)?this.getSchema():schema )
+ .append("', '")
+ .append(tableName)
+ .append("', ")
+ .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'"))
+ .append(", ")
+ .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL")
+ .append(", ")
+ .append(newJson.toString())
+ .append(", ")
+ .append("CONNECTION_ID()")
+ .append(")");
return sb.toString();
}
private String[] getTriggerNames(String tableName) {
@@ -521,6 +573,16 @@ NEW.field refers to the new value
return rs;
}
+ @Override
+ public void preCommitHook() {
+ synchronized (stagingHandlerLock){
+ //\TODO check if this can potentially block forever in certain scenarios
+ if(stagingHandler!=null){
+ stagingHandler.waitForAllPendingUpdates();
+ }
+ }
+ }
+
/**
* This method executes a write query in the sql database.
* @param sql the SQL to be sent to MySQL
@@ -569,11 +631,24 @@ NEW.field refers to the new value
String[] parts = sql.trim().split(" ");
String cmd = parts[0].toLowerCase();
if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
- try {
- this.updateStagingTable(transactionDigest);
- } catch (NoSuchFieldException|MDBCServiceException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ if (useAsyncStagingUpdate) {
+ synchronized (stagingHandlerLock){
+ if(stagingHandler==null||currentStaging!=transactionDigest){
+ Runnable newRunnable = new StagingTableUpdateRunnable(this, transactionDigest);
+ currentStaging=transactionDigest;
+ stagingHandler=new AsyncUpdateHandler(newRunnable);
+ }
+ //else we can keep using the current staging Handler
+ }
+ stagingHandler.processNewUpdate();
+ } else {
+
+ try {
+ this.updateStagingTable(transactionDigest);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ // TODO Auto-generated catch block
+ this.logger.error("Error updating the staging table");
+ }
}
}
}
@@ -604,7 +679,7 @@ NEW.field refers to the new value
throws NoSuchFieldException, MDBCServiceException {
// copy from DB.MDBC_TRANSLOG where connid == myconnid
// then delete from MDBC_TRANSLOG
- String sql2 = "SELECT IX, TABLENAME, OP, ROWDATA,KEYDATA FROM "+TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId;
+ String sql2 = "SELECT IX, SCHEMANAME, TABLENAME, OP, ROWDATA, KEYDATA FROM " + TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId;
Integer biggestIx = Integer.MIN_VALUE;
Integer smallestIx = Integer.MAX_VALUE;
try {
@@ -616,10 +691,11 @@ NEW.field refers to the new value
smallestIx = Integer.min(smallestIx,ix);
String op = rs.getString("OP");
SQLOperation opType = toOpEnum(op);
+ String schema= rs.getString("SCHEMANAME");
String tbl = rs.getString("TABLENAME");
String newRowStr = rs.getString("ROWDATA");
String rowStr = rs.getString("KEYDATA");
- Range range = new Range(tbl);
+ Range range = new Range(schema+"."+tbl);
transactionDigests.addOperation(range,opType,newRowStr,rowStr);
rows.add(ix);
}
@@ -1056,7 +1132,7 @@ NEW.field refers to the new value
private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
logger.info("Clearing replayed operations");
String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId;
- jdbcStmt.executeQuery(sql);
+ jdbcStmt.executeUpdate(sql);
}
@Override
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
new file mode 100755
index 0000000..0f66731
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
@@ -0,0 +1,1066 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.mixins;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.*;
+import java.util.Map.Entry;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.delete.Delete;
+import net.sf.jsqlparser.statement.insert.Insert;
+import net.sf.jsqlparser.statement.update.Update;
+import org.json.JSONObject;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Configuration;
+import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.mixins.MySQLMixin.StagingTableUpdateRunnable;
+import org.onap.music.mdbc.tables.Operation;
+import org.onap.music.mdbc.query.SQLOperation;
+import org.onap.music.mdbc.tables.StagingTable;
+import org.postgresql.util.PGInterval;
+import org.postgresql.util.PGobject;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * This class provides the methods that MDBC needs in order to mirror data to/from a
+ * <a href="https://dev.mysql.com/">MySQL</a> or <a href="http://mariadb.org/">MariaDB</a> database instance. This class
+ * uses the <code>JSON_OBJECT()</code> database function, which means it requires the following minimum versions of
+ * either database:
+ * <table summary="">
+ * <tr>
+ * <th>DATABASE</th>
+ * <th>VERSION</th>
+ * </tr>
+ * <tr>
+ * <td>MySQL</td>
+ * <td>5.7.8</td>
+ * </tr>
+ * <tr>
+ * <td>MariaDB</td>
+ * <td>10.2.3 (Note: 10.2.3 is currently (July 2017) a <i>beta</i> release)</td>
+ * </tr>
+ * </table>
+ *
+ * @author Robert P. Eby
+ */
+public class PostgresMixin implements DBInterface {
+
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(PostgresMixin.class);
+
+ public static final String MIXIN_NAME = "postgres";
+ public static final String TRANS_TBL_SCHEMA = "audit";
+ public static final String TRANS_TBL = "mdbc_translog";
+
+ private final MusicInterface mi;
+ private final String connId;
+ private final String dbName;
+ private final String schema;
+ private final Connection jdbcConn;
+ private final Map<String, TableInfo> tables;
+ private PreparedStatement deleteStagingStatement;
+ private boolean useAsyncStagingUpdate = false;
+ private Object stagingHandlerLock = new Object();
+ private AsyncUpdateHandler stagingHandler = null;
+ private StagingTable currentStaging = null;
+
+ public PostgresMixin() {
+ this.mi = null;
+ this.connId = "";
+ this.dbName = null;
+ this.schema = null;
+ this.jdbcConn = null;
+ this.tables = null;
+ this.deleteStagingStatement = null;
+ }
+
+ private void initializeDeleteStatement() throws SQLException {
+ deleteStagingStatement = jdbcConn.prepareStatement("DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL
+ + " WHERE (ix BETWEEN ? AND ? ) AND " + "connection_id = ?;");
+ }
+
+ public PostgresMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException {
+ this.mi = mi;
+ this.connId = generateConnID(conn);
+ this.dbName = getDBName(conn);
+ this.schema = getSchema(conn);
+ this.jdbcConn = conn;
+ this.tables = new HashMap<>();
+ useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
+ Configuration.ASYNC_STAGING_TABLE_UPDATE));
+ initializePostgresTriggersStructures();
+ initializeDeleteStatement();
+ }
+
+ class StagingTableUpdateRunnable implements Runnable {
+
+ private PostgresMixin mixin;
+ private StagingTable staging;
+
+ StagingTableUpdateRunnable(PostgresMixin mixin, StagingTable staging) {
+ this.mixin = mixin;
+ this.staging = staging;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.mixin.updateStagingTable(staging);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ this.mixin.logger.error("Error when updating the staging table");
+ }
+ }
+ }
+
+
+ private void createTriggerTable() throws SQLException {
+ final String createSchemaSQL = "CREATE SCHEMA IF NOT EXISTS " + TRANS_TBL_SCHEMA + ";";
+ final String revokeCreatePrivilegesSQL = "REVOKE CREATE ON schema " + TRANS_TBL_SCHEMA + " FROM public;";
+ final String createTableSQL = "CREATE TABLE IF NOT EXISTS " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " ("
+ + "ix serial," + "op TEXT NOT NULL CHECK (op IN ('I','D','U'))," + "schema_name text NOT NULL,"
+ + "table_name text NOT NULL," + "original_data json," + "new_data json," + "connection_id text,"
+ + "PRIMARY KEY (connection_id,ix)" + ") WITH (fillfactor=100);";
+ final String revokeSQL = "REVOKE INSERT,UPDATE,DELETE,TRUNCATE,REFERENCES,TRIGGER ON " + TRANS_TBL_SCHEMA + "."
+ + TRANS_TBL + " FROM public;";
+ final String grantSelectSQL = "GRANT SELECT ON " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " TO public;";
+ final String createIndexSQL = "CREATE INDEX IF NOT EXISTS logged_actions_connection_id_idx" + " ON "
+ + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " (connection_id);";
+ Map<String, String> sqlStatements = new LinkedHashMap<String, String>() {
+ {
+ put("create_schema", createSchemaSQL);
+ put("revoke_privileges", revokeCreatePrivilegesSQL);
+ put("create_table", createTableSQL);
+ put("revoke_sql", revokeSQL);
+ put("grant_select", grantSelectSQL);
+ put("create_index", createIndexSQL);
+ }
+ };
+ for (Entry<String, String> query : sqlStatements.entrySet()) {
+ int retryCount = 0;
+ boolean ready = false;
+ while (retryCount < 3 && !ready) {
+ try {
+ Statement statement = jdbcConn.createStatement();
+ statement.executeUpdate(query.getValue());
+ if (!jdbcConn.getAutoCommit()) {
+ jdbcConn.commit();
+ }
+ statement.close();
+ ready = true;
+ } catch (SQLException e) {
+ if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated") || e.getMessage()
+ .toLowerCase().startsWith("error: duplicate key value violates unique constraint")) {
+ logger.warn("Error creating schema, retrying. for " + query.getKey(), e);
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ }
+ } else {
+ logger.error("Error executing " + query.getKey(), e);
+ throw e;
+ }
+ }
+ retryCount++;
+ }
+ }
+ }
+
+ private String updateTriggerSection() {
+ return "IF (TG_OP = 'UPDATE') THEN\n" + "v_old_data := row_to_json(OLD);\n"
+ + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + "." + TRANS_TBL
+ + " (op,schema_name,table_name,original_data,new_data,connection_id)\n"
+ + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,v_new_data,pg_backend_pid());\n"
+ + "RETURN NEW; ";
+ }
+
+ private String insertTriggerSection() {
+ // \TODO add additinoal conditional on change "IF NEW IS DISTINCT FROM OLD THEN"
+ return "IF (TG_OP = 'INSERT') THEN\n" + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA
+ + "." + TRANS_TBL + " (op,schema_name,table_name,new_data,connection_id)\n"
+ + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_new_data,pg_backend_pid());\n"
+ + "RETURN NEW; ";
+ }
+
+ private String deleteTriggerSection() {
+ return "IF (TG_OP = 'DELETE') THEN\n" + "v_old_data := row_to_json(OLD);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA
+ + "." + TRANS_TBL + " (op,schema_name,table_name,original_data,connection_id)\n"
+ + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,pg_backend_pid());\n"
+ + "RETURN OLD; ";
+ }
+
+ private String functionName(SQLOperation type) {
+ final String functionName = (type.equals(SQLOperation.UPDATE)) ? "if_updated_func"
+ : (type.equals(SQLOperation.INSERT)) ? "if_inserted_func" : "if_deleted_func";
+ return "audit." + functionName + "()";
+ }
+
+ private void createTriggerFunctions(SQLOperation type) throws SQLException {
+ StringBuilder functionSQL =
+ new StringBuilder("CREATE OR REPLACE FUNCTION " + functionName(type) + " RETURNS TRIGGER AS $body$\n"
+ + "DECLARE\n" + "v_old_data json;\n" + "v_new_data json;\n" + "BEGIN\n");
+ switch (type) {
+ case UPDATE:
+ functionSQL.append(updateTriggerSection());
+ break;
+ case INSERT:
+ functionSQL.append(insertTriggerSection());
+ break;
+ case DELETE:
+ functionSQL.append(deleteTriggerSection());
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid operation type for creation of trigger functions");
+ }
+ functionSQL.append("ELSE\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - Other action occurred: %, at %',TG_OP,now();\n"
+ + "RETURN NULL;\n" + "END IF;\n" + "EXCEPTION\n" + "WHEN data_exception THEN\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ + "RETURN NULL;\n" + "WHEN unique_violation THEN\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ + "RETURN NULL;\n" + "WHEN OTHERS THEN\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ + "RETURN NULL;\n" + "END;\n" + "$body$\n" + "LANGUAGE plpgsql\n" + "SECURITY DEFINER\n"
+ + "SET search_path = pg_catalog, audit;");
+ int retryCount = 0;
+ boolean ready = false;
+ while (retryCount < 3 && !ready) {
+ try {
+ executeSQLWrite(functionSQL.toString());
+ ready = true;
+ } catch (SQLException e) {
+ if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated")) {
+ logger.warn("Error creating schema, retrying. ", e);
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e1) {
+ }
+ } else {
+ logger.error("Error executing creation of trigger function", e);
+ throw e;
+ }
+ }
+ retryCount++;
+ }
+ }
+
+ private void initializePostgresTriggersStructures() throws SQLException {
+ try {
+ createTriggerTable();
+ } catch (SQLException e) {
+ logger.error("Error creating the trigger tables in postgres", e);
+ throw e;
+ }
+ try {
+ createTriggerFunctions(SQLOperation.INSERT);
+ createTriggerFunctions(SQLOperation.UPDATE);
+ createTriggerFunctions(SQLOperation.DELETE);
+ } catch (SQLException e) {
+ logger.error("Error creating the trigger functions in postgres", e);
+ throw e;
+ }
+ }
+
+ // This is used to generate a unique connId for this connection to the DB.
+ private String generateConnID(Connection conn) {
+ String rv = Integer.toString((int) System.currentTimeMillis()); // random-ish
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT pg_backend_pid() AS IX");
+ if (rs.next()) {
+ rv = rs.getString("IX");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "generateConnID: problem generating a connection ID!");
+ }
+ return rv;
+ }
+
+ /**
+ * Get the name of this DBnterface mixin object.
+ *
+ * @return the name
+ */
+ @Override
+ public String getMixinName() {
+ return MIXIN_NAME;
+ }
+
+ @Override
+ public void close() {
+ // nothing yet
+ }
+
+ /**
+ * Determines the db name associated with the connection This is the private/internal method that actually
+ * determines the name
+ *
+ * @param conn
+ * @return
+ */
+ private String getDBName(Connection conn) {
+ String dbname = "mdbc"; // default name
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT current_database();");
+ if (rs.next()) {
+ dbname = rs.getString("current_database");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
+ }
+ return dbname;
+ }
+
+ /**
+ * Determines the db name associated with the connection This is the private/internal method that actually
+ * determines the name
+ *
+ * @param conn
+ * @return
+ */
+ private String getSchema(Connection conn) {
+ String schema = "public"; // default name
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT current_schema();");
+ if (rs.next()) {
+ schema = rs.getString("current_schema");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
+ }
+ return schema;
+ }
+
+ @Override
+ public String getSchema() {
+ return schema;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return this.dbName;
+ }
+
+ /**
+ * Get a set of the table names in the database.
+ *
+ * @return the set
+ */
+ @Override
+ public Set<Range> getSQLRangeSet() {
+ Set<String> set = new TreeSet<String>();
+ String sql =
+ "SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE' AND table_schema=current_schema();";
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ while (rs.next()) {
+ String s = rs.getString("table_name");
+ set.add(s);
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
+ }
+ logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
+ Set<Range> rangeSet = new HashSet<>();
+ for (String table : set) {
+ rangeSet.add(new Range(table));
+ }
+ return rangeSet;
+ }
+
+ /**
+ * Return a TableInfo object for the specified table. This method first looks in a cache of previously constructed
+ * TableInfo objects for the table. If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the
+ * column names, types, and indexes of the table. It creates a new TableInfo object with the results.
+ *
+ * @param tableName the table to look up
+ * @return a TableInfo object containing the info we need, or null if the table does not exist
+ */
+ @Override
+ public TableInfo getTableInfo(String tableName) {
+ // \TODO: invalidate "tables" when a table schema is modified (uncommon), but needs to be handled
+ TableInfo ti = tables.get(tableName);
+ if (ti == null) {
+ try {
+ String tbl, localSchema;
+ final String[] split = tableName.split("\\.");
+ if (split.length == 2) {
+ localSchema = split[0];
+ tbl = split[1];
+ } else {
+ tbl = tableName;
+ localSchema = this.schema;
+ }
+ String sql;
+ if (schema == null) {
+ sql = "select column_name, data_type from information_schema.columns where table_schema=current_schema() and table_name='"
+ + tbl + "';";
+ } else {
+ sql = "select column_name, data_type from information_schema.columns where table_schema='"
+ + localSchema + "' and table_name='" + tbl + "';";
+ }
+ ResultSet rs = executeSQLRead(sql);
+ if (rs != null) {
+ ti = new TableInfo();
+ while (rs.next()) {
+ String name = rs.getString("column_name");
+ String type = rs.getString("data_type");
+ ti.columns.add(name);
+ ti.coltype.add(mapDatatypeNameToType(type));
+ }
+ rs.getStatement().close();
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from POSTGRES.");
+ return null;
+ }
+ final String keysql =
+ "SELECT a.attname as column_name FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid"
+ + " AND a.attnum = ANY(i.indkey) WHERE i.indrelid = '" + tbl + "'::regclass "
+ + " AND i.indisprimary;";
+ ResultSet rs2 = executeSQLRead(keysql);
+ Set<String> keycols = new HashSet<>();
+ if (rs2 != null) {
+ while (rs2.next()) {
+ String name = rs2.getString("column_name");
+ keycols.add(name);
+ }
+ rs2.getStatement().close();
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from MySQL.");
+ }
+ for (String col : ti.columns) {
+ if (keycols.contains(col)) {
+ ti.iskey.add(true);
+ } else {
+ ti.iskey.add(false);
+ }
+ }
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from MySQL: " + e);
+ return null;
+ }
+ tables.put(tableName, ti);
+ }
+ return ti;
+ }
+
+ // Map Postgres data type names to the java.sql.Types equivalent
+ private int mapDatatypeNameToType(String nm) {
+ switch (nm) {
+ case "character":
+ return Types.CHAR;
+ case "national character":
+ return Types.NCHAR;
+ case "character varying":
+ return Types.VARCHAR;
+ case "national character varying":
+ return Types.NVARCHAR;
+ case "text":
+ return Types.VARCHAR;
+ case "bytea":
+ return Types.BINARY;
+ case "smallint":
+ return Types.SMALLINT;
+ case "integer":
+ return Types.INTEGER;
+ case "bigint":
+ return Types.BIGINT;
+ case "smallserial":
+ return Types.SMALLINT;
+ case "serial":
+ return Types.INTEGER;
+ case "bigserial":
+ return Types.BIGINT;
+ case "real":
+ return Types.REAL;
+ case "double precision":
+ return Types.DOUBLE;
+ case "numeric":
+ return Types.NUMERIC;
+ case "decimal":
+ return Types.DECIMAL;
+ case "date":
+ return Types.DATE;
+ case "time with time zone":
+ return Types.TIME_WITH_TIMEZONE;
+ case "time without time zone":
+ return Types.TIME;
+ case "timestamp without time zone":
+ return Types.TIMESTAMP;
+ case "timestamp with time zone":
+ return Types.TIMESTAMP_WITH_TIMEZONE;
+ case "boolean":
+ return Types.BIT;
+ case "bit":
+ return Types.BIT;
+ case "oid":
+ return Types.BIGINT;
+ case "xml":
+ return Types.SQLXML;
+ case "array":
+ return Types.ARRAY;
+ case "tinyint":
+ return Types.TINYINT;
+ case "uuid":
+ case "money":
+ case "interval":
+ case "bit varying":
+ case "box":
+ case "point":
+ case "lseg":
+ case "path":
+ case "polygon":
+ case "circle":
+ case "json":
+ case "inet":
+ case "cidr":
+ case "macaddr":
+ case "tsvector":
+ case "tsquery":
+ return Types.OTHER;
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "unrecognized and/or unsupported data type " + nm);
+ return Types.VARCHAR;
+ }
+ }
+
+ @Override
+ public void createSQLTriggers(String tableName) {
+ // Don't create triggers for the table the triggers write into!!!
+ if (tableName.equals(TRANS_TBL) || tableName.equals(TRANS_TBL_SCHEMA + "." + TRANS_TBL))
+ return;
+ try {
+ // No SELECT trigger
+ executeSQLWrite(generateTrigger(tableName, SQLOperation.INSERT));
+ executeSQLWrite(generateTrigger(tableName, SQLOperation.DELETE));
+ executeSQLWrite(generateTrigger(tableName, SQLOperation.UPDATE));
+ } catch (SQLException e) {
+ if (e.getMessage().trim().endsWith("already exists")) {
+ // only warn if trigger already exists
+ logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e);
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger, "createSQLTriggers: " + e);
+ }
+ }
+ }
+
+ private String generateTrigger(String tableName, SQLOperation op) {
+ StringBuilder triggerSql = new StringBuilder("CREATE TRIGGER ").append(getTriggerName(tableName, op))
+ .append(" AFTER " + op + " ON ").append(tableName).append(" FOR EACH ROW EXECUTE PROCEDURE ")
+ .append(functionName(op)).append(';');
+ return triggerSql.toString();
+ }
+
+ private String getTriggerName(String tableName, SQLOperation op) {
+ switch (op) {
+ case DELETE:
+ return "D_" + tableName;
+ case UPDATE:
+ return "U_" + tableName;
+ case INSERT:
+ return "I_" + tableName;
+ default:
+ throw new IllegalArgumentException("Invalid option in trigger operation type");
+ }
+ }
+
+ private String[] getTriggerNames(String tableName) {
+ return new String[] {getTriggerName(tableName, SQLOperation.INSERT),
+ getTriggerName(tableName, SQLOperation.DELETE), getTriggerName(tableName, SQLOperation.UPDATE),};
+ }
+
+ @Override
+ public void dropSQLTriggers(String tableName) {
+ try {
+ for (String name : getTriggerNames(tableName)) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "REMOVE trigger " + name + " from postgres");
+ executeSQLWrite("DROP TRIGGER IF EXISTS " + name + ";");
+ }
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "dropSQLTriggers: " + e);
+ }
+ }
+
+ @Override
+ public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) {
+ throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres");
+ }
+
+ @Override
+ public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) {
+ throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres");
+ }
+
+ /**
+ * This method executes a read query in the SQL database. Methods that call this method should be sure to call
+ * resultset.getStatement().close() when done in order to free up resources.
+ *
+ * @param sql the query to run
+ * @return a ResultSet containing the rows returned from the query
+ */
+ @Override
+ public ResultSet executeSQLRead(String sql) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing sql read in postgres");
+ logger.debug("Executing SQL read:" + sql);
+ ResultSet rs;
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ rs = stmt.executeQuery(sql);
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "executeSQLRead" + e);
+ return null;
+ }
+ return rs;
+ }
+
+ /**
+ * This method executes a write query in the sql database.
+ *
+ * @param sql the SQL to be sent to MySQL
+ * @throws SQLException if an underlying JDBC method throws an exception
+ */
+ protected void executeSQLWrite(String sql) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:" + sql);
+ Statement stmt = jdbcConn.createStatement();
+ stmt.execute(sql);
+ stmt.close();
+ }
+
+ @Override
+ public void preCommitHook() {
+ synchronized (stagingHandlerLock) {
+ // \TODO check if this can potentially block forever in certain scenarios
+ if (stagingHandler != null) {
+ stagingHandler.waitForAllPendingUpdates();
+ }
+ }
+ }
+
+ /**
+ * Code to be run within the DB driver before a SQL statement is executed. This is where tables can be synchronized
+ * before a SELECT, for those databases that do not support SELECT triggers.
+ *
+ * @param sql the SQL statement that is about to be executed
+ * @return list of keys that will be updated, if they can't be determined afterwards (i.e. sql table doesn't have
+ * primary key)
+ */
+ @Override
+ public void preStatementHook(final String sql) {
+ if (sql == null) {
+ return;
+ }
+ // \TODO: check if anything needs to be executed here for postgres
+ }
+
+ /**
+ * Code to be run within the DB driver after a SQL statement has been executed. This is where remote statement
+ * actions can be copied back to Cassandra/MUSIC.
+ *
+ * @param sql the SQL statement that was executed
+ */
+ @Override
+ public void postStatementHook(final String sql, StagingTable transactionDigest) {
+ if (sql != null) {
+ String[] parts = sql.trim().split(" ");
+ String cmd = parts[0].toLowerCase();
+ if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
+ if (useAsyncStagingUpdate) {
+ synchronized (stagingHandlerLock) {
+ if (stagingHandler == null || currentStaging != transactionDigest) {
+ Runnable newRunnable =
+ new PostgresMixin.StagingTableUpdateRunnable(this, transactionDigest);
+ currentStaging = transactionDigest;
+ stagingHandler = new AsyncUpdateHandler(newRunnable);
+ }
+ // else we can keep using the current staging Handler
+ }
+ stagingHandler.processNewUpdate();
+ } else {
+
+ try {
+ this.updateStagingTable(transactionDigest);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ // TODO Auto-generated catch block
+ this.logger.error("Error updating the staging table");
+ }
+ }
+ }
+ }
+ }
+
+ private SQLOperation toOpEnum(String operation) throws NoSuchFieldException {
+ switch (operation.toLowerCase()) {
+ case "i":
+ return SQLOperation.INSERT;
+ case "d":
+ return SQLOperation.DELETE;
+ case "u":
+ return SQLOperation.UPDATE;
+ case "s":
+ return SQLOperation.SELECT;
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "Invalid operation selected: [" + operation + "]");
+ throw new NoSuchFieldException("Invalid operation enum");
+ }
+
+ }
+
+ /**
+ * Copy data that is in transaction table into music interface
+ *
+ * @param transactionDigests
+ * @throws NoSuchFieldException
+ */
+ private void updateStagingTable(StagingTable transactionDigests) throws NoSuchFieldException, MDBCServiceException {
+ String selectSql = "select ix, op, schema_name, table_name, original_data,new_data FROM " + TRANS_TBL_SCHEMA
+ + "." + TRANS_TBL + " where connection_id = '" + this.connId + "';";
+ Integer biggestIx = Integer.MIN_VALUE;
+ Integer smallestIx = Integer.MAX_VALUE;
+ try {
+ ResultSet rs = executeSQLRead(selectSql);
+ Set<Integer> rows = new TreeSet<Integer>();
+ while (rs.next()) {
+ int ix = rs.getInt("ix");
+ biggestIx = Integer.max(biggestIx, ix);
+ smallestIx = Integer.min(smallestIx, ix);
+ String op = rs.getString("op");
+ SQLOperation opType = toOpEnum(op);
+ String schema = rs.getString("schema_name");
+ String tbl = rs.getString("table_name");
+ String original = rs.getString("original_data");
+ String newData = rs.getString("new_data");
+ Range range = new Range(schema + "." + tbl);
+ transactionDigests.addOperation(range, opType, newData, original);
+ rows.add(ix);
+ }
+ rs.getStatement().close();
+ if (rows.size() > 0) {
+ logger.debug("Staging delete: Executing with vals [" + smallestIx + "," + biggestIx + "," + this.connId
+ + "]");
+ this.deleteStagingStatement.setInt(1, smallestIx);
+ this.deleteStagingStatement.setInt(2, biggestIx);
+ this.deleteStagingStatement.setString(3, this.connId);
+ this.deleteStagingStatement.execute();
+ }
+ } catch (SQLException e) {
+ logger.warn("Exception in postStatementHook: " + e);
+ e.printStackTrace();
+ }
+ }
+
+
+
+ /**
+ * Update music with data from MySQL table
+ *
+ * @param tableName - name of table to update in music
+ */
+ @Override
+ public void synchronizeData(String tableName) {}
+
+ /**
+ * Return a list of "reserved" names, that should not be used by MySQL client/MUSIC These are reserved for mdbc
+ */
+ @Override
+ public List<String> getReservedTblNames() {
+ ArrayList<String> rsvdTables = new ArrayList<String>();
+ rsvdTables.add(TRANS_TBL_SCHEMA + "." + TRANS_TBL);
+ rsvdTables.add(TRANS_TBL);
+ // Add others here as necessary
+ return rsvdTables;
+ }
+
+ @Override
+ public String getPrimaryKey(String sql, String tableName) {
+ return null;
+ }
+
+ /**
+ * Parse the transaction digest into individual events
+ *
+ * @param transaction - base 64 encoded, serialized digest
+ */
+ public void replayTransaction(StagingTable transaction, List<Range> ranges)
+ throws SQLException, MDBCServiceException {
+ boolean autocommit = jdbcConn.getAutoCommit();
+ jdbcConn.setAutoCommit(false);
+ Statement jdbcStmt = jdbcConn.createStatement();
+ final ArrayList<Operation> opList = transaction.getOperationList();
+
+ for (Operation op : opList) {
+ if (Range.overlaps(ranges, op.getTable())) {
+ try {
+ replayOperationIntoDB(jdbcStmt, op);
+ } catch (SQLException | MDBCServiceException e) {
+ // rollback transaction
+ logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "."
+ + "Rolling back the entire digest replay.");
+ jdbcConn.rollback();
+ throw e;
+ }
+ }
+ }
+
+ clearReplayedOperations(jdbcStmt);
+ jdbcConn.commit();
+ jdbcStmt.close();
+ jdbcConn.setAutoCommit(autocommit);
+ }
+
+ @Override
+ public void disableForeignKeyChecks() throws SQLException {
+ Statement disable = jdbcConn.createStatement();
+ disable.execute("SET session_replication_role = 'replica';");
+ disable.closeOnCompletion();
+ }
+
+ @Override
+ public void enableForeignKeyChecks() throws SQLException {
+ Statement enable = jdbcConn.createStatement();
+ enable.execute("SET session_replication_role = 'origin';");
+ enable.closeOnCompletion();
+ }
+
+ @Override
+ public void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException {
+ replayTransaction(txDigest, ranges);
+ }
+
+ /**
+ * Replays operation into database, usually from txDigest
+ *
+ * @param jdbcStmt: Connection used to perform the replay
+ * @param op: operation to be replayed
+ * @throws SQLException
+ * @throws MDBCServiceException
+ */
+ private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException {
+ logger.debug("Replaying Operation: " + op.getOperationType() + "->" + op.getVal());
+ JSONObject newVal = op.getVal();
+ JSONObject oldVal = null;
+ try {
+ oldVal = op.getKey();
+ } catch (MDBCServiceException e) {
+ // Ignore exception, in postgres the structure of the operation is different
+ }
+
+
+ TableInfo ti = getTableInfo(op.getTable());
+ final List<String> keyColumns = ti.getKeyColumns();
+
+ // build and replay the queries
+ String sql = constructSQL(op, keyColumns, newVal, oldVal);
+ if (sql == null)
+ return;
+
+ try {
+ logger.debug("Replaying operation: " + sql);
+ int updated = jdbcStmt.executeUpdate(sql);
+
+ if (updated == 0) {
+ // This applies only for replaying transactions involving Eventually Consistent tables
+ logger.warn(
+ "Error Replaying operation: " + sql + "; Replacing insert/replace/viceversa and replaying ");
+
+ buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal);
+ }
+ } catch (SQLException sqlE) {
+ // This applies for replaying transactions involving Eventually Consistent tables
+ // or transactions that replay on top of existing keys
+ logger.warn(
+ "Error Replaying operation: " + sql + ";" + "Replacing insert/replace/viceversa and replaying ");
+
+ buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal);
+
+ }
+ }
+
+ protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, List<String> keyColumns,
+ JSONObject newVals, JSONObject oldVals) throws SQLException, MDBCServiceException {
+ String sqlInverse = constructSQLInverse(op, keyColumns, newVals, oldVals);
+ if (sqlInverse == null)
+ return;
+ logger.debug("Replaying operation: " + sqlInverse);
+ jdbcStmt.executeUpdate(sqlInverse);
+ }
+
+ protected String constructSQLInverse(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+ throws MDBCServiceException {
+ String sqlInverse = null;
+ switch (op.getOperationType()) {
+ case INSERT:
+ sqlInverse = constructUpdate(op.getTable(), keyColumns, newVals, oldVals);
+ break;
+ case UPDATE:
+ sqlInverse = constructInsert(op.getTable(), newVals);
+ break;
+ default:
+ break;
+ }
+ return sqlInverse;
+ }
+
+ protected String constructSQL(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+ throws MDBCServiceException {
+ String sql = null;
+ switch (op.getOperationType()) {
+ case INSERT:
+ sql = constructInsert(op.getTable(), newVals);
+ break;
+ case UPDATE:
+ sql = constructUpdate(op.getTable(), keyColumns, newVals, oldVals);
+ break;
+ case DELETE:
+ sql = constructDelete(op.getTable(), keyColumns, oldVals);
+ break;
+ case SELECT:
+ // no update happened, do nothing
+ break;
+ default:
+ logger.error(op.getOperationType() + "not implemented for replay");
+ }
+ return sql;
+ }
+
+ private String constructDelete(String tableName, List<String> keyColumns, JSONObject oldVals)
+ throws MDBCServiceException {
+ if (oldVals == null) {
+ throw new MDBCServiceException("Trying to update row with an empty old val exception");
+ }
+ StringBuilder sql = new StringBuilder();
+ sql.append("DELETE FROM ");
+ sql.append(tableName + " WHERE ");
+ sql.append(getPrimaryKeyConditional(keyColumns, oldVals));
+ sql.append(";");
+ return sql.toString();
+ }
+
+ private String constructInsert(String tableName, JSONObject newVals) {
+ StringBuilder keys = new StringBuilder();
+ StringBuilder vals = new StringBuilder();
+ String sep = "";
+ for (String col : newVals.keySet()) {
+ keys.append(sep + col);
+ vals.append(sep + "'" + newVals.get(col) + "'");
+ sep = ", ";
+ }
+ StringBuilder sql = new StringBuilder();
+ sql.append("INSERT INTO ").append(tableName + " (").append(keys).append(") VALUES (").append(vals).append(");");
+ return sql.toString();
+ }
+
+ private String constructUpdate(String tableName, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+ throws MDBCServiceException {
+ if (oldVals == null) {
+ throw new MDBCServiceException("Trying to update row with an empty old val exception");
+ }
+ StringBuilder sql = new StringBuilder();
+ sql.append("UPDATE ").append(tableName).append(" SET ");
+ String sep = "";
+ for (String key : newVals.keySet()) {
+ sql.append(sep).append(key).append("=\"").append(newVals.get(key)).append("\"");
+ sep = ", ";
+ }
+ sql.append(" WHERE ");
+ sql.append(getPrimaryKeyConditional(keyColumns, oldVals));
+ sql.append(";");
+ return sql.toString();
+ }
+
+ /**
+ * Create an SQL string for AND'ing all of the primary keys
+ *
+ * @param keyColumns list with the name of the columns that are key
+ * @param vals json with the contents of the old row
+ * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3
+ */
+ private String getPrimaryKeyConditional(List<String> keyColumns, JSONObject vals) {
+ StringBuilder keyCondStmt = new StringBuilder();
+ String and = "";
+ for (String key : keyColumns) {
+ // We cannot use the default primary key for the sql table and operations
+ if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
+ Object val = vals.get(key);
+ keyCondStmt.append(and + key + "=\"" + val + "\"");
+ and = " AND ";
+ }
+ }
+ return keyCondStmt.toString();
+ }
+
+ /**
+ * Cleans out the transaction table, removing the replayed operations
+ *
+ * @param jdbcStmt
+ * @throws SQLException
+ */
+ private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
+ logger.info("Clearing replayed operations");
+ String sql =
+ "DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " WHERE CONNECTION_ID = '" + this.connId + "';";
+ jdbcStmt.executeUpdate(sql);
+ }
+
+ @Override
+ public Connection getSQLConnection() {
+ return jdbcConn;
+ }
+
+ @Override
+ public Set<String> getSQLTableSet() {
+ Set<String> set = new TreeSet<String>();
+ String sql =
+ "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=CURRENT_SCHEMA() AND TABLE_TYPE='BASE TABLE'";
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ while (rs.next()) {
+ String s = rs.getString("TABLE_NAME");
+ set.add(s);
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
+ System.out.println("getSQLTableSet: " + e);
+ e.printStackTrace();
+ }
+ logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
+ System.out.println("getSQLTableSet returning: " + set);
+ return set;
+ }
+
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
index 86088f9..171ad79 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
@@ -190,28 +190,6 @@ public class Utils {
}
return list;
}
-
- public static void registerDefaultDrivers() {
- Properties pr = null;
- try {
- pr = new Properties();
- pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
- }
- catch (IOException e) {
- logger.error("Could not load property file > " + e.getMessage());
- }
-
- @SuppressWarnings("unused")
- List<Class<?>> list = new ArrayList<Class<?>>();
- String drivers = pr.getProperty("DEFAULT_DRIVERS");
- for (String driver: drivers.split("[ ,]")) {
- logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'");
- try {
- @SuppressWarnings("unused")
- Class<?> cl = Class.forName(driver.trim());
- } catch (ClassNotFoundException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"Driver class "+driver+" not found.");
- }
- }
- }
+
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
index 4f3a3bf..5c6fae4 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
@@ -89,9 +89,10 @@ public class MusicTxDigestDaemon implements Runnable {
logger.error("Music interface or DB interface is null in background daemon");
return;
}
+ MdbcConnection conn = null;
while (true) {
try {
- MdbcConnection conn = (MdbcConnection) stateManager.getConnection("daemon");
+ conn = (MdbcConnection) stateManager.getConnection("daemon");
if (conn == null) {
logger.error("Connection created is null in background daemon");
return;
@@ -111,18 +112,20 @@ public class MusicTxDigestDaemon implements Runnable {
}
//2) for each partition I don't own
final Set<Range> warmupRanges = stateManager.getRangesToWarmup();
- final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
- List<Range> missingRanges = new ArrayList<>();
- if (currentPartitions.size() != 0) {
- for (DatabasePartition part : currentPartitions) {
- List<Range> partitionRanges = part.getSnapshot();
- warmupRanges.removeAll(partitionRanges);
- }
- try {
- stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
- } catch (MDBCServiceException e) {
- logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
- continue;
+ if (warmupRanges!=null) {
+ final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
+ List<Range> missingRanges = new ArrayList<>();
+ if (currentPartitions.size() != 0) {
+ for (DatabasePartition part : currentPartitions) {
+ List<Range> partitionRanges = part.getSnapshot();
+ warmupRanges.removeAll(partitionRanges);
+ }
+ try {
+ stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
+ continue;
+ }
}
}
@@ -138,6 +141,12 @@ public class MusicTxDigestDaemon implements Runnable {
} catch (InterruptedException | SQLException e) {
logger.error("MusicTxDigest background daemon stopped " + e.getMessage(), e);
Thread.currentThread().interrupt();
+ } finally {
+ try {
+ if (conn!=null && !conn.isClosed()) conn.close();
+ } catch (SQLException e) {
+ logger.error("MusicTxDigest background daemon error closing" + e.getMessage(), e);
+ }
}
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
index 8fa49a9..db9e455 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
@@ -22,10 +22,18 @@ package org.onap.music.mdbc.tables;
import java.util.UUID;
public final class MusicTxDigestId {
+ public final UUID mriId;
public final UUID transactionId;
public final int index;
+ public MusicTxDigestId(UUID mriRowId, UUID digestId, int index) {
+ this.mriId=mriRowId;
+ this.transactionId= digestId;
+ this.index=index;
+ }
+
public MusicTxDigestId(UUID digestId, int index) {
+ this.mriId = null;
this.transactionId= digestId;
this.index=index;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
index d406ad3..3258d0f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
@@ -66,7 +66,7 @@ public final class Operation implements Serializable{
}
public JSONObject getKey() throws MDBCServiceException {
- if(KEY==null){
+ if(KEY==null||KEY.isEmpty()){
throw new MDBCServiceException("This operation ["+TYPE.toString()+"] doesn't contain a key");
}
JSONObject keys = new JSONObject(new JSONTokener(KEY));
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
index 4fdd7ac..4ef9d30 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
@@ -186,7 +186,10 @@ public class StagingTable {
}
OpType newType = (type==SQLOperation.INSERT)?OpType.INSERT:(type==SQLOperation.DELETE)?
OpType.DELETE:OpType.UPDATE;
- Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType).setVal(newVal);
+ Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType);
+ if(newVal!=null) {
+ rowBuilder.setVal(newVal);
+ }
if(keys!=null){
rowBuilder.setKey(keys);
}
@@ -252,10 +255,10 @@ public class StagingTable {
}
synchronized public boolean isEventualEmpty() {
- return (eventuallyBuilder.getRowsCount()==0);
+ return (eventuallyBuilder!=null) && (eventuallyBuilder.getRowsCount()==0);
}
-
- synchronized public void clear() throws MDBCServiceException {
+
+ synchronized public void clear() throws MDBCServiceException {
if(!builderInitialized){
throw new MDBCServiceException("This type of staging table is unmutable, please use the constructor"
+ "with no parameters");
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
index 7624201..f6239d1 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
@@ -24,6 +24,8 @@ import org.onap.music.mdbc.configurations.NodeConfiguration;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.UUID;
public class CreatePartition {
@@ -51,7 +53,9 @@ public class CreatePartition {
}
public void convert(){
- config = new NodeConfiguration(tables, eventual,UUID.fromString(mriIndex),"test","");
+ String[] tablesArray=eventual.split(",");
+ ArrayList<String> eventualTables = (new ArrayList<>(Arrays.asList(tablesArray)));
+ config = new NodeConfiguration(tables, eventualTables,UUID.fromString(mriIndex),"test","");
}
public void saveToFile(){
diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties
index 73e8f77..49fdfd2 100755
--- a/mdbc-server/src/main/resources/mdbc.properties
+++ b/mdbc-server/src/main/resources/mdbc.properties
@@ -4,10 +4,15 @@
MIXINS= \
org.onap.music.mdbc.mixins.MySQLMixin \
org.onap.music.mdbc.mixins.MusicMixin \
- org.onap.music.mdbc.mixins.Music2Mixin
+ org.onap.music.mdbc.mixins.Music2Mixin \
+ org.onap.music.mdbc.mixins.PostgresMixin
+
+DEFAULT_DB_MIXIN= mysql
+
+DEFAULT_MUSIC_MIXIN= cassandra2
DEFAULT_DRIVERS=\
- org.h2.Driver \
- com.mysql.jdbc.Driver
+ org.mariadb.jdbc.Driver \
+ org.postgresql.Driver
-txdaemonsleeps=15 \ No newline at end of file
+txdaemonsleeps=15
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java
index 4703d0e..87f8445 100755
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java
@@ -36,7 +36,7 @@ public class MDBCUtilsTest {
public void toStringTest1() {
StagingTable table = new StagingTable();
try {
- table.addOperation(new Range("TABLE1"),SQLOperation.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString(),null);
+ table.addOperation(new Range("TEST.TABLE1"),SQLOperation.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString(),null);
} catch (MDBCServiceException e) {
fail();
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java
new file mode 100644
index 0000000..72ec8d3
--- /dev/null
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java
@@ -0,0 +1,280 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import ch.vorburger.exec.ManagedProcessException;
+import ch.vorburger.mariadb4j.DB;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.opentable.db.postgres.embedded.EmbeddedPostgres;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+import javax.sql.DataSource;
+import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.onap.music.datastore.MusicDataStore;
+import org.onap.music.datastore.MusicDataStoreHandle;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.lockingservice.cassandra.CassaLockStore;
+import org.onap.music.mdbc.mixins.MusicMixin;
+import org.onap.music.mdbc.mixins.PostgresMixin;
+
+public class MdbcTestUtils {
+
+ // Postgres variables
+ static EmbeddedPostgres pg=null;
+ static DataSource postgresDatabase=null;
+ final private static int postgresPort = 13307;
+ @Rule
+ public static TemporaryFolder tf = new TemporaryFolder();
+
+ // Cassandra variables
+ //Properties used to connect to music
+ private static Cluster cluster;
+ private static Session session;
+
+ //Mdbc variables
+ final private static String keyspace="metricmusictest";
+ final private static String mdbcServerName = "name";
+ final private static String mtdTableName = "musictxdigest";
+ final private static String eventualMtxdTableName = "musicevetxdigest";
+ final private static String mriTableName = "musicrangeinformation";
+ final private static String rangeDependencyTableName = "musicrangedependency";
+ final private static String nodeInfoTableName = "nodeinfo";
+ //Mariadb variables
+ static DB db=null;
+ final public static String mariaDBDatabaseName="test";
+ final static Integer mariaDbPort=13306;
+
+
+
+ public enum DBType {POSTGRES, MySQL}
+
+ public static String getCassandraUrl(){
+ return cluster.getMetadata().getAllHosts().iterator().next().getAddress().toString();
+
+ }
+
+ public static String getKeyspace(){
+ return keyspace;
+ }
+
+ public static String getServerName(){
+ return mdbcServerName;
+ }
+
+ public static String getMriTableName(){
+ return mriTableName;
+ }
+
+ public static String getMariaDbPort() {
+ return mariaDbPort.toString();
+ }
+
+ public static String getMariaDBDBName(){
+ return mariaDBDatabaseName;
+ }
+
+ static Connection getPostgreConnection() {
+ startPostgres();
+ Connection conn=null;
+ try
+ {
+ conn = postgresDatabase.getConnection();
+ } catch(SQLException e){
+ e.printStackTrace();
+ fail();
+ }
+ return conn;
+ }
+
+ static synchronized public void startPostgres(){
+ if(pg==null) {
+ try {
+ tf.create();
+ pg = EmbeddedPostgres.builder().setPort(postgresPort).setDataDirectory(tf.newFolder("tmp")+"/data-dir").start();
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+ if(postgresDatabase==null) {
+ postgresDatabase = pg.getPostgresDatabase();
+ }
+ }
+
+ static public String getPostgresUrl(){
+ return getPostgresUrlWithoutDb()+"/postgres";
+ }
+
+ static public String getPostgresUrlWithoutDb(){
+ return "jdbc:postgresql://localhost:"+Integer.toString(postgresPort);
+ }
+
+ synchronized static Connection getMariadbConnection(){
+ startMariaDb();
+ Connection conn = null;
+ try {
+ conn = DriverManager
+ .getConnection(getMariadbUrlWithoutDatabase()+"/"+mariaDBDatabaseName, "root", "");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail("Error creating mdbc connection");
+ }
+ return conn;
+ }
+
+ public synchronized static void startMariaDb(){
+ if (db == null) {
+ try {
+ db=DB.newEmbeddedDB(mariaDbPort);
+ db.start();
+ db.createDB(mariaDBDatabaseName);
+ } catch (ManagedProcessException e) {
+ e.printStackTrace();
+ fail("error initializing embedded mariadb");
+ }
+ }
+ }
+
+ static String getMariadbUrlWithoutDatabase(){
+ return "jdbc:mariadb://localhost:"+Integer.toString(mariaDbPort);
+ }
+
+ public static Connection getConnection(DBType type){
+ switch(type){
+ case MySQL:
+ return getMariadbConnection();
+ case POSTGRES:
+ return getPostgreConnection();
+ default:
+ fail("Wrong type for creating connection");
+ }
+ return null;
+ }
+
+ synchronized static void stopPostgres(){
+ postgresDatabase=null;
+ if(pg!=null) {
+ try {
+ pg.close();
+ pg=null;
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Error closing postgres database");
+ }
+ }
+ if(tf!=null){
+ tf.delete();
+ }
+ }
+
+ static void stopMySql(){
+ try {
+ db.stop();
+ } catch (ManagedProcessException e) {
+ e.printStackTrace();
+ fail("Error closing mysql");
+ }
+ }
+
+ public static void cleanDatabase(DBType type){
+ switch(type) {
+ case MySQL:
+ stopMySql();
+ break;
+ case POSTGRES:
+ stopPostgres();
+ break;
+ default:
+ fail("Wrong type for creating connection");
+ }
+ }
+
+ public static void initCassandra(){
+ try {
+ EmbeddedCassandraServerHelper.startEmbeddedCassandra(EmbeddedCassandraServerHelper.CASSANDRA_RNDPORT_YML_FILE);
+ } catch (Exception e) {
+ System.out.println(e);
+ fail("Error starting embedded cassandra");
+ }
+ cluster=EmbeddedCassandraServerHelper.getCluster();
+ //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build();
+ cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000);
+ assertNotNull("Invalid configuration for cassandra", cluster);
+ session = EmbeddedCassandraServerHelper.getSession();
+ assertNotNull("Invalid configuration for cassandra", session);
+
+ MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session);
+ CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle);
+ assertNotNull("Invalid configuration for music", store);
+ }
+
+ public static void stopCassandra(){
+ try {
+ EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
+ }
+ catch(NullPointerException e){
+ }
+ }
+
+ public static Session getSession(){
+ return session;
+ }
+
+ public static MusicMixin getMusicMixin() throws MDBCServiceException {
+ initNamespaces();
+ initTables();
+ MusicMixin mixin=null;
+ try {
+ Properties properties = new Properties();
+ properties.setProperty(MusicMixin.KEY_MY_ID,MdbcTestUtils.getServerName());
+ properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,MdbcTestUtils.getKeyspace());
+ properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1");
+ properties.setProperty(MusicMixin.KEY_MUSIC_ADDRESS,MdbcTestUtils.getCassandraUrl());
+ mixin =new MusicMixin(null, MdbcTestUtils.getServerName(),properties);
+ } catch (MDBCServiceException e) {
+ fail("error creating music mixin");
+ }
+ return mixin;
+ }
+
+ public static void initNamespaces() throws MDBCServiceException{
+ MusicMixin.createKeyspace("music_internal",1);
+ MusicMixin.createKeyspace(keyspace,1);
+ }
+
+ public static void initTables() throws MDBCServiceException{
+ MusicMixin.createMusicRangeInformationTable(keyspace, mriTableName);
+ MusicMixin.createMusicTxDigest(mtdTableName,keyspace, -1);
+ MusicMixin.createMusicEventualTxDigest(eventualMtxdTableName,keyspace, -1);
+ MusicMixin.createMusicNodeInfoTable(nodeInfoTableName,keyspace,-1);
+ MusicMixin.createMusicRangeDependencyTable(keyspace,rangeDependencyTableName);
+ }
+
+}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
index 2d31939..862e600 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
@@ -20,12 +20,14 @@
package org.onap.music.mdbc;
+import java.util.Properties;
import org.junit.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
+import org.onap.music.mdbc.MdbcTestUtils.DBType;
import org.onap.music.mdbc.mixins.MySQLMixin;
import ch.vorburger.mariadb4j.DB;
@@ -33,8 +35,8 @@ import ch.vorburger.mariadb4j.DB;
public class MySQLMixinTest {
public static final String DATABASE = "mdbctest";
- public static final String TABLE= "Persons";
- public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" +
+ public static final String TABLE= MdbcTestUtils.getMariaDBDBName();
+ public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + MdbcTestUtils.getMariaDBDBName()+ " (\n" +
" PersonID int,\n" +
" LastName varchar(255),\n" +
" FirstName varchar(255),\n" +
@@ -52,10 +54,8 @@ public class MySQLMixinTest {
@BeforeClass
public static void init() throws Exception {
Class.forName("org.mariadb.jdbc.Driver");
- //start embedded mariadb
- DB db = DB.newEmbeddedDB(13306);
- db.start();
- db.createDB(DATABASE);
+ MdbcTestUtils.startMariaDb();
+
}
@AfterClass
@@ -65,13 +65,14 @@ public class MySQLMixinTest {
@Before
public void beforeTest() throws SQLException {
- this.conn = DriverManager.getConnection("jdbc:mariadb://localhost:13306/"+DATABASE, "root", "");
- this.mysqlMixin = new MySQLMixin(null, "localhost:13306/"+DATABASE, conn, null);
+ this.conn = MdbcTestUtils.getConnection(DBType.MySQL);
+ Properties info = new Properties();
+ this.mysqlMixin = new MySQLMixin(null, null, conn, info);
}
@Test
public void testGetDataBaseName() throws SQLException {
- Assert.assertEquals(DATABASE, mysqlMixin.getDatabaseName());
+ Assert.assertEquals(MdbcTestUtils.getMariaDBDBName(), mysqlMixin.getDatabaseName());
}
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
index 41a943e..c8d284e 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
@@ -31,23 +31,35 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.*;
+
+import com.datastax.driver.core.Session;
+
+import java.util.*;
+
+
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
+
import org.onap.music.datastore.MusicDataStore;
import org.onap.music.datastore.MusicDataStoreHandle;
+
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.lockingservice.cassandra.CassaLockStore;
import org.onap.music.lockingservice.cassandra.MusicLockState;
import org.onap.music.main.MusicCore;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
+
import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.proto.ProtoDigest.Digest.CompleteDigest;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
@@ -57,37 +69,25 @@ import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.onap.music.mdbc.MdbcTestUtils;
+import org.onap.music.mdbc.TestUtils;
+import org.onap.music.mdbc.ownership.Dag;
+import org.onap.music.mdbc.ownership.DagNode;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+
public class MusicMixinTest {
- final private static String keyspace="metricmusictest";
- final private static String mriTableName = "musicrangeinformation";
- final private static String mtdTableName = "musictxdigest";
- final private static String mdbcServerName = "name";
+
//Properties used to connect to music
- private static Cluster cluster;
private static Session session;
- private static String cassaHost = "localhost";
private static MusicMixin mixin = null;
private StateManager stateManager;
@BeforeClass
- public static void init() throws MusicServiceException {
- try {
- EmbeddedCassandraServerHelper.startEmbeddedCassandra();
- } catch (Exception e) {
- System.out.println(e);
- }
- cluster=EmbeddedCassandraServerHelper.getCluster();
- //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build();
- cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000);
- assertNotNull("Invalid configuration for cassandra", cluster);
- session = EmbeddedCassandraServerHelper.getSession();
- assertNotNull("Invalid configuration for cassandra", session);
+ public static void init() throws MDBCServiceException {
+ MdbcTestUtils.initCassandra();
- MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session);
- CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle);
- assertNotNull("Invalid configuration for music", store);
}
@AfterClass
@@ -102,59 +102,52 @@ public class MusicMixinTest {
}
@Before
- public void initTest(){
- session.execute("DROP KEYSPACE IF EXISTS "+keyspace);
+ public void initTest() throws MDBCServiceException {
+ session = MdbcTestUtils.getSession();
+ session.execute("DROP KEYSPACE IF EXISTS "+ MdbcTestUtils.getKeyspace());
+ mixin=MdbcTestUtils.getMusicMixin();
+ }
+
+ //@Test(timeout=10000)
+ @Ignore // TODO: Move ownership tests to OwnershipAndCheckpointTest
+ @Test
+ public void own() {
+ Range range = new Range("TEST.TABLE1");
+ List<Range> ranges = new ArrayList<>();
+ ranges.add(range);
+ DatabasePartition partition=null;
try {
- Properties properties = new Properties();
- properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace);
- properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName);
- properties.setProperty(MusicMixin.KEY_COMPRESSION, Boolean.toString(true));
- mixin=new MusicMixin(stateManager, mdbcServerName,properties);
- } catch (MDBCServiceException e) {
- fail("error creating music mixin");
+ partition = TestUtils.createBasicRow(range, mixin, MdbcTestUtils.getServerName());
+ }
+ catch(Exception e){
+ fail("fail to create partition");
+ }
+ try {
+ TestUtils.unlockRow(MdbcTestUtils.getKeyspace(),MdbcTestUtils.getMriTableName(),partition);
+ } catch (MusicLockingException e) {
+ fail(e.getMessage());
}
+ DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
+ try {
+ mixin.getStateManager().getOwnAndCheck().own(mixin,ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey());
+ } catch (MDBCServiceException e) {
+ fail("failure when running own function");
+ }
}
- //Own has been removed from musicMixin
-// @Test(timeout=10000)
-// public void own() {
-// Range range = new Range("TABLE1");
-// List<Range> ranges = new ArrayList<>();
-// ranges.add(range);
-// DatabasePartition partition=null;
-// try {
-// partition = TestUtils.createBasicRow(range, mixin, mdbcServerName);
-// }
-// catch(Exception e){
-// fail(e.getMessage());
-// }
-// try {
-// TestUtils.unlockRow(keyspace,mriTableName,partition);
-// } catch (MusicLockingException e) {
-// fail(e.getMessage());
-// }
-//
-// DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
-// try {
-// mixin.own(ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey());
-// } catch (MDBCServiceException e) {
-// fail("failure when running own function");
-// }
-// }
-
private DatabasePartition addRow(List<Range> ranges,boolean isLatest){
final UUID uuid = MDBCUtils.generateTimebasedUniqueKey();
DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null);
MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "",
- mdbcServerName, isLatest);
+ MdbcTestUtils.getServerName(), isLatest);
DatabasePartition partition=null;
try {
partition = mixin.createMusicRangeInformation(newRow);
} catch (MDBCServiceException e) {
fail("failure when creating new row");
}
- String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString();
+ String fullyQualifiedMriKey = MdbcTestUtils.getKeyspace()+"."+ MdbcTestUtils.getMriTableName()+"."+partition.getMRIIndex().toString();
try {
MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId());
} catch (MusicLockingException e) {
@@ -163,81 +156,81 @@ public class MusicMixinTest {
return partition;
}
- //Own has been removed from musicMixin
-// @Test(timeout=10000)
-// public void own2() throws InterruptedException, MDBCServiceException {
-// List<Range> range12 = new ArrayList<>( Arrays.asList(
-// new Range("RANGE1"),
-// new Range("RANGE2")
-// ));
-// List<Range> range34 = new ArrayList<>( Arrays.asList(
-// new Range("RANGE3"),
-// new Range("RANGE4")
-// ));
-// List<Range> range24 = new ArrayList<>( Arrays.asList(
-// new Range("RANGE2"),
-// new Range("RANGE4")
-// ));
-// List<Range> range123 = new ArrayList<>( Arrays.asList(
-// new Range("RANGE1"),
-// new Range("RANGE2"),
-// new Range("RANGE3")
-// ));
-// DatabasePartition db1 = addRow(range12, false);
-// DatabasePartition db2 = addRow(range34, false);
-// MILLISECONDS.sleep(10);
-// DatabasePartition db3 = addRow(range12, true);
-// DatabasePartition db4 = addRow(range34, true);
-// MILLISECONDS.sleep(10);
-// DatabasePartition db5 = addRow(range24, true);
-// DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
-// MusicInterface.OwnershipReturn own = null;
-// try {
-// own = mixin.own(range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey());
-// } catch (MDBCServiceException e) {
-// fail("failure when running own function");
-// }
-// Dag dag = own.getDag();
-//
-// DagNode node4 = dag.getNode(db4.getMRIIndex());
-// assertFalse(node4.hasNotIncomingEdges());
-// List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges());
-// assertEquals(1,outgoingEdges.size());
-//
-// DagNode missing = outgoingEdges.get(0);
-// Set<Range> missingRanges = missing.getRangeSet();
-// assertEquals(2,missingRanges.size());
-// assertTrue(missingRanges.contains(new Range("RANGE1")));
-// assertTrue(missingRanges.contains(new Range("RANGE3")));
-// List<DagNode> outgoingEdges1 = missing.getOutgoingEdges();
-// assertEquals(1,outgoingEdges1.size());
-//
-// DagNode finalNode = outgoingEdges1.get(0);
-// assertFalse(finalNode.hasNotIncomingEdges());
-// Set<Range> finalSet = finalNode.getRangeSet();
-// assertEquals(3,finalSet.size());
-// assertTrue(finalSet.contains(new Range("RANGE1")));
-// assertTrue(finalSet.contains(new Range("RANGE2")));
-// assertTrue(finalSet.contains(new Range("RANGE3")));
-//
-// DagNode node5 = dag.getNode(db5.getMRIIndex());
-// List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges();
-// assertEquals(1,toRemoveOutEdges.size());
-// toRemoveOutEdges.remove(finalNode);
-// assertEquals(0,toRemoveOutEdges.size());
-//
-// MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId());
-// assertTrue(row.getIsLatest());
-// DatabasePartition dbPartition = row.getDBPartition();
-// List<Range> snapshot = dbPartition.getSnapshot();
-// assertEquals(3,snapshot.size());
-// MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId());
-// assertFalse(node5row.getIsLatest());
-// MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex());
-// assertFalse(node4Row.getIsLatest());
-// MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex());
-// assertFalse(node3Row.getIsLatest());
-// }
+ @Ignore // TODO: Move ownership tests to OwnershipAndCheckpointTest
+ @Test(timeout=1000)
+ public void own2() throws InterruptedException, MDBCServiceException {
+ List<Range> range12 = new ArrayList<>( Arrays.asList(
+ new Range("TEST.RANGE1"),
+ new Range("TEST.RANGE2")
+ ));
+ List<Range> range34 = new ArrayList<>( Arrays.asList(
+ new Range("TEST.RANGE3"),
+ new Range("TEST.RANGE4")
+ ));
+ List<Range> range24 = new ArrayList<>( Arrays.asList(
+ new Range("TEST.RANGE2"),
+ new Range("TEST.RANGE4")
+ ));
+ List<Range> range123 = new ArrayList<>( Arrays.asList(
+ new Range("TEST.RANGE1"),
+ new Range("TEST.RANGE2"),
+ new Range("TEST.RANGE3")
+ ));
+ DatabasePartition db1 = addRow(range12, false);
+ DatabasePartition db2 = addRow(range34, false);
+ MILLISECONDS.sleep(10);
+ DatabasePartition db3 = addRow(range12, true);
+ DatabasePartition db4 = addRow(range34, true);
+ MILLISECONDS.sleep(10);
+ DatabasePartition db5 = addRow(range24, true);
+ DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
+ MusicInterface.OwnershipReturn own = null;
+ try {
+ own = mixin.getStateManager().getOwnAndCheck().own(mixin,range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey());
+ } catch (MDBCServiceException e) {
+ fail("failure when running own function");
+ }
+ Dag dag = own.getDag();
+
+ DagNode node4 = dag.getNode(db4.getMRIIndex());
+ assertFalse(node4.hasNotIncomingEdges());
+ List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges());
+ assertEquals(1,outgoingEdges.size());
+
+ DagNode missing = outgoingEdges.get(0);
+ Set<Range> missingRanges = missing.getRangeSet();
+ assertEquals(2,missingRanges.size());
+ assertTrue(missingRanges.contains(new Range("TEST.RANGE1")));
+ assertTrue(missingRanges.contains(new Range("TEST.RANGE3")));
+ List<DagNode> outgoingEdges1 = missing.getOutgoingEdges();
+ assertEquals(1,outgoingEdges1.size());
+
+ DagNode finalNode = outgoingEdges1.get(0);
+ assertFalse(finalNode.hasNotIncomingEdges());
+ Set<Range> finalSet = finalNode.getRangeSet();
+ assertEquals(3,finalSet.size());
+ assertTrue(finalSet.contains(new Range("TEST.RANGE1")));
+ assertTrue(finalSet.contains(new Range("TEST.RANGE2")));
+ assertTrue(finalSet.contains(new Range("TEST.RANGE3")));
+
+ DagNode node5 = dag.getNode(db5.getMRIIndex());
+ List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges();
+ assertEquals(1,toRemoveOutEdges.size());
+ toRemoveOutEdges.remove(finalNode);
+ assertEquals(0,toRemoveOutEdges.size());
+
+ MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId());
+ assertTrue(row.getIsLatest());
+ DatabasePartition dbPartition = row.getDBPartition();
+ List<Range> snapshot = dbPartition.getSnapshot();
+ assertEquals(3,snapshot.size());
+ MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId());
+ assertFalse(node5row.getIsLatest());
+ MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex());
+ assertFalse(node4Row.getIsLatest());
+ MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex());
+ assertFalse(node3Row.getIsLatest());
+ }
@Test
public void relinquish() {
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
new file mode 100644
index 0000000..2134a79
--- /dev/null
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
@@ -0,0 +1,220 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.mixins;
+
+import static org.junit.Assert.*;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.mdbc.MdbcTestUtils;
+import org.onap.music.mdbc.MdbcTestUtils.DBType;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.tables.StagingTable;
+
+public class PostgresMixinTest {
+ final private static String keyspace="metricmusictest";
+ final private static String mdbcServerName = "name";
+
+ static PostgresMixin mixin;
+
+ private static MusicMixin mi = null;
+ private static Connection conn;
+
+ @BeforeClass
+ public static void init() throws MDBCServiceException {
+ MdbcTestUtils.initCassandra();
+ mi=MdbcTestUtils.getMusicMixin();
+ try {
+ conn = MdbcTestUtils.getConnection(DBType.POSTGRES);
+ Properties info = new Properties();
+ mixin = new PostgresMixin(mi, null, conn, info);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @AfterClass
+ public static void close(){
+ //TODO: shutdown cassandra
+ mixin=null;
+ MdbcTestUtils.cleanDatabase(DBType.POSTGRES);
+ MdbcTestUtils.stopCassandra();
+ }
+
+ @Test
+ public void getMixinName() {
+ final String mixinName = mixin.getMixinName();
+ assertEquals(mixinName.toLowerCase(),"postgres");
+ }
+
+ @Test
+ public void getSQLTableSet() {
+ createTestTable();
+ final Set<String> sqlTableSet = mixin.getSQLTableSet();
+ assertEquals(1,sqlTableSet.size());
+ assertTrue(sqlTableSet.contains("testtable"));
+ }
+
+ @Test
+ public void getTableInfo() {
+ createTestTable();
+ final TableInfo tableInfo = mixin.getTableInfo("testtable");
+ assertNotNull(tableInfo);
+ assertEquals(3,tableInfo.columns.size());
+ int index=0;
+ for(String col: tableInfo.columns) {
+ switch(col.toLowerCase()){
+ case "ix":
+ assertTrue(tableInfo.iskey.get(index));
+ assertEquals(Types.INTEGER, tableInfo.coltype.get(index).intValue());
+ break;
+ case "test1":
+ assertFalse(tableInfo.iskey.get(index));
+ assertEquals(Types.CHAR, tableInfo.coltype.get(index).intValue());
+ break;
+ case "test2":
+ assertFalse(tableInfo.iskey.get(index));
+ assertEquals(Types.VARCHAR, tableInfo.coltype.get(index).intValue());
+ break;
+ default:
+ fail();
+ }
+ index++;
+ }
+ }
+
+ private void createTestTable() {
+ try {
+ final Statement statement = conn.createStatement();
+ statement.execute("CREATE TABLE IF NOT EXISTS testtable (IX SERIAL, test1 CHAR(1), test2 VARCHAR(255), PRIMARY KEY (IX));");
+ statement.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ private void cleanTestTable() {
+ try {
+ final Statement statement = conn.createStatement();
+ statement.execute("DELETE FROM testtable;");
+ statement.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void postStatementHook() {
+ createTestTable();
+ mixin.createSQLTriggers("testtable");
+ final String sqlOperation = "INSERT INTO testtable (test1,test2) VALUES ('u','test');";
+ Statement stm=null;
+ try {
+ stm = conn.createStatement();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ mixin.preStatementHook(sqlOperation);
+ try {
+ stm.execute(sqlOperation);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ StagingTable st=new StagingTable();
+ mixin.postStatementHook(sqlOperation,st);
+ mixin.preCommitHook();
+ assertFalse(st.isEmpty());
+ }
+
+ void checkEmptyTestTable(){
+ ResultSet resultSet = mixin.executeSQLRead("SELECT * FROM testtable;");
+ try {
+ assertFalse(resultSet.next());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+
+ void checkOneRowWithContents(String test1Val, String test2Val){
+ ResultSet resultSet = mixin.executeSQLRead("SELECT * FROM testtable;");
+ try {
+ assertTrue(resultSet.next());
+ assertEquals(test1Val, resultSet.getString("test1"));
+ assertEquals(test2Val, resultSet.getString("test2"));
+ assertFalse(resultSet.next());
+ }
+ catch(SQLException e){
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void applyTxDigest() {
+ createTestTable();
+ mixin.createSQLTriggers("testtable");
+ final String sqlOperation = "INSERT INTO testtable (test1,test2) VALUES ('u','test');";
+ Statement stm=null;
+ try {
+ stm = conn.createStatement();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ mixin.preStatementHook(sqlOperation);
+ try {
+ stm.execute(sqlOperation);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ StagingTable st=new StagingTable();
+ mixin.postStatementHook(sqlOperation,st);
+ mixin.preCommitHook();
+ assertFalse(st.isEmpty());
+ cleanTestTable();
+ checkEmptyTestTable();
+ List<Range> ranges = new ArrayList<>();
+ ranges.add(new Range("public.testtable"));
+ try {
+ mixin.applyTxDigest(st,ranges);
+ } catch (SQLException|MDBCServiceException e) {
+ e.printStackTrace();
+ fail();
+ }
+ checkOneRowWithContents("u","test");
+ }
+} \ No newline at end of file
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
index da64595..9e6161a 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
@@ -59,7 +59,7 @@ public class DagTest {
public void getDag() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(ranges),"",false));
MILLISECONDS.sleep(10);
@@ -87,14 +87,14 @@ public class DagTest {
public void getDag2() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2")
+ new Range("schema.range2")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
@@ -123,7 +123,7 @@ public class DagTest {
public void nextToOwn() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(ranges),"",false));
MILLISECONDS.sleep(10);
@@ -149,20 +149,20 @@ public class DagTest {
public void nextToApply() throws InterruptedException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo2));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3));
Dag dag = Dag.getDag(rows, ranges);
@@ -180,7 +180,7 @@ public class DagTest {
assertEquals(0,pair.getKey().index);
List<Range> value = pair.getValue();
assertEquals(1,value.size());
- assertEquals(new Range("range1"),value.get(0));
+ assertEquals(new Range("schema.range1"),value.get(0));
pair = node.nextNotAppliedTransaction(rangesSet);
transactionCounter++;
}
@@ -195,23 +195,23 @@ public class DagTest {
Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0),
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),1)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0),
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1)
));
MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2);
- alreadyApplied.put(new Range("range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
+ alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
rows.add(newRow);
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3));
Dag dag = Dag.getDag(rows, ranges);
@@ -230,7 +230,7 @@ public class DagTest {
assertEquals(2-nodeCounter,pair.getKey().index);
List<Range> value = pair.getValue();
assertEquals(1,value.size());
- assertEquals(new Range("range1"),value.get(0));
+ assertEquals(new Range("schema.range1"),value.get(0));
pair = node.nextNotAppliedTransaction(rangesSet);
transactionCounter++;
}
@@ -244,14 +244,14 @@ public class DagTest {
public void isDifferent() throws InterruptedException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2")
+ new Range("schema.range2")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
@@ -277,14 +277,14 @@ public class DagTest {
public void getOldestDoubles() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2")
+ new Range("schema.range2")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
@@ -306,15 +306,15 @@ public class DagTest {
public void getIncompleteRangesAndDependents() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range3")
+ new Range("schema.range2"),
+ new Range("schema.range3")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
@@ -330,7 +330,7 @@ public class DagTest {
List<Range> incomplete = incompleteRangesAndDependents.getKey();
Set<DagNode> dependents = incompleteRangesAndDependents.getValue();
assertEquals(1,incomplete.size());
- assertTrue(incomplete.contains(new Range("range3")));
+ assertTrue(incomplete.contains(new Range("schema.range3")));
assertEquals(1,dependents.size());
assertTrue(dependents.contains(dag.getNode(rows.get(3).getPartitionIndex())));
}
@@ -339,16 +339,16 @@ public class DagTest {
public void getIncompleteRangesAndDependents2() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1"),
- new Range("range4")
+ new Range("schema.range1"),
+ new Range("schema.range4")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range3")
+ new Range("schema.range2"),
+ new Range("schema.range3")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
@@ -364,8 +364,8 @@ public class DagTest {
List<Range> incomplete = incompleteRangesAndDependents.getKey();
Set<DagNode> dependents = incompleteRangesAndDependents.getValue();
assertEquals(2,incomplete.size());
- assertTrue(incomplete.contains(new Range("range3")));
- assertTrue(incomplete.contains(new Range("range4")));
+ assertTrue(incomplete.contains(new Range("schema.range3")));
+ assertTrue(incomplete.contains(new Range("schema.range4")));
assertEquals(2,dependents.size());
assertTrue(dependents.contains(dag.getNode(rows.get(3).getPartitionIndex())));
assertTrue(dependents.contains(dag.getNode(rows.get(2).getPartitionIndex())));
@@ -375,20 +375,20 @@ public class DagTest {
public void addNewNodeWithSearch() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range3")
+ new Range("schema.range2"),
+ new Range("schema.range3")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
List<Range> allRanges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range3"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range3"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
index eb01bcd..f2fbd1f 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
@@ -34,7 +34,6 @@ import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -46,8 +45,10 @@ import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.lockingservice.cassandra.CassaLockStore;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.MdbcTestUtils.DBType;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.StateManager;
+import org.onap.music.mdbc.MdbcTestUtils;
import org.onap.music.mdbc.TestUtils;
import org.onap.music.mdbc.mixins.LockResult;
import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
@@ -58,13 +59,8 @@ import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
public class OwnershipAndCheckpointTest {
- final private static int sqlPort = 13350;
- final private static String keyspace="metricmusictest";
- final private static String mriTableName = "musicrangeinformation";
- final private static String mtdTableName = "musictxdigest";
- final private static String mdbcServerName = "name";
- public static final String DATABASE = "mdbcTest";
- public static final String TABLE= "PERSONS";
+ public static final String DATABASE = MdbcTestUtils.mariaDBDatabaseName;
+ public static final String TABLE= MdbcTestUtils.mariaDBDatabaseName+".PERSONS";
public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" +
" PersonID int,\n" +
" LastName varchar(255),\n" +
@@ -75,11 +71,7 @@ public class OwnershipAndCheckpointTest {
");";
public static final String DROP_TABLE = "DROP TABLE IF EXISTS " + TABLE + ";";
//Properties used to connect to music
- private static Cluster cluster;
- private static Session session;
- private static String cassaHost = "localhost";
private static MusicMixin musicMixin = null;
- private static DB db;
Connection conn;
MySQLMixin mysqlMixin;
OwnershipAndCheckpoint ownAndCheck;
@@ -90,37 +82,18 @@ public class OwnershipAndCheckpointTest {
@BeforeClass
public static void init() throws MusicServiceException, ClassNotFoundException, ManagedProcessException {
- try {
- EmbeddedCassandraServerHelper.startEmbeddedCassandra();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- cluster=EmbeddedCassandraServerHelper.getCluster();
- //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build();
- cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000);
- assertNotNull("Invalid configuration for cassandra", cluster);
- session = EmbeddedCassandraServerHelper.getSession();
- assertNotNull("Invalid configuration for cassandra", session);
+ MdbcTestUtils.initCassandra();
Class.forName("org.mariadb.jdbc.Driver");
- MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session);
- CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle);
- assertNotNull("Invalid configuration for music", store);
//start embedded mariadb
- db = DB.newEmbeddedDB(sqlPort);
- db.start();
- db.createDB(DATABASE);
+ MdbcTestUtils.startMariaDb();
}
@AfterClass
public static void close() throws MusicServiceException, MusicQueryException, ManagedProcessException {
//TODO: shutdown cassandra
musicMixin=null;
- db.stop();
- try {
- EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
- }
- catch(NullPointerException e){
- }
+ MdbcTestUtils.cleanDatabase(DBType.MySQL);
+ MdbcTestUtils.stopCassandra();
}
private void dropTable() throws SQLException {
@@ -144,25 +117,34 @@ public class OwnershipAndCheckpointTest {
@Before
public void initTest() throws SQLException {
- session.execute("DROP KEYSPACE IF EXISTS "+keyspace);
+ MdbcTestUtils.getSession().execute("DROP KEYSPACE IF EXISTS "+MdbcTestUtils.getKeyspace());
try {
Properties properties = new Properties();
+/*
properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName);
properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace);
properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1");
//StateManager stateManager = new StateManager("dbUrl", properties, "serverName", "dbName");
ownAndCheck = new OwnershipAndCheckpoint();
musicMixin =new MusicMixin(stateManager, mdbcServerName,properties);
+*/
+ properties.setProperty(MusicMixin.KEY_MY_ID,MdbcTestUtils.getServerName());
+ properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,MdbcTestUtils.getKeyspace());
+ properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1");
+ properties.setProperty(MusicMixin.KEY_MUSIC_ADDRESS,MdbcTestUtils.getCassandraUrl());
+ ownAndCheck = new OwnershipAndCheckpoint();
+ musicMixin =new MusicMixin(stateManager, MdbcTestUtils.getServerName(), properties);
} catch (MDBCServiceException e) {
fail("error creating music musicMixin " + e.getMessage());
}
- this.conn = DriverManager.getConnection("jdbc:mariadb://localhost:"+sqlPort+"/"+DATABASE, "root", "");
- this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+sqlPort+"/"+DATABASE, conn, null);
+ this.conn = MdbcTestUtils.getConnection(DBType.MySQL);
+ Properties info = new Properties();
+ this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+MdbcTestUtils.getMariaDbPort()+"/"+DATABASE, conn, info);
dropAndCreateTable();
}
private void initDatabase(Range range) throws MDBCServiceException, SQLException {
- final DatabasePartition partition = TestUtils.createBasicRow(range, musicMixin, mdbcServerName);
+ final DatabasePartition partition = TestUtils.createBasicRow(range, musicMixin, MdbcTestUtils.getServerName());
String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+
"(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');";
StagingTable stagingTable = new StagingTable();
@@ -171,13 +153,15 @@ public class OwnershipAndCheckpointTest {
executeStatement.execute(sqlOperation);
this.conn.commit();
mysqlMixin.postStatementHook(sqlOperation,stagingTable);
+ mysqlMixin.preCommitHook();
executeStatement.close();
String id = MDBCUtils.generateUniqueKey().toString();
TxCommitProgress progressKeeper = new TxCommitProgress();
progressKeeper.createNewTransactionTracker(id ,this.conn);
musicMixin.commitLog(partition, null, stagingTable, id, progressKeeper);
try {
- TestUtils.unlockRow(keyspace, mriTableName, partition);
+// TestUtils.unlockRow(keyspace, mriTableName, partition);
+ TestUtils.unlockRow(MdbcTestUtils.getKeyspace(), MdbcTestUtils.getMriTableName(), partition);
}
catch(Exception e){
fail(e.getMessage());