aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArthur Martella <arthur.martella.1@att.com>2019-04-29 15:03:07 -0400
committerArthur Martella <arthur.martella.1@att.com>2019-05-03 10:04:56 -0400
commit52159a510dc31ee1dacf612a1e05cc8612a117e5 (patch)
tree167fd4e5357ecab6b7a5fa8675dc7c3b63453b38
parent601050a535e916c0ff52826fbe2c61b798c74530 (diff)
Implement postgres, fixes to eventual, and many bug fixes
Streamline upper and lower cases for Ranges Fix initialization of system Merged version of commit https://gerrit.onap.org/r/#/c/86160/ Change-Id: I169ed56ff79ff0a2e14ab9bc5e0467d1c0b9f0a9 Issue-ID: MUSIC-387 Signed-off-by: Arthur Martella <arthur.martella.1@att.com>
-rwxr-xr-xmdbc-server/pom.xml19
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java4
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java19
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java55
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java9
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Range.java21
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java61
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java (renamed from mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java)0
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java80
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java16
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java9
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json11
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json8
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java100
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java5
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java2
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java55
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java156
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java1066
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java26
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java35
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java8
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java11
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java6
-rwxr-xr-xmdbc-server/src/main/resources/mdbc.properties13
-rwxr-xr-xmdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java2
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java280
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java19
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java263
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java220
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java96
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java64
34 files changed, 2320 insertions, 431 deletions
diff --git a/mdbc-server/pom.xml b/mdbc-server/pom.xml
index cdf7cff..7a46120 100755
--- a/mdbc-server/pom.xml
+++ b/mdbc-server/pom.xml
@@ -89,9 +89,9 @@
<version>20160810</version>
</dependency>
<dependency>
- <groupId>org.mariadb.jdbc</groupId>
- <artifactId>mariadb-java-client</artifactId>
- <version>2.3.0</version>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
@@ -200,6 +200,19 @@
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>42.2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>com.opentable.components</groupId>
+ <artifactId>otj-pg-embedded</artifactId>
+ <version>0.13.1</version>
+ <scope>test</scope>
+ </dependency>
+
+
</dependencies>
<build>
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
index ced5745..91b13f3 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
@@ -28,6 +28,8 @@ public class Configuration {
public static final String KEY_DB_MIXIN_NAME = "MDBC_DB_MIXIN";
/** The property name to use to select the MUSIC 'mixin'. */
public static final String KEY_MUSIC_MIXIN_NAME = "MDBC_MUSIC_MIXIN";
+ /** The property name to select if async staging table update is used */
+ public static final String KEY_ASYNC_STAGING_TABLE_UPDATE = "ASYNC_STAGING_TABLE_UPDATE";
/** The name of the default mixin to use for the DBInterface. */
public static final String DB_MIXIN_DEFAULT = "mysql";//"h2";
/** The name of the default mixin to use for the MusicInterface. */
@@ -44,4 +46,6 @@ public class Configuration {
public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours
/** The property name to provide comma separated list of ranges to warmup */
public static final String KEY_WARMUPRANGES = "warmupranges";
+ /** Default async staging table update o ption*/
+ public static final String ASYNC_STAGING_TABLE_UPDATE = "false";
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
index 32d456e..ae2b869 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
@@ -98,10 +98,25 @@ public class MDBCUtils {
}
public static List<Range> getTables(Map<String,List<SQLOperation>> queryParsed){
+ return getTables(null, queryParsed);
+ }
+
+ public static List<Range> getTables(String defaultDatabaseName, Map<String,List<SQLOperation>> queryParsed){
List<Range> ranges = new ArrayList<>();
for(String table: queryParsed.keySet()){
- ranges.add(new Range(table));
- }
+ String[] parts = table.split("\\.");
+ if(parts.length==2){
+ ranges.add(new Range(table));
+ }
+ else if(parts.length==1 && defaultDatabaseName!=null){
+ ranges.add(new Range(defaultDatabaseName+"."+table));
+ }
+ else{
+ throw new IllegalArgumentException("Table should either have only one '.' or none at all, the table "
+ + "received is "+table);
+ }
+
+ }
return ranges;
}
} \ No newline at end of file
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 3db6c3f..f1ac851 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -160,18 +160,7 @@ public class MdbcConnection implements Connection {
if(progressKeeper!=null) progressKeeper.commitRequested(id);
logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
if (b) {
- // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
- if(id == null || id.isEmpty()) {
- logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- throw new SQLException("tx id is null");
- }
- try {
- mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper);
- } catch (MDBCServiceException e) {
- // TODO Auto-generated catch block
- logger.error("Cannot commit log to music" + e.getStackTrace());
- throw new SQLException(e.getMessage(), e);
- }
+ musicCommit();
}
if(progressKeeper!=null) {
progressKeeper.setMusicDone(id);
@@ -191,13 +180,7 @@ public class MdbcConnection implements Connection {
return jdbcConn.getAutoCommit();
}
- /**
- * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
- * they are performed now and copied into MUSIC.
- * @throws SQLException
- */
- @Override
- public void commit() throws SQLException {
+ private void musicCommit() throws SQLException {
if(progressKeeper.isComplete(id)) {
return;
}
@@ -205,6 +188,7 @@ public class MdbcConnection implements Connection {
progressKeeper.commitRequested(id);
}
+ dbi.preCommitHook();
try {
logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
// transaction was committed -- add all the updates into the REDO-Log in MUSIC
@@ -214,6 +198,16 @@ public class MdbcConnection implements Connection {
logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
throw new SQLException("Failure commiting to MUSIC", e);
}
+ }
+
+ /**
+ * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are performed now and copied into MUSIC.
+ * @throws SQLException
+ */
+ @Override
+ public void commit() throws SQLException {
+ musicCommit();
if(progressKeeper != null) {
progressKeeper.setMusicDone(id);
@@ -273,9 +267,10 @@ public class MdbcConnection implements Connection {
} catch (MDBCServiceException e) {
throw new SQLException("Failure during relinquish of partition",e);
}
- // Warning! Make sure this call remains AFTER the call to jdbcConn.close(),
- // otherwise you're going to get stuck in an infinite loop.
- statemanager.closeConnection(id);
+
+ // Warning! Make sure this call remains AFTER the call to jdbcConn.close(),
+ // otherwise you're going to get stuck in an infinite loop.
+ statemanager.closeConnection(id);
}
@Override
@@ -507,7 +502,8 @@ public class MdbcConnection implements Connection {
//Parse tables from the sql query
Map<String, List<SQLOperation>> tableToInstruction = QueryProcessor.parseSqlQuery(sql);
//Check ownership of keys
- List<Range> queryTables = MDBCUtils.getTables(tableToInstruction);
+ String defaultSchema = dbi.getSchema();
+ List<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToInstruction);
if (this.partition!=null) {
List<Range> snapshot = this.partition.getSnapshot();
if(snapshot!=null){
@@ -550,18 +546,15 @@ public class MdbcConnection implements Connection {
logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
for (String tableName : set1) {
// This map will be filled in if this table was previously discovered
- tableName = tableName.toUpperCase();
- if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
+ if (!table_set.contains(tableName.toUpperCase()) && !dbi.getReservedTblNames().contains(tableName.toUpperCase())) {
logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
try {
TableInfo ti = dbi.getTableInfo(tableName);
- mi.initializeMusicForTable(ti,tableName);
- //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
- ti = dbi.getTableInfo(tableName);
- mi.createDirtyRowTable(ti,tableName);
+ //mi.initializeMusicForTable(ti,tableName);
+ //mi.createDirtyRowTable(ti,tableName);
dbi.createSQLTriggers(tableName);
- table_set.add(tableName);
- dbi.synchronizeData(tableName);
+ table_set.add(tableName.toUpperCase());
+ //dbi.synchronizeData(tableName);
logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
table_set.size() + "/" + set1.size() + "tables uploaded");
} catch (Exception e) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
index 42b9710..500ed81 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
@@ -80,6 +80,15 @@ public class MdbcServer {
Properties connectionProps = new Properties();
connectionProps.put("user", user);
connectionProps.put("password", password);
+ String defaultMusicMixin = Utils.getDefaultMusicMixin();
+ if(defaultMusicMixin!=null){
+ connectionProps.put(Configuration.KEY_MUSIC_MIXIN_NAME,defaultMusicMixin);
+ }
+ String defaultDBMixin = Utils.getDefaultDBMixin();
+ if(defaultMusicMixin!=null){
+ connectionProps.put(Configuration.KEY_DB_MIXIN_NAME,defaultDBMixin);
+ }
+ Utils.registerDefaultDrivers();
meta = new MdbcServerLogic(url,connectionProps,config);
LocalService service = new LocalService(meta);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
index 16e7170..41aed26 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
@@ -23,6 +23,9 @@ import java.io.Serializable;
import java.util.List;
import java.util.Objects;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.mixins.MusicMixin;
+
/**
* This class represent a range of the whole database
@@ -32,13 +35,21 @@ import java.util.Objects;
*/
public class Range implements Cloneable{
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Range.class);
+
private String table;
public Range(String table) {
- this.table = table.toUpperCase();
+ final String[] split = table.split("\\.");
+ if(split.length!=2){
+ logger.debug("Table should contain schema, received in constructor: " + table);
+// throw new IllegalArgumentException("Table should always contain the schema, table received in "
+// + "constructor is "+table);
+ }
+ this.table = table;
}
- public String toString(){return table.toUpperCase();}
+ public String toString(){return table;}
/**
* Compares to Range types
@@ -55,7 +66,7 @@ public class Range implements Cloneable{
@Override
public int hashCode(){
- return table.hashCode();
+ return table.toUpperCase().hashCode();
}
@Override
@@ -74,11 +85,11 @@ public class Range implements Cloneable{
public static boolean overlaps(List<Range> ranges, String table){
//\TODO check if parallel stream makes sense here
- return ranges.stream().map((Range r) -> r.table.equals(table)).anyMatch((Boolean b) -> b);
+ return ranges.stream().map((Range r) -> r.table.toUpperCase().equals(table.toUpperCase())).anyMatch((Boolean b) -> b);
}
public boolean overlaps(Range other) {
- return table.equals(other.table);
+ return table.toUpperCase().equals(other.table.toUpperCase());
}
public String getTable() {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index e4c4a24..18bc0db 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -21,7 +21,6 @@ package org.onap.music.mdbc;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
@@ -47,6 +46,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -102,10 +102,10 @@ public class StateManager {
public StateManager() {
}
- public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
+ public StateManager(String sqlDBUrl, Properties newInfo, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
this.sqlDBName = sqlDBName;
- this.sqlDBUrl = sqlDBUrl;
- this.info = info;
+ this.sqlDBUrl = cleanSqlUrl(sqlDBUrl);
+ this.info = new Properties();
this.mdbcServerName = mdbcServerName;
this.connectionRanges = new ConcurrentHashMap<>();
@@ -117,6 +117,7 @@ public class StateManager {
} catch (IOException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
}
+ info.putAll(newInfo);
cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
@@ -129,6 +130,16 @@ public class StateManager {
ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
}
+ protected String cleanSqlUrl(String url){
+ if(url!=null) {
+ url = url.trim();
+ if (url.length() > 0 && url.charAt(url.length() - 1) == '/') {
+ url= url.substring(0, url.length() - 1);
+ }
+ }
+ return url;
+ }
+
protected void initTxDaemonThread(){
txDaemon = new Thread(
new MusicTxDigestDaemon(Integer.parseInt(
@@ -149,27 +160,21 @@ public class StateManager {
}
protected void initSqlDatabase() throws MDBCServiceException {
- try {
- //\TODO: pass the driver as a variable
- Class.forName("org.mariadb.jdbc.Driver");
- }
- catch (ClassNotFoundException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
- ErrorTypes.GENERALSERVICEERROR);
- return;
- }
- try {
- Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
- StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ")
+ if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) {
+ try {
+ Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
+ StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ")
.append(sqlDBName)
.append(";");
- Statement stmt = sqlConnection.createStatement();
- stmt.execute(sql.toString());
- sqlConnection.close();
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
+ Statement stmt = sqlConnection.createStatement();
+ stmt.execute(sql.toString());
+ sqlConnection.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR,
+ ErrorSeverity.CRITICAL,
ErrorTypes.GENERALSERVICEERROR);
- throw new MDBCServiceException(e.getMessage(), e);
+ throw new MDBCServiceException(e.getMessage(), e);
+ }
}
}
@@ -306,17 +311,9 @@ public class StateManager {
public Connection openConnection(String id) {
Connection sqlConnection;
MdbcConnection newConnection;
- try {
- //TODO: pass the driver as a variable
- Class.forName("org.mariadb.jdbc.Driver");
- }
- catch (ClassNotFoundException e) {
- // TODO Auto-generated catch block
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL,
- ErrorTypes.QUERYERROR);
- }
-
+ Utils.registerDefaultDrivers();
//Create connection to local SQL DB
+
try {
sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info);
} catch (SQLException e) {
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
index e5a3252..e5a3252 100755
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
new file mode 100644
index 0000000..7a09dca
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
@@ -0,0 +1,80 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.onap.music.logging.EELFLoggerDelegate;
+
+public class Utils {
+
+
+ public static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Utils.class);
+
+ static Properties retrieveMdbcProperties() {
+ Properties pr = null;
+ try {
+ pr = new Properties();
+ pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
+ } catch (IOException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Could not load property file: " + e.getMessage());
+ }
+ return pr;
+ }
+
+ public static String getDefaultMusicMixin() {
+ Properties pr = retrieveMdbcProperties();
+ if (pr == null)
+ return null;
+ String defaultMusicMixin = pr.getProperty("DEFAULT_MUSIC_MIXIN");
+ return defaultMusicMixin;
+ }
+
+ public static String getDefaultDBMixin() {
+ Properties pr = retrieveMdbcProperties();
+ if (pr == null)
+ return null;
+ String defaultMusicMixin = pr.getProperty("DEFAULT_DB_MIXIN");
+ return defaultMusicMixin;
+ }
+
+ public static void registerDefaultDrivers() {
+ Properties pr = null;
+ try {
+ pr = new Properties();
+ pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
+ } catch (IOException e) {
+ logger.error("Could not load property file > " + e.getMessage());
+ }
+
+ String drivers = pr.getProperty("DEFAULT_DRIVERS");
+ for (String driver : drivers.split("[ ,]")) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'");
+ try {
+ Class.forName(driver.trim());
+ } catch (ClassNotFoundException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Driver class " + driver + " not found.");
+ }
+ }
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
index 38309d5..391ee1a 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
@@ -41,7 +41,7 @@ public class NodeConfiguration {
public String nodeName;
public String sqlDatabaseName;
- public NodeConfiguration(String tables, String eventualTables, UUID mriIndex, String sqlDatabaseName, String node){
+ public NodeConfiguration(String tables, List<String> eventualTables, UUID mriIndex, String sqlDatabaseName, String node){
// public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) {
partition = new DatabasePartition(toRanges(tables), mriIndex, null) ;
eventual = new Eventual(toRanges(eventualTables));
@@ -49,16 +49,20 @@ public class NodeConfiguration {
this.sqlDatabaseName = sqlDatabaseName;
}
+ protected List<Range> toRanges(List<String> tables){
+ List<Range> newRange = new ArrayList<>();
+ for(String table: tables) {
+ newRange.add(new Range(table));
+ }
+ return newRange;
+ }
+
protected List<Range> toRanges(String tables){
if(tables.isEmpty()){
return new ArrayList<>();
}
- List<Range> newRange = new ArrayList<>();
String[] tablesArray=tables.split(",");
- for(String table: tablesArray) {
- newRange.add(new Range(table));
- }
- return newRange;
+ return toRanges(new ArrayList<>(Arrays.asList(tablesArray)));
}
public String toJson() {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
index 343a8b8..0598271 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
@@ -26,6 +26,7 @@ import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
import com.google.gson.Gson;
import org.onap.music.datastore.PreparedQueryObject;
@@ -46,6 +47,7 @@ public class TablesConfiguration {
private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class);
private List<PartitionInformation> partitions;
+ private List<String> eventual;
String tableToPartitionName;
private String musicNamespace;
private String partitionInformationTableName;
@@ -83,16 +85,17 @@ public class TablesConfiguration {
logger.info("Creating empty row with id "+partitionId);
MusicMixin.createEmptyMriRow(musicNamespace,partitionInfo.mriTableName,UUID.fromString(partitionId),
- partitionInfo.owner,null,partitionInfo.getTables(),true);
+ partitionInfo.owner,null,
+ partitionInfo.getTables(),true);
//3) Create config for this node
StringBuilder newStr = new StringBuilder();
for(Range r: partitionInfo.tables){
- newStr.append(r.toString().toUpperCase()).append(",");
+ newStr.append(r.toString()).append(",");
}
logger.info("Appending row to configuration "+partitionId);
- nodeConfigs.add(new NodeConfiguration(newStr.toString(),"",UUID.fromString(partitionId),
+ nodeConfigs.add(new NodeConfiguration(newStr.toString(),eventual,UUID.fromString(partitionId),
sqlDatabaseName, partitionInfo.owner));
}
return nodeConfigs;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json
new file mode 100644
index 0000000..430525f
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json
@@ -0,0 +1,11 @@
+{
+ "internalNamespace": "music_internal",
+ "internalReplicationFactor": 1,
+ "musicNamespace": "namespace",
+ "musicReplicationFactor": 1,
+ "mriTableName": "musicrangeinformation",
+ "mtxdTableName": "musictxdigest",
+ "eventualMtxdTableName":"musicevetxdigest",
+ "nodeInfoTableName":"nodeinfo",
+ "rangeDependencyTableName":"musicrangedependency"
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
index 8cbbfec..559d597 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
@@ -3,21 +3,19 @@
{
"tables": [
{
- "table": "PERSONS"
+ "table": "public.PERSONS"
}
],
"owner": "",
"mriTableName": "musicrangeinformation",
- "mtxdTableName": "musictxdigest",
"partitionId": ""
}
],
- "internalNamespace": "music_internal",
- "internalReplicationFactor": 1,
+ "eventual": ["public.eventualPERSONS"],
"musicNamespace": "namespace",
- "musicReplicationFactor": 1,
"tableToPartitionName": "tabletopartition",
"partitionInformationTableName": "partitioninfo",
"redoHistoryTableName": "redohistory",
"sqlDatabaseName": "test"
}
+
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java
new file mode 100644
index 0000000..543cbd0
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.mixins;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.websocket.RemoteEndpoint.Async;
+import org.onap.music.logging.EELFLoggerDelegate;
+
+public class AsyncUpdateHandler {
+
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(AsyncUpdateHandler.class);
+
+ Object finishMonitor;
+ Object taskMonitor;
+ AtomicBoolean pendingUpdate;
+ Runnable handler;
+
+ AsyncUpdateHandler(Runnable handlerToRun){
+ handler=handlerToRun;
+ finishMonitor=new Object();
+ taskMonitor = new Object();
+ pendingUpdate=new AtomicBoolean(false);
+ createUpdateExecutor();
+ }
+
+ void createUpdateExecutor(){
+ Runnable newRunnable = ()-> UpdateExecutor();
+ Thread newThread = new Thread(newRunnable);
+ newThread.setDaemon(true);
+ newThread.start();
+ }
+
+ public void processNewUpdate(){
+ pendingUpdate.set(true);
+ synchronized (taskMonitor) {
+ taskMonitor.notifyAll();
+ }
+ }
+
+ void UpdateExecutor(){
+ while(true) {
+ synchronized (taskMonitor) {
+ try {
+ if(!pendingUpdate.get()){
+ taskMonitor.wait();
+ }
+ } catch (InterruptedException e) {
+ logger.error("Update Executor received an interrup exception");
+ }
+ }
+ startUpdate();
+ }
+ }
+
+ void startUpdate(){
+ synchronized (finishMonitor) {
+ //Keep running until there are no more requests
+ while (pendingUpdate.getAndSet(false)) {
+ if(handler!=null){
+ handler.run();
+ }
+ }
+ finishMonitor.notifyAll();
+ }
+
+ }
+
+ public void waitForAllPendingUpdates(){
+ //Wait until there are no more requests and the thread is not longer running
+ //We could use a join, but given that the thread could be temporally null, then it is easier to
+ //separate the wait from the thread that is running
+ synchronized(finishMonitor){
+ while(pendingUpdate.get()) {
+ try {
+ finishMonitor.wait();
+ } catch (InterruptedException e) {
+ logger.error("waitForAllPendingUpdate received exception");
+ }
+ }
+ }
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index a594918..dd71d97 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
@@ -111,6 +112,13 @@ public interface DBInterface {
* @return a ResultSet containing the rows returned from the query
*/
ResultSet executeSQLRead(String sql);
+
+ /**
+ * This method is used to verify that all data structures needed for commit are ready for commit
+ * Either for the MusicMixin (e.g. staging table) or internally for the DBMixin
+ * Important this is not for actual commit operation, it should be treated as a signal.
+ */
+ void preCommitHook();
void synchronizeData(String tableName);
@@ -133,4 +141,6 @@ public interface DBInterface {
void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException;
Connection getSQLConnection();
+
+ String getSchema();
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
index d822615..324b0f6 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
@@ -61,12 +61,13 @@ public class MixinFactory {
con = cl.getConstructor(MusicInterface.class, String.class, Connection.class, Properties.class);
if (con != null) {
logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname);
- return (DBInterface) con.newInstance(mi, url, conn, info);
+ DBInterface newdbi = (DBInterface) con.newInstance(mi, url, conn, info);
+ return newdbi;
}
}
}
} catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface: "+e);
+ logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface error for "+cl.getName(),e);
}
}
return null;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 71f1b8b..fc8897f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -303,8 +303,8 @@ public interface MusicInterface {
void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException;
List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
-
+
void deleteMriRow(MusicRangeInformationRow row) throws MDBCServiceException;
void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index cb37a55..5a322f3 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -61,7 +61,6 @@ import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.DagNode;
-import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
@@ -99,6 +98,8 @@ public class MusicMixin implements MusicInterface {
public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
/** The property name to use to provide the replication factor for Cassandra. */
public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
+ /** The property name to use to provide a timeout to mdbc (ownership) */
+ public static final String KEY_TIMEOUT = "mdbc_timeout";
/** The property name to use to provide a flag indicating if compression is required */
public static final String KEY_COMPRESSION = "mdbc_compression";
/** Namespace for the tables in MUSIC (Cassandra) */
@@ -106,13 +107,15 @@ public class MusicMixin implements MusicInterface {
/** The default property value to use for the Cassandra IP address. */
public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
/** The default property value to use for the Cassandra replication factor. */
- public static final int DEFAULT_MUSIC_RFACTOR = 1;
+ public static final int DEFAULT_MUSIC_RFACTOR = 3;
+ /** The default property value to use for the MDBC timeout */
+ public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
/** The default primary string column, if none is provided. */
public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
/** Type of the primary key, if none is defined by the user */
public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
public static final boolean DEFAULT_COMPRESSION = true;
-
+ //TODO: Control network topology strategy with a configuration file entry
public static final boolean ENABLE_NETWORK_TOPOLOGY_STRATEGY = false;
//\TODO Add logic to change the names when required and create the tables when necessary
@@ -254,6 +257,7 @@ public class MusicMixin implements MusicInterface {
public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException {
Map<String,Object> replicationInfo = new HashMap<>();
replicationInfo.put("'class'", "'NetworkTopologyStrategy'");
+
if (ENABLE_NETWORK_TOPOLOGY_STRATEGY && replicationFactor==3) {
replicationInfo.put("'dc1'", 1);
replicationInfo.put("'dc2'", 1);
@@ -673,7 +677,7 @@ public class MusicMixin implements MusicInterface {
}
if(rs!=null) {
for (Row row : rs) {
- set.add(row.getString("TABLE_NAME").toUpperCase());
+ set.add(row.getString("TABLE_NAME"));
}
}
return set;
@@ -1295,15 +1299,13 @@ public class MusicMixin implements MusicInterface {
logger.warn("Trying tcommit log with null partition");
return;
}
+
List<Range> snapshot = partition.getSnapshot();
if(snapshot==null || snapshot.isEmpty()){
logger.warn("Trying to commit log with empty ranges");
return;
}
- // first deal with commit for eventually consistent tables
- filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
-
UUID mriIndex = partition.getMRIIndex();
String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
//0. See if reference to lock was already created
@@ -1475,7 +1477,7 @@ public class MusicMixin implements MusicInterface {
for(TupleValue t: log){
//final String tableName = t.getString(0);
final UUID id = t.getUUID(1);
- digestIds.add(new MusicTxDigestId(id,index++));
+ digestIds.add(new MusicTxDigestId(partitionIndex,id,index++));
}
List<Range> partitions = new ArrayList<>();
Set<String> tables = newRow.getSet("keys",String.class);
@@ -1526,11 +1528,14 @@ public class MusicMixin implements MusicInterface {
pQueryObject.appendQueryString(cql);
pQueryObject.addValue(baseRange.getTable());
Row newRow;
+ //TODO Change this when music fix the "." problem in the primary key
+ final String table = baseRange.getTable();
+ final String tableWithoutDot = table.replaceAll("\\.","");
try {
- newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.getTable(),null);
+ newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,tableWithoutDot,null);
} catch (MDBCServiceException e) {
- logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
+ logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName+" trying for table "+tableWithoutDot);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information for table "+tableWithoutDot, e);
}
return getRangeDependenciesFromCassandraRow(newRow);
}
@@ -1838,7 +1843,6 @@ public class MusicMixin implements MusicInterface {
PreparedQueryObject query = new PreparedQueryObject();
int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
-
String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns,
this.musicEventualTxDigestTableName);
query.appendQueryString(cql);
@@ -1856,6 +1860,7 @@ public class MusicMixin implements MusicInterface {
}
}
+
@Override
public StagingTable getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
@@ -1883,8 +1888,7 @@ public class MusicMixin implements MusicInterface {
}
return changes;
}
-
-
+
@Override
public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException {
int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
@@ -1901,7 +1905,7 @@ public class MusicMixin implements MusicInterface {
LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>();
UUID musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName);
PreparedQueryObject pQueryObject = new PreparedQueryObject();
-
+
if (musicevetxdigestNodeinfoTimeID != null) {
// this will fetch only few records based on the time-stamp condition.
cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
@@ -1912,7 +1916,7 @@ public class MusicMixin implements MusicInterface {
cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
pQueryObject.appendQueryString(cql);
}
-
+
// I need to get a ResultSet of all the records and give each row to the below HashMap.
ResultSet rs = executeMusicRead(pQueryObject);
while (!rs.isExhausted()) {
@@ -1920,8 +1924,8 @@ public class MusicMixin implements MusicInterface {
ByteBuffer digest = row.getBytes("transactiondigest");
Boolean compressed = row.getBool("compressed");
//String txTimeId = row.getString("txtimeid"); //???
- UUID txTimeId = row.getUUID("txtimeid");
-
+ UUID txTimeId = row.getUUID("txtimeid");
+
try {
if(compressed){
digest=StagingTable.Decompress(digest);
@@ -1932,7 +1936,7 @@ public class MusicMixin implements MusicInterface {
throw e;
}
ecDigestInformation.put(txTimeId, changes);
- }
+ }
return ecDigestInformation;
}
@@ -2218,7 +2222,7 @@ public class MusicMixin implements MusicInterface {
* @param cql the CQL to be sent to Cassandra
*/
private static void executeMusicWriteQuery(String keyspace, String table, String cql)
- throws MDBCServiceException {
+ throws MDBCServiceException {
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
ResultType rt = null;
@@ -2254,6 +2258,9 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException("Error executing atomic get", e);
}
}
+ if(result==null){
+ throw new MDBCServiceException("Error executing atomic get for primary key: "+primaryKey);
+ }
if(result.isExhausted()){
return null;
}
@@ -2418,11 +2425,17 @@ public class MusicMixin implements MusicInterface {
pQueryObject.addValue(row.getPartitionIndex());
ReturnType rt ;
try {
- rt = MusicCore.atomicPut(music_ns, musicRangeDependencyTableName, row.getPartitionIndex().toString(),
+ rt = MusicCore.atomicPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(),
pQueryObject, null);
} catch (MusicLockingException|MusicQueryException|MusicServiceException e) {
logger.error("Failure when deleting mri row");
new MDBCServiceException("Error deleting mri row",e);
}
}
+
+ public StateManager getStateManager() {
+ return stateManager;
+ }
+
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 6d6e691..15caf3f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -40,6 +40,7 @@ import org.json.JSONTokener;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Configuration;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
@@ -75,7 +76,7 @@ public class MySQLMixin implements DBInterface {
public static final String TRANS_TBL = "MDBC_TRANSLOG";
private static final String CREATE_TBL_SQL =
"CREATE TABLE IF NOT EXISTS "+TRANS_TBL+
- " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " +
+ " (IX INT AUTO_INCREMENT, OP CHAR(1), SCHEMANAME VARCHAR(255), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " +
"CONNECTION_ID INT, PRIMARY KEY (IX));";
private final MusicInterface mi;
@@ -85,6 +86,10 @@ public class MySQLMixin implements DBInterface {
private final Map<String, TableInfo> tables;
private PreparedStatement deleteStagingStatement;
private boolean server_tbl_created = false;
+ private boolean useAsyncStagingUpdate = false;
+ private Object stagingHandlerLock = new Object();
+ private AsyncUpdateHandler stagingHandler = null;
+ private StagingTable currentStaging=null;
public MySQLMixin() {
this.mi = null;
@@ -100,12 +105,35 @@ public class MySQLMixin implements DBInterface {
this.dbName = getDBName(conn);
this.jdbcConn = conn;
this.tables = new HashMap<String, TableInfo>();
+ useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
+ Configuration.ASYNC_STAGING_TABLE_UPDATE));
+ }
+
+ class StagingTableUpdateRunnable implements Runnable{
+
+ private MySQLMixin mixin;
+ private StagingTable staging;
+
+ StagingTableUpdateRunnable(MySQLMixin mixin, StagingTable staging){
+ this.mixin=mixin;
+ this.staging=staging;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.mixin.updateStagingTable(staging);
+ } catch (NoSuchFieldException|MDBCServiceException e) {
+ this.mixin.logger.error("Error when updating the staging table");
+ }
+ }
}
private PreparedStatement getStagingDeletePreparedStatement() throws SQLException {
return jdbcConn.prepareStatement("DELETE FROM "+TRANS_TBL+" WHERE (IX BETWEEN ? AND ? ) AND " +
"CONNECTION_ID = ?;");
}
+
// This is used to generate a unique connId for this connection to the DB.
private int generateConnID(Connection conn) {
int rv = (int) System.currentTimeMillis(); // random-ish
@@ -156,12 +184,20 @@ public class MySQLMixin implements DBInterface {
}
return dbname;
}
-
+
+ @Override
public String getDatabaseName() {
return this.dbName;
}
@Override
+ public String getSchema() {return this.dbName;}
+
+ /**
+ * Get a set of the table names in the database.
+ * @return the set
+ */
+ @Override
public Set<String> getSQLTableSet() {
Set<String> set = new TreeSet<String>();
String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
@@ -183,7 +219,7 @@ public class MySQLMixin implements DBInterface {
@Override
public Set<Range> getSQLRangeSet() {
Set<String> set = new TreeSet<String>();
- String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+ String sql = "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
try {
Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
@@ -244,9 +280,19 @@ mysql> describe tables;
TableInfo ti = tables.get(tableName);
if (ti == null) {
try {
- String tbl = tableName;//.toUpperCase();
- String sql = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME='"+tbl+"'";
- ResultSet rs = executeSQLRead(sql);
+ final String[] split = tableName.split("\\.");
+ String tbl = (split.length==2)?split[1]:tableName;
+ String localSchema = (split.length==2)?split[0]:getSchema();
+ StringBuilder sql=new StringBuilder();
+ sql.append("SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=");
+ if(localSchema==null) {
+ sql.append("DATABASE() AND TABLE_NAME='");
+ }
+ else {
+ sql.append("'").append(localSchema).append("' AND TABLE_NAME='");
+ }
+ sql.append(tbl).append("';");
+ ResultSet rs = executeSQLRead(sql.toString());
if (rs != null) {
ti = new TableInfo();
while (rs.next()) {
@@ -296,9 +342,13 @@ mysql> describe tables;
}
}
@Override
- public void createSQLTriggers(String tableName) {
- // Don't create triggers for the table the triggers write into!!!
+ public void createSQLTriggers(String table) {
+ final String[] split = table.split("\\.");
+ String schemaName = (split.length==2)?split[0]:getSchema();
+ String tableName = (split.length==2)?split[1]:table;
+
if (tableName.equals(TRANS_TBL))
+ // Don't create triggers for the table the triggers write into!!!
return;
try {
if (!server_tbl_created) {
@@ -321,12 +371,12 @@ mysql> describe tables;
//msm.register(name);
}
// No SELECT trigger
- executeSQLWrite(generateTrigger(tableName, "INSERT"));
- executeSQLWrite(generateTrigger(tableName, "UPDATE"));
+ executeSQLWrite(generateTrigger(schemaName,tableName, "INSERT"));
+ executeSQLWrite(generateTrigger(schemaName,tableName, "UPDATE"));
//\TODO: save key row instead of the whole row for delete
- executeSQLWrite(generateTrigger(tableName, "DELETE"));
+ executeSQLWrite(generateTrigger(schemaName,tableName, "DELETE"));
} catch (SQLException e) {
- if (e.getMessage().equals("Trigger already exists")) {
+ if (e.getMessage().equals("Trigger already exists") || e.getMessage().endsWith("already exists")){
//only warn if trigger already exists
logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e);
} else {
@@ -343,7 +393,7 @@ END;
OLD.field refers to the old value
NEW.field refers to the new value
*/
- private String generateTrigger(String tableName, String op) {
+ private String generateTrigger(String schema, String tableName, String op) {
boolean isdelete = op.equals("DELETE");
boolean isinsert = op.equals("INSERT");
boolean isupdate = op.equals("UPDATE");
@@ -371,25 +421,27 @@ NEW.field refers to the new value
//\TODO check if using mysql driver, so instead check the exception
//\TODO add conditional for update, if primary key is still the same, use null in the KEYDATA col
StringBuilder sb = new StringBuilder()
- .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL!
- .append(String.format("%s_%s", op.substring(0, 1), tableName))
- .append(" AFTER ")
- .append(op)
- .append(" ON ")
- .append(tableName.toUpperCase())
- .append(" FOR EACH ROW INSERT INTO ")
- .append(TRANS_TBL)
- .append(" (TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('")
- .append(tableName.toUpperCase())
- .append("', ")
- .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'"))
- .append(", ")
- .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL")
- .append(", ")
- .append(newJson.toString())
- .append(", ")
- .append("CONNECTION_ID()")
- .append(")");
+ .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL!
+ .append(String.format("%s_%s", op.substring(0, 1), tableName))
+ .append(" AFTER ")
+ .append(op)
+ .append(" ON ")
+ .append(tableName)
+ .append(" FOR EACH ROW INSERT INTO ")
+ .append(TRANS_TBL)
+ .append(" (SCHEMANAME, TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('")
+ .append( (schema==null)?this.getSchema():schema )
+ .append("', '")
+ .append(tableName)
+ .append("', ")
+ .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'"))
+ .append(", ")
+ .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL")
+ .append(", ")
+ .append(newJson.toString())
+ .append(", ")
+ .append("CONNECTION_ID()")
+ .append(")");
return sb.toString();
}
private String[] getTriggerNames(String tableName) {
@@ -521,6 +573,16 @@ NEW.field refers to the new value
return rs;
}
+ @Override
+ public void preCommitHook() {
+ synchronized (stagingHandlerLock){
+ //\TODO check if this can potentially block forever in certain scenarios
+ if(stagingHandler!=null){
+ stagingHandler.waitForAllPendingUpdates();
+ }
+ }
+ }
+
/**
* This method executes a write query in the sql database.
* @param sql the SQL to be sent to MySQL
@@ -569,11 +631,24 @@ NEW.field refers to the new value
String[] parts = sql.trim().split(" ");
String cmd = parts[0].toLowerCase();
if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
- try {
- this.updateStagingTable(transactionDigest);
- } catch (NoSuchFieldException|MDBCServiceException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ if (useAsyncStagingUpdate) {
+ synchronized (stagingHandlerLock){
+ if(stagingHandler==null||currentStaging!=transactionDigest){
+ Runnable newRunnable = new StagingTableUpdateRunnable(this, transactionDigest);
+ currentStaging=transactionDigest;
+ stagingHandler=new AsyncUpdateHandler(newRunnable);
+ }
+ //else we can keep using the current staging Handler
+ }
+ stagingHandler.processNewUpdate();
+ } else {
+
+ try {
+ this.updateStagingTable(transactionDigest);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ // TODO Auto-generated catch block
+ this.logger.error("Error updating the staging table");
+ }
}
}
}
@@ -604,7 +679,7 @@ NEW.field refers to the new value
throws NoSuchFieldException, MDBCServiceException {
// copy from DB.MDBC_TRANSLOG where connid == myconnid
// then delete from MDBC_TRANSLOG
- String sql2 = "SELECT IX, TABLENAME, OP, ROWDATA,KEYDATA FROM "+TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId;
+ String sql2 = "SELECT IX, SCHEMANAME, TABLENAME, OP, ROWDATA, KEYDATA FROM " + TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId;
Integer biggestIx = Integer.MIN_VALUE;
Integer smallestIx = Integer.MAX_VALUE;
try {
@@ -616,10 +691,11 @@ NEW.field refers to the new value
smallestIx = Integer.min(smallestIx,ix);
String op = rs.getString("OP");
SQLOperation opType = toOpEnum(op);
+ String schema= rs.getString("SCHEMANAME");
String tbl = rs.getString("TABLENAME");
String newRowStr = rs.getString("ROWDATA");
String rowStr = rs.getString("KEYDATA");
- Range range = new Range(tbl);
+ Range range = new Range(schema+"."+tbl);
transactionDigests.addOperation(range,opType,newRowStr,rowStr);
rows.add(ix);
}
@@ -1056,7 +1132,7 @@ NEW.field refers to the new value
private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
logger.info("Clearing replayed operations");
String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId;
- jdbcStmt.executeQuery(sql);
+ jdbcStmt.executeUpdate(sql);
}
@Override
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
new file mode 100755
index 0000000..0f66731
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
@@ -0,0 +1,1066 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.mixins;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.*;
+import java.util.Map.Entry;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.delete.Delete;
+import net.sf.jsqlparser.statement.insert.Insert;
+import net.sf.jsqlparser.statement.update.Update;
+import org.json.JSONObject;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Configuration;
+import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.mixins.MySQLMixin.StagingTableUpdateRunnable;
+import org.onap.music.mdbc.tables.Operation;
+import org.onap.music.mdbc.query.SQLOperation;
+import org.onap.music.mdbc.tables.StagingTable;
+import org.postgresql.util.PGInterval;
+import org.postgresql.util.PGobject;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * This class provides the methods that MDBC needs in order to mirror data to/from a
+ * <a href="https://dev.mysql.com/">MySQL</a> or <a href="http://mariadb.org/">MariaDB</a> database instance. This class
+ * uses the <code>JSON_OBJECT()</code> database function, which means it requires the following minimum versions of
+ * either database:
+ * <table summary="">
+ * <tr>
+ * <th>DATABASE</th>
+ * <th>VERSION</th>
+ * </tr>
+ * <tr>
+ * <td>MySQL</td>
+ * <td>5.7.8</td>
+ * </tr>
+ * <tr>
+ * <td>MariaDB</td>
+ * <td>10.2.3 (Note: 10.2.3 is currently (July 2017) a <i>beta</i> release)</td>
+ * </tr>
+ * </table>
+ *
+ * @author Robert P. Eby
+ */
+public class PostgresMixin implements DBInterface {
+
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(PostgresMixin.class);
+
+ public static final String MIXIN_NAME = "postgres";
+ public static final String TRANS_TBL_SCHEMA = "audit";
+ public static final String TRANS_TBL = "mdbc_translog";
+
+ private final MusicInterface mi;
+ private final String connId;
+ private final String dbName;
+ private final String schema;
+ private final Connection jdbcConn;
+ private final Map<String, TableInfo> tables;
+ private PreparedStatement deleteStagingStatement;
+ private boolean useAsyncStagingUpdate = false;
+ private Object stagingHandlerLock = new Object();
+ private AsyncUpdateHandler stagingHandler = null;
+ private StagingTable currentStaging = null;
+
+ public PostgresMixin() {
+ this.mi = null;
+ this.connId = "";
+ this.dbName = null;
+ this.schema = null;
+ this.jdbcConn = null;
+ this.tables = null;
+ this.deleteStagingStatement = null;
+ }
+
+ private void initializeDeleteStatement() throws SQLException {
+ deleteStagingStatement = jdbcConn.prepareStatement("DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL
+ + " WHERE (ix BETWEEN ? AND ? ) AND " + "connection_id = ?;");
+ }
+
+ public PostgresMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException {
+ this.mi = mi;
+ this.connId = generateConnID(conn);
+ this.dbName = getDBName(conn);
+ this.schema = getSchema(conn);
+ this.jdbcConn = conn;
+ this.tables = new HashMap<>();
+ useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
+ Configuration.ASYNC_STAGING_TABLE_UPDATE));
+ initializePostgresTriggersStructures();
+ initializeDeleteStatement();
+ }
+
+ class StagingTableUpdateRunnable implements Runnable {
+
+ private PostgresMixin mixin;
+ private StagingTable staging;
+
+ StagingTableUpdateRunnable(PostgresMixin mixin, StagingTable staging) {
+ this.mixin = mixin;
+ this.staging = staging;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.mixin.updateStagingTable(staging);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ this.mixin.logger.error("Error when updating the staging table");
+ }
+ }
+ }
+
+
+ private void createTriggerTable() throws SQLException {
+ final String createSchemaSQL = "CREATE SCHEMA IF NOT EXISTS " + TRANS_TBL_SCHEMA + ";";
+ final String revokeCreatePrivilegesSQL = "REVOKE CREATE ON schema " + TRANS_TBL_SCHEMA + " FROM public;";
+ final String createTableSQL = "CREATE TABLE IF NOT EXISTS " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " ("
+ + "ix serial," + "op TEXT NOT NULL CHECK (op IN ('I','D','U'))," + "schema_name text NOT NULL,"
+ + "table_name text NOT NULL," + "original_data json," + "new_data json," + "connection_id text,"
+ + "PRIMARY KEY (connection_id,ix)" + ") WITH (fillfactor=100);";
+ final String revokeSQL = "REVOKE INSERT,UPDATE,DELETE,TRUNCATE,REFERENCES,TRIGGER ON " + TRANS_TBL_SCHEMA + "."
+ + TRANS_TBL + " FROM public;";
+ final String grantSelectSQL = "GRANT SELECT ON " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " TO public;";
+ final String createIndexSQL = "CREATE INDEX IF NOT EXISTS logged_actions_connection_id_idx" + " ON "
+ + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " (connection_id);";
+ Map<String, String> sqlStatements = new LinkedHashMap<String, String>() {
+ {
+ put("create_schema", createSchemaSQL);
+ put("revoke_privileges", revokeCreatePrivilegesSQL);
+ put("create_table", createTableSQL);
+ put("revoke_sql", revokeSQL);
+ put("grant_select", grantSelectSQL);
+ put("create_index", createIndexSQL);
+ }
+ };
+ for (Entry<String, String> query : sqlStatements.entrySet()) {
+ int retryCount = 0;
+ boolean ready = false;
+ while (retryCount < 3 && !ready) {
+ try {
+ Statement statement = jdbcConn.createStatement();
+ statement.executeUpdate(query.getValue());
+ if (!jdbcConn.getAutoCommit()) {
+ jdbcConn.commit();
+ }
+ statement.close();
+ ready = true;
+ } catch (SQLException e) {
+ if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated") || e.getMessage()
+ .toLowerCase().startsWith("error: duplicate key value violates unique constraint")) {
+ logger.warn("Error creating schema, retrying. for " + query.getKey(), e);
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ }
+ } else {
+ logger.error("Error executing " + query.getKey(), e);
+ throw e;
+ }
+ }
+ retryCount++;
+ }
+ }
+ }
+
+ private String updateTriggerSection() {
+ return "IF (TG_OP = 'UPDATE') THEN\n" + "v_old_data := row_to_json(OLD);\n"
+ + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + "." + TRANS_TBL
+ + " (op,schema_name,table_name,original_data,new_data,connection_id)\n"
+ + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,v_new_data,pg_backend_pid());\n"
+ + "RETURN NEW; ";
+ }
+
+ private String insertTriggerSection() {
+ // \TODO add additinoal conditional on change "IF NEW IS DISTINCT FROM OLD THEN"
+ return "IF (TG_OP = 'INSERT') THEN\n" + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA
+ + "." + TRANS_TBL + " (op,schema_name,table_name,new_data,connection_id)\n"
+ + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_new_data,pg_backend_pid());\n"
+ + "RETURN NEW; ";
+ }
+
+ private String deleteTriggerSection() {
+ return "IF (TG_OP = 'DELETE') THEN\n" + "v_old_data := row_to_json(OLD);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA
+ + "." + TRANS_TBL + " (op,schema_name,table_name,original_data,connection_id)\n"
+ + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,pg_backend_pid());\n"
+ + "RETURN OLD; ";
+ }
+
+ private String functionName(SQLOperation type) {
+ final String functionName = (type.equals(SQLOperation.UPDATE)) ? "if_updated_func"
+ : (type.equals(SQLOperation.INSERT)) ? "if_inserted_func" : "if_deleted_func";
+ return "audit." + functionName + "()";
+ }
+
+ private void createTriggerFunctions(SQLOperation type) throws SQLException {
+ StringBuilder functionSQL =
+ new StringBuilder("CREATE OR REPLACE FUNCTION " + functionName(type) + " RETURNS TRIGGER AS $body$\n"
+ + "DECLARE\n" + "v_old_data json;\n" + "v_new_data json;\n" + "BEGIN\n");
+ switch (type) {
+ case UPDATE:
+ functionSQL.append(updateTriggerSection());
+ break;
+ case INSERT:
+ functionSQL.append(insertTriggerSection());
+ break;
+ case DELETE:
+ functionSQL.append(deleteTriggerSection());
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid operation type for creation of trigger functions");
+ }
+ functionSQL.append("ELSE\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - Other action occurred: %, at %',TG_OP,now();\n"
+ + "RETURN NULL;\n" + "END IF;\n" + "EXCEPTION\n" + "WHEN data_exception THEN\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ + "RETURN NULL;\n" + "WHEN unique_violation THEN\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ + "RETURN NULL;\n" + "WHEN OTHERS THEN\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ + "RETURN NULL;\n" + "END;\n" + "$body$\n" + "LANGUAGE plpgsql\n" + "SECURITY DEFINER\n"
+ + "SET search_path = pg_catalog, audit;");
+ int retryCount = 0;
+ boolean ready = false;
+ while (retryCount < 3 && !ready) {
+ try {
+ executeSQLWrite(functionSQL.toString());
+ ready = true;
+ } catch (SQLException e) {
+ if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated")) {
+ logger.warn("Error creating schema, retrying. ", e);
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e1) {
+ }
+ } else {
+ logger.error("Error executing creation of trigger function", e);
+ throw e;
+ }
+ }
+ retryCount++;
+ }
+ }
+
+ private void initializePostgresTriggersStructures() throws SQLException {
+ try {
+ createTriggerTable();
+ } catch (SQLException e) {
+ logger.error("Error creating the trigger tables in postgres", e);
+ throw e;
+ }
+ try {
+ createTriggerFunctions(SQLOperation.INSERT);
+ createTriggerFunctions(SQLOperation.UPDATE);
+ createTriggerFunctions(SQLOperation.DELETE);
+ } catch (SQLException e) {
+ logger.error("Error creating the trigger functions in postgres", e);
+ throw e;
+ }
+ }
+
+ // This is used to generate a unique connId for this connection to the DB.
+ private String generateConnID(Connection conn) {
+ String rv = Integer.toString((int) System.currentTimeMillis()); // random-ish
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT pg_backend_pid() AS IX");
+ if (rs.next()) {
+ rv = rs.getString("IX");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "generateConnID: problem generating a connection ID!");
+ }
+ return rv;
+ }
+
+ /**
+ * Get the name of this DBnterface mixin object.
+ *
+ * @return the name
+ */
+ @Override
+ public String getMixinName() {
+ return MIXIN_NAME;
+ }
+
+ @Override
+ public void close() {
+ // nothing yet
+ }
+
+ /**
+ * Determines the db name associated with the connection This is the private/internal method that actually
+ * determines the name
+ *
+ * @param conn
+ * @return
+ */
+ private String getDBName(Connection conn) {
+ String dbname = "mdbc"; // default name
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT current_database();");
+ if (rs.next()) {
+ dbname = rs.getString("current_database");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
+ }
+ return dbname;
+ }
+
+ /**
+ * Determines the db name associated with the connection This is the private/internal method that actually
+ * determines the name
+ *
+ * @param conn
+ * @return
+ */
+ private String getSchema(Connection conn) {
+ String schema = "public"; // default name
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT current_schema();");
+ if (rs.next()) {
+ schema = rs.getString("current_schema");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
+ }
+ return schema;
+ }
+
+ @Override
+ public String getSchema() {
+ return schema;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return this.dbName;
+ }
+
+ /**
+ * Get a set of the table names in the database.
+ *
+ * @return the set
+ */
+ @Override
+ public Set<Range> getSQLRangeSet() {
+ Set<String> set = new TreeSet<String>();
+ String sql =
+ "SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE' AND table_schema=current_schema();";
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ while (rs.next()) {
+ String s = rs.getString("table_name");
+ set.add(s);
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
+ }
+ logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
+ Set<Range> rangeSet = new HashSet<>();
+ for (String table : set) {
+ rangeSet.add(new Range(table));
+ }
+ return rangeSet;
+ }
+
+ /**
+ * Return a TableInfo object for the specified table. This method first looks in a cache of previously constructed
+ * TableInfo objects for the table. If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the
+ * column names, types, and indexes of the table. It creates a new TableInfo object with the results.
+ *
+ * @param tableName the table to look up
+ * @return a TableInfo object containing the info we need, or null if the table does not exist
+ */
+ @Override
+ public TableInfo getTableInfo(String tableName) {
+ // \TODO: invalidate "tables" when a table schema is modified (uncommon), but needs to be handled
+ TableInfo ti = tables.get(tableName);
+ if (ti == null) {
+ try {
+ String tbl, localSchema;
+ final String[] split = tableName.split("\\.");
+ if (split.length == 2) {
+ localSchema = split[0];
+ tbl = split[1];
+ } else {
+ tbl = tableName;
+ localSchema = this.schema;
+ }
+ String sql;
+ if (schema == null) {
+ sql = "select column_name, data_type from information_schema.columns where table_schema=current_schema() and table_name='"
+ + tbl + "';";
+ } else {
+ sql = "select column_name, data_type from information_schema.columns where table_schema='"
+ + localSchema + "' and table_name='" + tbl + "';";
+ }
+ ResultSet rs = executeSQLRead(sql);
+ if (rs != null) {
+ ti = new TableInfo();
+ while (rs.next()) {
+ String name = rs.getString("column_name");
+ String type = rs.getString("data_type");
+ ti.columns.add(name);
+ ti.coltype.add(mapDatatypeNameToType(type));
+ }
+ rs.getStatement().close();
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from POSTGRES.");
+ return null;
+ }
+ final String keysql =
+ "SELECT a.attname as column_name FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid"
+ + " AND a.attnum = ANY(i.indkey) WHERE i.indrelid = '" + tbl + "'::regclass "
+ + " AND i.indisprimary;";
+ ResultSet rs2 = executeSQLRead(keysql);
+ Set<String> keycols = new HashSet<>();
+ if (rs2 != null) {
+ while (rs2.next()) {
+ String name = rs2.getString("column_name");
+ keycols.add(name);
+ }
+ rs2.getStatement().close();
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from MySQL.");
+ }
+ for (String col : ti.columns) {
+ if (keycols.contains(col)) {
+ ti.iskey.add(true);
+ } else {
+ ti.iskey.add(false);
+ }
+ }
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from MySQL: " + e);
+ return null;
+ }
+ tables.put(tableName, ti);
+ }
+ return ti;
+ }
+
+ // Map Postgres data type names to the java.sql.Types equivalent
+ private int mapDatatypeNameToType(String nm) {
+ switch (nm) {
+ case "character":
+ return Types.CHAR;
+ case "national character":
+ return Types.NCHAR;
+ case "character varying":
+ return Types.VARCHAR;
+ case "national character varying":
+ return Types.NVARCHAR;
+ case "text":
+ return Types.VARCHAR;
+ case "bytea":
+ return Types.BINARY;
+ case "smallint":
+ return Types.SMALLINT;
+ case "integer":
+ return Types.INTEGER;
+ case "bigint":
+ return Types.BIGINT;
+ case "smallserial":
+ return Types.SMALLINT;
+ case "serial":
+ return Types.INTEGER;
+ case "bigserial":
+ return Types.BIGINT;
+ case "real":
+ return Types.REAL;
+ case "double precision":
+ return Types.DOUBLE;
+ case "numeric":
+ return Types.NUMERIC;
+ case "decimal":
+ return Types.DECIMAL;
+ case "date":
+ return Types.DATE;
+ case "time with time zone":
+ return Types.TIME_WITH_TIMEZONE;
+ case "time without time zone":
+ return Types.TIME;
+ case "timestamp without time zone":
+ return Types.TIMESTAMP;
+ case "timestamp with time zone":
+ return Types.TIMESTAMP_WITH_TIMEZONE;
+ case "boolean":
+ return Types.BIT;
+ case "bit":
+ return Types.BIT;
+ case "oid":
+ return Types.BIGINT;
+ case "xml":
+ return Types.SQLXML;
+ case "array":
+ return Types.ARRAY;
+ case "tinyint":
+ return Types.TINYINT;
+ case "uuid":
+ case "money":
+ case "interval":
+ case "bit varying":
+ case "box":
+ case "point":
+ case "lseg":
+ case "path":
+ case "polygon":
+ case "circle":
+ case "json":
+ case "inet":
+ case "cidr":
+ case "macaddr":
+ case "tsvector":
+ case "tsquery":
+ return Types.OTHER;
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "unrecognized and/or unsupported data type " + nm);
+ return Types.VARCHAR;
+ }
+ }
+
+ @Override
+ public void createSQLTriggers(String tableName) {
+ // Don't create triggers for the table the triggers write into!!!
+ if (tableName.equals(TRANS_TBL) || tableName.equals(TRANS_TBL_SCHEMA + "." + TRANS_TBL))
+ return;
+ try {
+ // No SELECT trigger
+ executeSQLWrite(generateTrigger(tableName, SQLOperation.INSERT));
+ executeSQLWrite(generateTrigger(tableName, SQLOperation.DELETE));
+ executeSQLWrite(generateTrigger(tableName, SQLOperation.UPDATE));
+ } catch (SQLException e) {
+ if (e.getMessage().trim().endsWith("already exists")) {
+ // only warn if trigger already exists
+ logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e);
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger, "createSQLTriggers: " + e);
+ }
+ }
+ }
+
+ private String generateTrigger(String tableName, SQLOperation op) {
+ StringBuilder triggerSql = new StringBuilder("CREATE TRIGGER ").append(getTriggerName(tableName, op))
+ .append(" AFTER " + op + " ON ").append(tableName).append(" FOR EACH ROW EXECUTE PROCEDURE ")
+ .append(functionName(op)).append(';');
+ return triggerSql.toString();
+ }
+
+ private String getTriggerName(String tableName, SQLOperation op) {
+ switch (op) {
+ case DELETE:
+ return "D_" + tableName;
+ case UPDATE:
+ return "U_" + tableName;
+ case INSERT:
+ return "I_" + tableName;
+ default:
+ throw new IllegalArgumentException("Invalid option in trigger operation type");
+ }
+ }
+
+ private String[] getTriggerNames(String tableName) {
+ return new String[] {getTriggerName(tableName, SQLOperation.INSERT),
+ getTriggerName(tableName, SQLOperation.DELETE), getTriggerName(tableName, SQLOperation.UPDATE),};
+ }
+
+ @Override
+ public void dropSQLTriggers(String tableName) {
+ try {
+ for (String name : getTriggerNames(tableName)) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "REMOVE trigger " + name + " from postgres");
+ executeSQLWrite("DROP TRIGGER IF EXISTS " + name + ";");
+ }
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "dropSQLTriggers: " + e);
+ }
+ }
+
+ @Override
+ public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) {
+ throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres");
+ }
+
+ @Override
+ public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) {
+ throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres");
+ }
+
+ /**
+ * This method executes a read query in the SQL database. Methods that call this method should be sure to call
+ * resultset.getStatement().close() when done in order to free up resources.
+ *
+ * @param sql the query to run
+ * @return a ResultSet containing the rows returned from the query
+ */
+ @Override
+ public ResultSet executeSQLRead(String sql) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing sql read in postgres");
+ logger.debug("Executing SQL read:" + sql);
+ ResultSet rs;
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ rs = stmt.executeQuery(sql);
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "executeSQLRead" + e);
+ return null;
+ }
+ return rs;
+ }
+
+ /**
+ * This method executes a write query in the sql database.
+ *
+ * @param sql the SQL to be sent to MySQL
+ * @throws SQLException if an underlying JDBC method throws an exception
+ */
+ protected void executeSQLWrite(String sql) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:" + sql);
+ Statement stmt = jdbcConn.createStatement();
+ stmt.execute(sql);
+ stmt.close();
+ }
+
+ @Override
+ public void preCommitHook() {
+ synchronized (stagingHandlerLock) {
+ // \TODO check if this can potentially block forever in certain scenarios
+ if (stagingHandler != null) {
+ stagingHandler.waitForAllPendingUpdates();
+ }
+ }
+ }
+
+ /**
+ * Code to be run within the DB driver before a SQL statement is executed. This is where tables can be synchronized
+ * before a SELECT, for those databases that do not support SELECT triggers.
+ *
+ * @param sql the SQL statement that is about to be executed
+ * @return list of keys that will be updated, if they can't be determined afterwards (i.e. sql table doesn't have
+ * primary key)
+ */
+ @Override
+ public void preStatementHook(final String sql) {
+ if (sql == null) {
+ return;
+ }
+ // \TODO: check if anything needs to be executed here for postgres
+ }
+
+ /**
+ * Code to be run within the DB driver after a SQL statement has been executed. This is where remote statement
+ * actions can be copied back to Cassandra/MUSIC.
+ *
+ * @param sql the SQL statement that was executed
+ */
+ @Override
+ public void postStatementHook(final String sql, StagingTable transactionDigest) {
+ if (sql != null) {
+ String[] parts = sql.trim().split(" ");
+ String cmd = parts[0].toLowerCase();
+ if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
+ if (useAsyncStagingUpdate) {
+ synchronized (stagingHandlerLock) {
+ if (stagingHandler == null || currentStaging != transactionDigest) {
+ Runnable newRunnable =
+ new PostgresMixin.StagingTableUpdateRunnable(this, transactionDigest);
+ currentStaging = transactionDigest;
+ stagingHandler = new AsyncUpdateHandler(newRunnable);
+ }
+ // else we can keep using the current staging Handler
+ }
+ stagingHandler.processNewUpdate();
+ } else {
+
+ try {
+ this.updateStagingTable(transactionDigest);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ // TODO Auto-generated catch block
+ this.logger.error("Error updating the staging table");
+ }
+ }
+ }
+ }
+ }
+
+ private SQLOperation toOpEnum(String operation) throws NoSuchFieldException {
+ switch (operation.toLowerCase()) {
+ case "i":
+ return SQLOperation.INSERT;
+ case "d":
+ return SQLOperation.DELETE;
+ case "u":
+ return SQLOperation.UPDATE;
+ case "s":
+ return SQLOperation.SELECT;
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "Invalid operation selected: [" + operation + "]");
+ throw new NoSuchFieldException("Invalid operation enum");
+ }
+
+ }
+
+ /**
+ * Copy data that is in transaction table into music interface
+ *
+ * @param transactionDigests
+ * @throws NoSuchFieldException
+ */
+ private void updateStagingTable(StagingTable transactionDigests) throws NoSuchFieldException, MDBCServiceException {
+ String selectSql = "select ix, op, schema_name, table_name, original_data,new_data FROM " + TRANS_TBL_SCHEMA
+ + "." + TRANS_TBL + " where connection_id = '" + this.connId + "';";
+ Integer biggestIx = Integer.MIN_VALUE;
+ Integer smallestIx = Integer.MAX_VALUE;
+ try {
+ ResultSet rs = executeSQLRead(selectSql);
+ Set<Integer> rows = new TreeSet<Integer>();
+ while (rs.next()) {
+ int ix = rs.getInt("ix");
+ biggestIx = Integer.max(biggestIx, ix);
+ smallestIx = Integer.min(smallestIx, ix);
+ String op = rs.getString("op");
+ SQLOperation opType = toOpEnum(op);
+ String schema = rs.getString("schema_name");
+ String tbl = rs.getString("table_name");
+ String original = rs.getString("original_data");
+ String newData = rs.getString("new_data");
+ Range range = new Range(schema + "." + tbl);
+ transactionDigests.addOperation(range, opType, newData, original);
+ rows.add(ix);
+ }
+ rs.getStatement().close();
+ if (rows.size() > 0) {
+ logger.debug("Staging delete: Executing with vals [" + smallestIx + "," + biggestIx + "," + this.connId
+ + "]");
+ this.deleteStagingStatement.setInt(1, smallestIx);
+ this.deleteStagingStatement.setInt(2, biggestIx);
+ this.deleteStagingStatement.setString(3, this.connId);
+ this.deleteStagingStatement.execute();
+ }
+ } catch (SQLException e) {
+ logger.warn("Exception in postStatementHook: " + e);
+ e.printStackTrace();
+ }
+ }
+
+
+
+ /**
+ * Update music with data from MySQL table
+ *
+ * @param tableName - name of table to update in music
+ */
+ @Override
+ public void synchronizeData(String tableName) {}
+
+ /**
+ * Return a list of "reserved" names, that should not be used by MySQL client/MUSIC These are reserved for mdbc
+ */
+ @Override
+ public List<String> getReservedTblNames() {
+ ArrayList<String> rsvdTables = new ArrayList<String>();
+ rsvdTables.add(TRANS_TBL_SCHEMA + "." + TRANS_TBL);
+ rsvdTables.add(TRANS_TBL);
+ // Add others here as necessary
+ return rsvdTables;
+ }
+
+ @Override
+ public String getPrimaryKey(String sql, String tableName) {
+ return null;
+ }
+
+ /**
+ * Parse the transaction digest into individual events
+ *
+ * @param transaction - base 64 encoded, serialized digest
+ */
+ public void replayTransaction(StagingTable transaction, List<Range> ranges)
+ throws SQLException, MDBCServiceException {
+ boolean autocommit = jdbcConn.getAutoCommit();
+ jdbcConn.setAutoCommit(false);
+ Statement jdbcStmt = jdbcConn.createStatement();
+ final ArrayList<Operation> opList = transaction.getOperationList();
+
+ for (Operation op : opList) {
+ if (Range.overlaps(ranges, op.getTable())) {
+ try {
+ replayOperationIntoDB(jdbcStmt, op);
+ } catch (SQLException | MDBCServiceException e) {
+ // rollback transaction
+ logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "."
+ + "Rolling back the entire digest replay.");
+ jdbcConn.rollback();
+ throw e;
+ }
+ }
+ }
+
+ clearReplayedOperations(jdbcStmt);
+ jdbcConn.commit();
+ jdbcStmt.close();
+ jdbcConn.setAutoCommit(autocommit);
+ }
+
+ @Override
+ public void disableForeignKeyChecks() throws SQLException {
+ Statement disable = jdbcConn.createStatement();
+ disable.execute("SET session_replication_role = 'replica';");
+ disable.closeOnCompletion();
+ }
+
+ @Override
+ public void enableForeignKeyChecks() throws SQLException {
+ Statement enable = jdbcConn.createStatement();
+ enable.execute("SET session_replication_role = 'origin';");
+ enable.closeOnCompletion();
+ }
+
+ @Override
+ public void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException {
+ replayTransaction(txDigest, ranges);
+ }
+
+ /**
+ * Replays operation into database, usually from txDigest
+ *
+ * @param jdbcStmt: Connection used to perform the replay
+ * @param op: operation to be replayed
+ * @throws SQLException
+ * @throws MDBCServiceException
+ */
+ private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException {
+ logger.debug("Replaying Operation: " + op.getOperationType() + "->" + op.getVal());
+ JSONObject newVal = op.getVal();
+ JSONObject oldVal = null;
+ try {
+ oldVal = op.getKey();
+ } catch (MDBCServiceException e) {
+ // Ignore exception, in postgres the structure of the operation is different
+ }
+
+
+ TableInfo ti = getTableInfo(op.getTable());
+ final List<String> keyColumns = ti.getKeyColumns();
+
+ // build and replay the queries
+ String sql = constructSQL(op, keyColumns, newVal, oldVal);
+ if (sql == null)
+ return;
+
+ try {
+ logger.debug("Replaying operation: " + sql);
+ int updated = jdbcStmt.executeUpdate(sql);
+
+ if (updated == 0) {
+ // This applies only for replaying transactions involving Eventually Consistent tables
+ logger.warn(
+ "Error Replaying operation: " + sql + "; Replacing insert/replace/viceversa and replaying ");
+
+ buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal);
+ }
+ } catch (SQLException sqlE) {
+ // This applies for replaying transactions involving Eventually Consistent tables
+ // or transactions that replay on top of existing keys
+ logger.warn(
+ "Error Replaying operation: " + sql + ";" + "Replacing insert/replace/viceversa and replaying ");
+
+ buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal);
+
+ }
+ }
+
+ protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, List<String> keyColumns,
+ JSONObject newVals, JSONObject oldVals) throws SQLException, MDBCServiceException {
+ String sqlInverse = constructSQLInverse(op, keyColumns, newVals, oldVals);
+ if (sqlInverse == null)
+ return;
+ logger.debug("Replaying operation: " + sqlInverse);
+ jdbcStmt.executeUpdate(sqlInverse);
+ }
+
+ protected String constructSQLInverse(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+ throws MDBCServiceException {
+ String sqlInverse = null;
+ switch (op.getOperationType()) {
+ case INSERT:
+ sqlInverse = constructUpdate(op.getTable(), keyColumns, newVals, oldVals);
+ break;
+ case UPDATE:
+ sqlInverse = constructInsert(op.getTable(), newVals);
+ break;
+ default:
+ break;
+ }
+ return sqlInverse;
+ }
+
+ protected String constructSQL(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+ throws MDBCServiceException {
+ String sql = null;
+ switch (op.getOperationType()) {
+ case INSERT:
+ sql = constructInsert(op.getTable(), newVals);
+ break;
+ case UPDATE:
+ sql = constructUpdate(op.getTable(), keyColumns, newVals, oldVals);
+ break;
+ case DELETE:
+ sql = constructDelete(op.getTable(), keyColumns, oldVals);
+ break;
+ case SELECT:
+ // no update happened, do nothing
+ break;
+ default:
+ logger.error(op.getOperationType() + "not implemented for replay");
+ }
+ return sql;
+ }
+
+ private String constructDelete(String tableName, List<String> keyColumns, JSONObject oldVals)
+ throws MDBCServiceException {
+ if (oldVals == null) {
+ throw new MDBCServiceException("Trying to update row with an empty old val exception");
+ }
+ StringBuilder sql = new StringBuilder();
+ sql.append("DELETE FROM ");
+ sql.append(tableName + " WHERE ");
+ sql.append(getPrimaryKeyConditional(keyColumns, oldVals));
+ sql.append(";");
+ return sql.toString();
+ }
+
+ private String constructInsert(String tableName, JSONObject newVals) {
+ StringBuilder keys = new StringBuilder();
+ StringBuilder vals = new StringBuilder();
+ String sep = "";
+ for (String col : newVals.keySet()) {
+ keys.append(sep + col);
+ vals.append(sep + "'" + newVals.get(col) + "'");
+ sep = ", ";
+ }
+ StringBuilder sql = new StringBuilder();
+ sql.append("INSERT INTO ").append(tableName + " (").append(keys).append(") VALUES (").append(vals).append(");");
+ return sql.toString();
+ }
+
+ private String constructUpdate(String tableName, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+ throws MDBCServiceException {
+ if (oldVals == null) {
+ throw new MDBCServiceException("Trying to update row with an empty old val exception");
+ }
+ StringBuilder sql = new StringBuilder();
+ sql.append("UPDATE ").append(tableName).append(" SET ");
+ String sep = "";
+ for (String key : newVals.keySet()) {
+ sql.append(sep).append(key).append("=\"").append(newVals.get(key)).append("\"");
+ sep = ", ";
+ }
+ sql.append(" WHERE ");
+ sql.append(getPrimaryKeyConditional(keyColumns, oldVals));
+ sql.append(";");
+ return sql.toString();
+ }
+
+ /**
+ * Create an SQL string for AND'ing all of the primary keys
+ *
+ * @param keyColumns list with the name of the columns that are key
+ * @param vals json with the contents of the old row
+ * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3
+ */
+ private String getPrimaryKeyConditional(List<String> keyColumns, JSONObject vals) {
+ StringBuilder keyCondStmt = new StringBuilder();
+ String and = "";
+ for (String key : keyColumns) {
+ // We cannot use the default primary key for the sql table and operations
+ if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
+ Object val = vals.get(key);
+ keyCondStmt.append(and + key + "=\"" + val + "\"");
+ and = " AND ";
+ }
+ }
+ return keyCondStmt.toString();
+ }
+
+ /**
+ * Cleans out the transaction table, removing the replayed operations
+ *
+ * @param jdbcStmt
+ * @throws SQLException
+ */
+ private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
+ logger.info("Clearing replayed operations");
+ String sql =
+ "DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " WHERE CONNECTION_ID = '" + this.connId + "';";
+ jdbcStmt.executeUpdate(sql);
+ }
+
+ @Override
+ public Connection getSQLConnection() {
+ return jdbcConn;
+ }
+
+ @Override
+ public Set<String> getSQLTableSet() {
+ Set<String> set = new TreeSet<String>();
+ String sql =
+ "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=CURRENT_SCHEMA() AND TABLE_TYPE='BASE TABLE'";
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ while (rs.next()) {
+ String s = rs.getString("TABLE_NAME");
+ set.add(s);
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
+ System.out.println("getSQLTableSet: " + e);
+ e.printStackTrace();
+ }
+ logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
+ System.out.println("getSQLTableSet returning: " + set);
+ return set;
+ }
+
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
index 86088f9..171ad79 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
@@ -190,28 +190,6 @@ public class Utils {
}
return list;
}
-
- public static void registerDefaultDrivers() {
- Properties pr = null;
- try {
- pr = new Properties();
- pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
- }
- catch (IOException e) {
- logger.error("Could not load property file > " + e.getMessage());
- }
-
- @SuppressWarnings("unused")
- List<Class<?>> list = new ArrayList<Class<?>>();
- String drivers = pr.getProperty("DEFAULT_DRIVERS");
- for (String driver: drivers.split("[ ,]")) {
- logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'");
- try {
- @SuppressWarnings("unused")
- Class<?> cl = Class.forName(driver.trim());
- } catch (ClassNotFoundException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"Driver class "+driver+" not found.");
- }
- }
- }
+
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
index 4f3a3bf..5c6fae4 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
@@ -89,9 +89,10 @@ public class MusicTxDigestDaemon implements Runnable {
logger.error("Music interface or DB interface is null in background daemon");
return;
}
+ MdbcConnection conn = null;
while (true) {
try {
- MdbcConnection conn = (MdbcConnection) stateManager.getConnection("daemon");
+ conn = (MdbcConnection) stateManager.getConnection("daemon");
if (conn == null) {
logger.error("Connection created is null in background daemon");
return;
@@ -111,18 +112,20 @@ public class MusicTxDigestDaemon implements Runnable {
}
//2) for each partition I don't own
final Set<Range> warmupRanges = stateManager.getRangesToWarmup();
- final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
- List<Range> missingRanges = new ArrayList<>();
- if (currentPartitions.size() != 0) {
- for (DatabasePartition part : currentPartitions) {
- List<Range> partitionRanges = part.getSnapshot();
- warmupRanges.removeAll(partitionRanges);
- }
- try {
- stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
- } catch (MDBCServiceException e) {
- logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
- continue;
+ if (warmupRanges!=null) {
+ final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
+ List<Range> missingRanges = new ArrayList<>();
+ if (currentPartitions.size() != 0) {
+ for (DatabasePartition part : currentPartitions) {
+ List<Range> partitionRanges = part.getSnapshot();
+ warmupRanges.removeAll(partitionRanges);
+ }
+ try {
+ stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
+ continue;
+ }
}
}
@@ -138,6 +141,12 @@ public class MusicTxDigestDaemon implements Runnable {
} catch (InterruptedException | SQLException e) {
logger.error("MusicTxDigest background daemon stopped " + e.getMessage(), e);
Thread.currentThread().interrupt();
+ } finally {
+ try {
+ if (conn!=null && !conn.isClosed()) conn.close();
+ } catch (SQLException e) {
+ logger.error("MusicTxDigest background daemon error closing" + e.getMessage(), e);
+ }
}
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
index 8fa49a9..db9e455 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
@@ -22,10 +22,18 @@ package org.onap.music.mdbc.tables;
import java.util.UUID;
public final class MusicTxDigestId {
+ public final UUID mriId;
public final UUID transactionId;
public final int index;
+ public MusicTxDigestId(UUID mriRowId, UUID digestId, int index) {
+ this.mriId=mriRowId;
+ this.transactionId= digestId;
+ this.index=index;
+ }
+
public MusicTxDigestId(UUID digestId, int index) {
+ this.mriId = null;
this.transactionId= digestId;
this.index=index;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
index d406ad3..3258d0f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
@@ -66,7 +66,7 @@ public final class Operation implements Serializable{
}
public JSONObject getKey() throws MDBCServiceException {
- if(KEY==null){
+ if(KEY==null||KEY.isEmpty()){
throw new MDBCServiceException("This operation ["+TYPE.toString()+"] doesn't contain a key");
}
JSONObject keys = new JSONObject(new JSONTokener(KEY));
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
index 4fdd7ac..4ef9d30 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
@@ -186,7 +186,10 @@ public class StagingTable {
}
OpType newType = (type==SQLOperation.INSERT)?OpType.INSERT:(type==SQLOperation.DELETE)?
OpType.DELETE:OpType.UPDATE;
- Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType).setVal(newVal);
+ Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType);
+ if(newVal!=null) {
+ rowBuilder.setVal(newVal);
+ }
if(keys!=null){
rowBuilder.setKey(keys);
}
@@ -252,10 +255,10 @@ public class StagingTable {
}
synchronized public boolean isEventualEmpty() {
- return (eventuallyBuilder.getRowsCount()==0);
+ return (eventuallyBuilder!=null) && (eventuallyBuilder.getRowsCount()==0);
}
-
- synchronized public void clear() throws MDBCServiceException {
+
+ synchronized public void clear() throws MDBCServiceException {
if(!builderInitialized){
throw new MDBCServiceException("This type of staging table is unmutable, please use the constructor"
+ "with no parameters");
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
index 7624201..f6239d1 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
@@ -24,6 +24,8 @@ import org.onap.music.mdbc.configurations.NodeConfiguration;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.UUID;
public class CreatePartition {
@@ -51,7 +53,9 @@ public class CreatePartition {
}
public void convert(){
- config = new NodeConfiguration(tables, eventual,UUID.fromString(mriIndex),"test","");
+ String[] tablesArray=eventual.split(",");
+ ArrayList<String> eventualTables = (new ArrayList<>(Arrays.asList(tablesArray)));
+ config = new NodeConfiguration(tables, eventualTables,UUID.fromString(mriIndex),"test","");
}
public void saveToFile(){
diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties
index 73e8f77..49fdfd2 100755
--- a/mdbc-server/src/main/resources/mdbc.properties
+++ b/mdbc-server/src/main/resources/mdbc.properties
@@ -4,10 +4,15 @@
MIXINS= \
org.onap.music.mdbc.mixins.MySQLMixin \
org.onap.music.mdbc.mixins.MusicMixin \
- org.onap.music.mdbc.mixins.Music2Mixin
+ org.onap.music.mdbc.mixins.Music2Mixin \
+ org.onap.music.mdbc.mixins.PostgresMixin
+
+DEFAULT_DB_MIXIN= mysql
+
+DEFAULT_MUSIC_MIXIN= cassandra2
DEFAULT_DRIVERS=\
- org.h2.Driver \
- com.mysql.jdbc.Driver
+ org.mariadb.jdbc.Driver \
+ org.postgresql.Driver
-txdaemonsleeps=15 \ No newline at end of file
+txdaemonsleeps=15
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java
index 4703d0e..87f8445 100755
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java
@@ -36,7 +36,7 @@ public class MDBCUtilsTest {
public void toStringTest1() {
StagingTable table = new StagingTable();
try {
- table.addOperation(new Range("TABLE1"),SQLOperation.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString(),null);
+ table.addOperation(new Range("TEST.TABLE1"),SQLOperation.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString(),null);
} catch (MDBCServiceException e) {
fail();
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java
new file mode 100644
index 0000000..72ec8d3
--- /dev/null
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java
@@ -0,0 +1,280 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import ch.vorburger.exec.ManagedProcessException;
+import ch.vorburger.mariadb4j.DB;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.opentable.db.postgres.embedded.EmbeddedPostgres;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+import javax.sql.DataSource;
+import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.onap.music.datastore.MusicDataStore;
+import org.onap.music.datastore.MusicDataStoreHandle;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.lockingservice.cassandra.CassaLockStore;
+import org.onap.music.mdbc.mixins.MusicMixin;
+import org.onap.music.mdbc.mixins.PostgresMixin;
+
+public class MdbcTestUtils {
+
+ // Postgres variables
+ static EmbeddedPostgres pg=null;
+ static DataSource postgresDatabase=null;
+ final private static int postgresPort = 13307;
+ @Rule
+ public static TemporaryFolder tf = new TemporaryFolder();
+
+ // Cassandra variables
+ //Properties used to connect to music
+ private static Cluster cluster;
+ private static Session session;
+
+ //Mdbc variables
+ final private static String keyspace="metricmusictest";
+ final private static String mdbcServerName = "name";
+ final private static String mtdTableName = "musictxdigest";
+ final private static String eventualMtxdTableName = "musicevetxdigest";
+ final private static String mriTableName = "musicrangeinformation";
+ final private static String rangeDependencyTableName = "musicrangedependency";
+ final private static String nodeInfoTableName = "nodeinfo";
+ //Mariadb variables
+ static DB db=null;
+ final public static String mariaDBDatabaseName="test";
+ final static Integer mariaDbPort=13306;
+
+
+
+ public enum DBType {POSTGRES, MySQL}
+
+ public static String getCassandraUrl(){
+ return cluster.getMetadata().getAllHosts().iterator().next().getAddress().toString();
+
+ }
+
+ public static String getKeyspace(){
+ return keyspace;
+ }
+
+ public static String getServerName(){
+ return mdbcServerName;
+ }
+
+ public static String getMriTableName(){
+ return mriTableName;
+ }
+
+ public static String getMariaDbPort() {
+ return mariaDbPort.toString();
+ }
+
+ public static String getMariaDBDBName(){
+ return mariaDBDatabaseName;
+ }
+
+ static Connection getPostgreConnection() {
+ startPostgres();
+ Connection conn=null;
+ try
+ {
+ conn = postgresDatabase.getConnection();
+ } catch(SQLException e){
+ e.printStackTrace();
+ fail();
+ }
+ return conn;
+ }
+
+ static synchronized public void startPostgres(){
+ if(pg==null) {
+ try {
+ tf.create();
+ pg = EmbeddedPostgres.builder().setPort(postgresPort).setDataDirectory(tf.newFolder("tmp")+"/data-dir").start();
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+ if(postgresDatabase==null) {
+ postgresDatabase = pg.getPostgresDatabase();
+ }
+ }
+
+ static public String getPostgresUrl(){
+ return getPostgresUrlWithoutDb()+"/postgres";
+ }
+
+ static public String getPostgresUrlWithoutDb(){
+ return "jdbc:postgresql://localhost:"+Integer.toString(postgresPort);
+ }
+
+ synchronized static Connection getMariadbConnection(){
+ startMariaDb();
+ Connection conn = null;
+ try {
+ conn = DriverManager
+ .getConnection(getMariadbUrlWithoutDatabase()+"/"+mariaDBDatabaseName, "root", "");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail("Error creating mdbc connection");
+ }
+ return conn;
+ }
+
+ public synchronized static void startMariaDb(){
+ if (db == null) {
+ try {
+ db=DB.newEmbeddedDB(mariaDbPort);
+ db.start();
+ db.createDB(mariaDBDatabaseName);
+ } catch (ManagedProcessException e) {
+ e.printStackTrace();
+ fail("error initializing embedded mariadb");
+ }
+ }
+ }
+
+ static String getMariadbUrlWithoutDatabase(){
+ return "jdbc:mariadb://localhost:"+Integer.toString(mariaDbPort);
+ }
+
+ public static Connection getConnection(DBType type){
+ switch(type){
+ case MySQL:
+ return getMariadbConnection();
+ case POSTGRES:
+ return getPostgreConnection();
+ default:
+ fail("Wrong type for creating connection");
+ }
+ return null;
+ }
+
+ synchronized static void stopPostgres(){
+ postgresDatabase=null;
+ if(pg!=null) {
+ try {
+ pg.close();
+ pg=null;
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Error closing postgres database");
+ }
+ }
+ if(tf!=null){
+ tf.delete();
+ }
+ }
+
+ static void stopMySql(){
+ try {
+ db.stop();
+ } catch (ManagedProcessException e) {
+ e.printStackTrace();
+ fail("Error closing mysql");
+ }
+ }
+
+ public static void cleanDatabase(DBType type){
+ switch(type) {
+ case MySQL:
+ stopMySql();
+ break;
+ case POSTGRES:
+ stopPostgres();
+ break;
+ default:
+ fail("Wrong type for creating connection");
+ }
+ }
+
+ public static void initCassandra(){
+ try {
+ EmbeddedCassandraServerHelper.startEmbeddedCassandra(EmbeddedCassandraServerHelper.CASSANDRA_RNDPORT_YML_FILE);
+ } catch (Exception e) {
+ System.out.println(e);
+ fail("Error starting embedded cassandra");
+ }
+ cluster=EmbeddedCassandraServerHelper.getCluster();
+ //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build();
+ cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000);
+ assertNotNull("Invalid configuration for cassandra", cluster);
+ session = EmbeddedCassandraServerHelper.getSession();
+ assertNotNull("Invalid configuration for cassandra", session);
+
+ MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session);
+ CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle);
+ assertNotNull("Invalid configuration for music", store);
+ }
+
+ public static void stopCassandra(){
+ try {
+ EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
+ }
+ catch(NullPointerException e){
+ }
+ }
+
+ public static Session getSession(){
+ return session;
+ }
+
+ public static MusicMixin getMusicMixin() throws MDBCServiceException {
+ initNamespaces();
+ initTables();
+ MusicMixin mixin=null;
+ try {
+ Properties properties = new Properties();
+ properties.setProperty(MusicMixin.KEY_MY_ID,MdbcTestUtils.getServerName());
+ properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,MdbcTestUtils.getKeyspace());
+ properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1");
+ properties.setProperty(MusicMixin.KEY_MUSIC_ADDRESS,MdbcTestUtils.getCassandraUrl());
+ mixin =new MusicMixin(null, MdbcTestUtils.getServerName(),properties);
+ } catch (MDBCServiceException e) {
+ fail("error creating music mixin");
+ }
+ return mixin;
+ }
+
+ public static void initNamespaces() throws MDBCServiceException{
+ MusicMixin.createKeyspace("music_internal",1);
+ MusicMixin.createKeyspace(keyspace,1);
+ }
+
+ public static void initTables() throws MDBCServiceException{
+ MusicMixin.createMusicRangeInformationTable(keyspace, mriTableName);
+ MusicMixin.createMusicTxDigest(mtdTableName,keyspace, -1);
+ MusicMixin.createMusicEventualTxDigest(eventualMtxdTableName,keyspace, -1);
+ MusicMixin.createMusicNodeInfoTable(nodeInfoTableName,keyspace,-1);
+ MusicMixin.createMusicRangeDependencyTable(keyspace,rangeDependencyTableName);
+ }
+
+}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
index 2d31939..862e600 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
@@ -20,12 +20,14 @@
package org.onap.music.mdbc;
+import java.util.Properties;
import org.junit.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
+import org.onap.music.mdbc.MdbcTestUtils.DBType;
import org.onap.music.mdbc.mixins.MySQLMixin;
import ch.vorburger.mariadb4j.DB;
@@ -33,8 +35,8 @@ import ch.vorburger.mariadb4j.DB;
public class MySQLMixinTest {
public static final String DATABASE = "mdbctest";
- public static final String TABLE= "Persons";
- public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" +
+ public static final String TABLE= MdbcTestUtils.getMariaDBDBName();
+ public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + MdbcTestUtils.getMariaDBDBName()+ " (\n" +
" PersonID int,\n" +
" LastName varchar(255),\n" +
" FirstName varchar(255),\n" +
@@ -52,10 +54,8 @@ public class MySQLMixinTest {
@BeforeClass
public static void init() throws Exception {
Class.forName("org.mariadb.jdbc.Driver");
- //start embedded mariadb
- DB db = DB.newEmbeddedDB(13306);
- db.start();
- db.createDB(DATABASE);
+ MdbcTestUtils.startMariaDb();
+
}
@AfterClass
@@ -65,13 +65,14 @@ public class MySQLMixinTest {
@Before
public void beforeTest() throws SQLException {
- this.conn = DriverManager.getConnection("jdbc:mariadb://localhost:13306/"+DATABASE, "root", "");
- this.mysqlMixin = new MySQLMixin(null, "localhost:13306/"+DATABASE, conn, null);
+ this.conn = MdbcTestUtils.getConnection(DBType.MySQL);
+ Properties info = new Properties();
+ this.mysqlMixin = new MySQLMixin(null, null, conn, info);
}
@Test
public void testGetDataBaseName() throws SQLException {
- Assert.assertEquals(DATABASE, mysqlMixin.getDatabaseName());
+ Assert.assertEquals(MdbcTestUtils.getMariaDBDBName(), mysqlMixin.getDatabaseName());
}
}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
index 41a943e..c8d284e 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
@@ -31,23 +31,35 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.*;
+
+import com.datastax.driver.core.Session;
+
+import java.util.*;
+
+
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
+
import org.onap.music.datastore.MusicDataStore;
import org.onap.music.datastore.MusicDataStoreHandle;
+
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.lockingservice.cassandra.CassaLockStore;
import org.onap.music.lockingservice.cassandra.MusicLockState;
import org.onap.music.main.MusicCore;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
+
import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.proto.ProtoDigest.Digest.CompleteDigest;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
@@ -57,37 +69,25 @@ import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.onap.music.mdbc.MdbcTestUtils;
+import org.onap.music.mdbc.TestUtils;
+import org.onap.music.mdbc.ownership.Dag;
+import org.onap.music.mdbc.ownership.DagNode;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+
public class MusicMixinTest {
- final private static String keyspace="metricmusictest";
- final private static String mriTableName = "musicrangeinformation";
- final private static String mtdTableName = "musictxdigest";
- final private static String mdbcServerName = "name";
+
//Properties used to connect to music
- private static Cluster cluster;
private static Session session;
- private static String cassaHost = "localhost";
private static MusicMixin mixin = null;
private StateManager stateManager;
@BeforeClass
- public static void init() throws MusicServiceException {
- try {
- EmbeddedCassandraServerHelper.startEmbeddedCassandra();
- } catch (Exception e) {
- System.out.println(e);
- }
- cluster=EmbeddedCassandraServerHelper.getCluster();
- //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build();
- cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000);
- assertNotNull("Invalid configuration for cassandra", cluster);
- session = EmbeddedCassandraServerHelper.getSession();
- assertNotNull("Invalid configuration for cassandra", session);
+ public static void init() throws MDBCServiceException {
+ MdbcTestUtils.initCassandra();
- MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session);
- CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle);
- assertNotNull("Invalid configuration for music", store);
}
@AfterClass
@@ -102,59 +102,52 @@ public class MusicMixinTest {
}
@Before
- public void initTest(){
- session.execute("DROP KEYSPACE IF EXISTS "+keyspace);
+ public void initTest() throws MDBCServiceException {
+ session = MdbcTestUtils.getSession();
+ session.execute("DROP KEYSPACE IF EXISTS "+ MdbcTestUtils.getKeyspace());
+ mixin=MdbcTestUtils.getMusicMixin();
+ }
+
+ //@Test(timeout=10000)
+ @Ignore // TODO: Move ownership tests to OwnershipAndCheckpointTest
+ @Test
+ public void own() {
+ Range range = new Range("TEST.TABLE1");
+ List<Range> ranges = new ArrayList<>();
+ ranges.add(range);
+ DatabasePartition partition=null;
try {
- Properties properties = new Properties();
- properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace);
- properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName);
- properties.setProperty(MusicMixin.KEY_COMPRESSION, Boolean.toString(true));
- mixin=new MusicMixin(stateManager, mdbcServerName,properties);
- } catch (MDBCServiceException e) {
- fail("error creating music mixin");
+ partition = TestUtils.createBasicRow(range, mixin, MdbcTestUtils.getServerName());
+ }
+ catch(Exception e){
+ fail("fail to create partition");
+ }
+ try {
+ TestUtils.unlockRow(MdbcTestUtils.getKeyspace(),MdbcTestUtils.getMriTableName(),partition);
+ } catch (MusicLockingException e) {
+ fail(e.getMessage());
}
+ DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
+ try {
+ mixin.getStateManager().getOwnAndCheck().own(mixin,ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey());
+ } catch (MDBCServiceException e) {
+ fail("failure when running own function");
+ }
}
- //Own has been removed from musicMixin
-// @Test(timeout=10000)
-// public void own() {
-// Range range = new Range("TABLE1");
-// List<Range> ranges = new ArrayList<>();
-// ranges.add(range);
-// DatabasePartition partition=null;
-// try {
-// partition = TestUtils.createBasicRow(range, mixin, mdbcServerName);
-// }
-// catch(Exception e){
-// fail(e.getMessage());
-// }
-// try {
-// TestUtils.unlockRow(keyspace,mriTableName,partition);
-// } catch (MusicLockingException e) {
-// fail(e.getMessage());
-// }
-//
-// DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
-// try {
-// mixin.own(ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey());
-// } catch (MDBCServiceException e) {
-// fail("failure when running own function");
-// }
-// }
-
private DatabasePartition addRow(List<Range> ranges,boolean isLatest){
final UUID uuid = MDBCUtils.generateTimebasedUniqueKey();
DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null);
MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "",
- mdbcServerName, isLatest);
+ MdbcTestUtils.getServerName(), isLatest);
DatabasePartition partition=null;
try {
partition = mixin.createMusicRangeInformation(newRow);
} catch (MDBCServiceException e) {
fail("failure when creating new row");
}
- String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString();
+ String fullyQualifiedMriKey = MdbcTestUtils.getKeyspace()+"."+ MdbcTestUtils.getMriTableName()+"."+partition.getMRIIndex().toString();
try {
MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId());
} catch (MusicLockingException e) {
@@ -163,81 +156,81 @@ public class MusicMixinTest {
return partition;
}
- //Own has been removed from musicMixin
-// @Test(timeout=10000)
-// public void own2() throws InterruptedException, MDBCServiceException {
-// List<Range> range12 = new ArrayList<>( Arrays.asList(
-// new Range("RANGE1"),
-// new Range("RANGE2")
-// ));
-// List<Range> range34 = new ArrayList<>( Arrays.asList(
-// new Range("RANGE3"),
-// new Range("RANGE4")
-// ));
-// List<Range> range24 = new ArrayList<>( Arrays.asList(
-// new Range("RANGE2"),
-// new Range("RANGE4")
-// ));
-// List<Range> range123 = new ArrayList<>( Arrays.asList(
-// new Range("RANGE1"),
-// new Range("RANGE2"),
-// new Range("RANGE3")
-// ));
-// DatabasePartition db1 = addRow(range12, false);
-// DatabasePartition db2 = addRow(range34, false);
-// MILLISECONDS.sleep(10);
-// DatabasePartition db3 = addRow(range12, true);
-// DatabasePartition db4 = addRow(range34, true);
-// MILLISECONDS.sleep(10);
-// DatabasePartition db5 = addRow(range24, true);
-// DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
-// MusicInterface.OwnershipReturn own = null;
-// try {
-// own = mixin.own(range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey());
-// } catch (MDBCServiceException e) {
-// fail("failure when running own function");
-// }
-// Dag dag = own.getDag();
-//
-// DagNode node4 = dag.getNode(db4.getMRIIndex());
-// assertFalse(node4.hasNotIncomingEdges());
-// List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges());
-// assertEquals(1,outgoingEdges.size());
-//
-// DagNode missing = outgoingEdges.get(0);
-// Set<Range> missingRanges = missing.getRangeSet();
-// assertEquals(2,missingRanges.size());
-// assertTrue(missingRanges.contains(new Range("RANGE1")));
-// assertTrue(missingRanges.contains(new Range("RANGE3")));
-// List<DagNode> outgoingEdges1 = missing.getOutgoingEdges();
-// assertEquals(1,outgoingEdges1.size());
-//
-// DagNode finalNode = outgoingEdges1.get(0);
-// assertFalse(finalNode.hasNotIncomingEdges());
-// Set<Range> finalSet = finalNode.getRangeSet();
-// assertEquals(3,finalSet.size());
-// assertTrue(finalSet.contains(new Range("RANGE1")));
-// assertTrue(finalSet.contains(new Range("RANGE2")));
-// assertTrue(finalSet.contains(new Range("RANGE3")));
-//
-// DagNode node5 = dag.getNode(db5.getMRIIndex());
-// List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges();
-// assertEquals(1,toRemoveOutEdges.size());
-// toRemoveOutEdges.remove(finalNode);
-// assertEquals(0,toRemoveOutEdges.size());
-//
-// MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId());
-// assertTrue(row.getIsLatest());
-// DatabasePartition dbPartition = row.getDBPartition();
-// List<Range> snapshot = dbPartition.getSnapshot();
-// assertEquals(3,snapshot.size());
-// MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId());
-// assertFalse(node5row.getIsLatest());
-// MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex());
-// assertFalse(node4Row.getIsLatest());
-// MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex());
-// assertFalse(node3Row.getIsLatest());
-// }
+ @Ignore // TODO: Move ownership tests to OwnershipAndCheckpointTest
+ @Test(timeout=1000)
+ public void own2() throws InterruptedException, MDBCServiceException {
+ List<Range> range12 = new ArrayList<>( Arrays.asList(
+ new Range("TEST.RANGE1"),
+ new Range("TEST.RANGE2")
+ ));
+ List<Range> range34 = new ArrayList<>( Arrays.asList(
+ new Range("TEST.RANGE3"),
+ new Range("TEST.RANGE4")
+ ));
+ List<Range> range24 = new ArrayList<>( Arrays.asList(
+ new Range("TEST.RANGE2"),
+ new Range("TEST.RANGE4")
+ ));
+ List<Range> range123 = new ArrayList<>( Arrays.asList(
+ new Range("TEST.RANGE1"),
+ new Range("TEST.RANGE2"),
+ new Range("TEST.RANGE3")
+ ));
+ DatabasePartition db1 = addRow(range12, false);
+ DatabasePartition db2 = addRow(range34, false);
+ MILLISECONDS.sleep(10);
+ DatabasePartition db3 = addRow(range12, true);
+ DatabasePartition db4 = addRow(range34, true);
+ MILLISECONDS.sleep(10);
+ DatabasePartition db5 = addRow(range24, true);
+ DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
+ MusicInterface.OwnershipReturn own = null;
+ try {
+ own = mixin.getStateManager().getOwnAndCheck().own(mixin,range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey());
+ } catch (MDBCServiceException e) {
+ fail("failure when running own function");
+ }
+ Dag dag = own.getDag();
+
+ DagNode node4 = dag.getNode(db4.getMRIIndex());
+ assertFalse(node4.hasNotIncomingEdges());
+ List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges());
+ assertEquals(1,outgoingEdges.size());
+
+ DagNode missing = outgoingEdges.get(0);
+ Set<Range> missingRanges = missing.getRangeSet();
+ assertEquals(2,missingRanges.size());
+ assertTrue(missingRanges.contains(new Range("TEST.RANGE1")));
+ assertTrue(missingRanges.contains(new Range("TEST.RANGE3")));
+ List<DagNode> outgoingEdges1 = missing.getOutgoingEdges();
+ assertEquals(1,outgoingEdges1.size());
+
+ DagNode finalNode = outgoingEdges1.get(0);
+ assertFalse(finalNode.hasNotIncomingEdges());
+ Set<Range> finalSet = finalNode.getRangeSet();
+ assertEquals(3,finalSet.size());
+ assertTrue(finalSet.contains(new Range("TEST.RANGE1")));
+ assertTrue(finalSet.contains(new Range("TEST.RANGE2")));
+ assertTrue(finalSet.contains(new Range("TEST.RANGE3")));
+
+ DagNode node5 = dag.getNode(db5.getMRIIndex());
+ List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges();
+ assertEquals(1,toRemoveOutEdges.size());
+ toRemoveOutEdges.remove(finalNode);
+ assertEquals(0,toRemoveOutEdges.size());
+
+ MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId());
+ assertTrue(row.getIsLatest());
+ DatabasePartition dbPartition = row.getDBPartition();
+ List<Range> snapshot = dbPartition.getSnapshot();
+ assertEquals(3,snapshot.size());
+ MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId());
+ assertFalse(node5row.getIsLatest());
+ MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex());
+ assertFalse(node4Row.getIsLatest());
+ MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex());
+ assertFalse(node3Row.getIsLatest());
+ }
@Test
public void relinquish() {
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
new file mode 100644
index 0000000..2134a79
--- /dev/null
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
@@ -0,0 +1,220 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.mixins;
+
+import static org.junit.Assert.*;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.mdbc.MdbcTestUtils;
+import org.onap.music.mdbc.MdbcTestUtils.DBType;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.tables.StagingTable;
+
+public class PostgresMixinTest {
+ final private static String keyspace="metricmusictest";
+ final private static String mdbcServerName = "name";
+
+ static PostgresMixin mixin;
+
+ private static MusicMixin mi = null;
+ private static Connection conn;
+
+ @BeforeClass
+ public static void init() throws MDBCServiceException {
+ MdbcTestUtils.initCassandra();
+ mi=MdbcTestUtils.getMusicMixin();
+ try {
+ conn = MdbcTestUtils.getConnection(DBType.POSTGRES);
+ Properties info = new Properties();
+ mixin = new PostgresMixin(mi, null, conn, info);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @AfterClass
+ public static void close(){
+ //TODO: shutdown cassandra
+ mixin=null;
+ MdbcTestUtils.cleanDatabase(DBType.POSTGRES);
+ MdbcTestUtils.stopCassandra();
+ }
+
+ @Test
+ public void getMixinName() {
+ final String mixinName = mixin.getMixinName();
+ assertEquals(mixinName.toLowerCase(),"postgres");
+ }
+
+ @Test
+ public void getSQLTableSet() {
+ createTestTable();
+ final Set<String> sqlTableSet = mixin.getSQLTableSet();
+ assertEquals(1,sqlTableSet.size());
+ assertTrue(sqlTableSet.contains("testtable"));
+ }
+
+ @Test
+ public void getTableInfo() {
+ createTestTable();
+ final TableInfo tableInfo = mixin.getTableInfo("testtable");
+ assertNotNull(tableInfo);
+ assertEquals(3,tableInfo.columns.size());
+ int index=0;
+ for(String col: tableInfo.columns) {
+ switch(col.toLowerCase()){
+ case "ix":
+ assertTrue(tableInfo.iskey.get(index));
+ assertEquals(Types.INTEGER, tableInfo.coltype.get(index).intValue());
+ break;
+ case "test1":
+ assertFalse(tableInfo.iskey.get(index));
+ assertEquals(Types.CHAR, tableInfo.coltype.get(index).intValue());
+ break;
+ case "test2":
+ assertFalse(tableInfo.iskey.get(index));
+ assertEquals(Types.VARCHAR, tableInfo.coltype.get(index).intValue());
+ break;
+ default:
+ fail();
+ }
+ index++;
+ }
+ }
+
+ private void createTestTable() {
+ try {
+ final Statement statement = conn.createStatement();
+ statement.execute("CREATE TABLE IF NOT EXISTS testtable (IX SERIAL, test1 CHAR(1), test2 VARCHAR(255), PRIMARY KEY (IX));");
+ statement.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ private void cleanTestTable() {
+ try {
+ final Statement statement = conn.createStatement();
+ statement.execute("DELETE FROM testtable;");
+ statement.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void postStatementHook() {
+ createTestTable();
+ mixin.createSQLTriggers("testtable");
+ final String sqlOperation = "INSERT INTO testtable (test1,test2) VALUES ('u','test');";
+ Statement stm=null;
+ try {
+ stm = conn.createStatement();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ mixin.preStatementHook(sqlOperation);
+ try {
+ stm.execute(sqlOperation);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ StagingTable st=new StagingTable();
+ mixin.postStatementHook(sqlOperation,st);
+ mixin.preCommitHook();
+ assertFalse(st.isEmpty());
+ }
+
+ void checkEmptyTestTable(){
+ ResultSet resultSet = mixin.executeSQLRead("SELECT * FROM testtable;");
+ try {
+ assertFalse(resultSet.next());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+
+ void checkOneRowWithContents(String test1Val, String test2Val){
+ ResultSet resultSet = mixin.executeSQLRead("SELECT * FROM testtable;");
+ try {
+ assertTrue(resultSet.next());
+ assertEquals(test1Val, resultSet.getString("test1"));
+ assertEquals(test2Val, resultSet.getString("test2"));
+ assertFalse(resultSet.next());
+ }
+ catch(SQLException e){
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void applyTxDigest() {
+ createTestTable();
+ mixin.createSQLTriggers("testtable");
+ final String sqlOperation = "INSERT INTO testtable (test1,test2) VALUES ('u','test');";
+ Statement stm=null;
+ try {
+ stm = conn.createStatement();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ mixin.preStatementHook(sqlOperation);
+ try {
+ stm.execute(sqlOperation);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ StagingTable st=new StagingTable();
+ mixin.postStatementHook(sqlOperation,st);
+ mixin.preCommitHook();
+ assertFalse(st.isEmpty());
+ cleanTestTable();
+ checkEmptyTestTable();
+ List<Range> ranges = new ArrayList<>();
+ ranges.add(new Range("public.testtable"));
+ try {
+ mixin.applyTxDigest(st,ranges);
+ } catch (SQLException|MDBCServiceException e) {
+ e.printStackTrace();
+ fail();
+ }
+ checkOneRowWithContents("u","test");
+ }
+} \ No newline at end of file
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
index da64595..9e6161a 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
@@ -59,7 +59,7 @@ public class DagTest {
public void getDag() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(ranges),"",false));
MILLISECONDS.sleep(10);
@@ -87,14 +87,14 @@ public class DagTest {
public void getDag2() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2")
+ new Range("schema.range2")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
@@ -123,7 +123,7 @@ public class DagTest {
public void nextToOwn() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(ranges),"",false));
MILLISECONDS.sleep(10);
@@ -149,20 +149,20 @@ public class DagTest {
public void nextToApply() throws InterruptedException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo2));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3));
Dag dag = Dag.getDag(rows, ranges);
@@ -180,7 +180,7 @@ public class DagTest {
assertEquals(0,pair.getKey().index);
List<Range> value = pair.getValue();
assertEquals(1,value.size());
- assertEquals(new Range("range1"),value.get(0));
+ assertEquals(new Range("schema.range1"),value.get(0));
pair = node.nextNotAppliedTransaction(rangesSet);
transactionCounter++;
}
@@ -195,23 +195,23 @@ public class DagTest {
Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1));
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0),
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),1)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0),
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1)
));
MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2);
- alreadyApplied.put(new Range("range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
+ alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
rows.add(newRow);
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
- new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+ new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
));
rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3));
Dag dag = Dag.getDag(rows, ranges);
@@ -230,7 +230,7 @@ public class DagTest {
assertEquals(2-nodeCounter,pair.getKey().index);
List<Range> value = pair.getValue();
assertEquals(1,value.size());
- assertEquals(new Range("range1"),value.get(0));
+ assertEquals(new Range("schema.range1"),value.get(0));
pair = node.nextNotAppliedTransaction(rangesSet);
transactionCounter++;
}
@@ -244,14 +244,14 @@ public class DagTest {
public void isDifferent() throws InterruptedException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2")
+ new Range("schema.range2")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
@@ -277,14 +277,14 @@ public class DagTest {
public void getOldestDoubles() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2")
+ new Range("schema.range2")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
@@ -306,15 +306,15 @@ public class DagTest {
public void getIncompleteRangesAndDependents() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range3")
+ new Range("schema.range2"),
+ new Range("schema.range3")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
@@ -330,7 +330,7 @@ public class DagTest {
List<Range> incomplete = incompleteRangesAndDependents.getKey();
Set<DagNode> dependents = incompleteRangesAndDependents.getValue();
assertEquals(1,incomplete.size());
- assertTrue(incomplete.contains(new Range("range3")));
+ assertTrue(incomplete.contains(new Range("schema.range3")));
assertEquals(1,dependents.size());
assertTrue(dependents.contains(dag.getNode(rows.get(3).getPartitionIndex())));
}
@@ -339,16 +339,16 @@ public class DagTest {
public void getIncompleteRangesAndDependents2() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1"),
- new Range("range4")
+ new Range("schema.range1"),
+ new Range("schema.range4")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range3")
+ new Range("schema.range2"),
+ new Range("schema.range3")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
@@ -364,8 +364,8 @@ public class DagTest {
List<Range> incomplete = incompleteRangesAndDependents.getKey();
Set<DagNode> dependents = incompleteRangesAndDependents.getValue();
assertEquals(2,incomplete.size());
- assertTrue(incomplete.contains(new Range("range3")));
- assertTrue(incomplete.contains(new Range("range4")));
+ assertTrue(incomplete.contains(new Range("schema.range3")));
+ assertTrue(incomplete.contains(new Range("schema.range4")));
assertEquals(2,dependents.size());
assertTrue(dependents.contains(dag.getNode(rows.get(3).getPartitionIndex())));
assertTrue(dependents.contains(dag.getNode(rows.get(2).getPartitionIndex())));
@@ -375,20 +375,20 @@ public class DagTest {
public void addNewNodeWithSearch() throws InterruptedException, MDBCServiceException {
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> range1 = new ArrayList<>( Arrays.asList(
- new Range("range1")
+ new Range("schema.range1")
));
List<Range> range2 = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range3")
+ new Range("schema.range2"),
+ new Range("schema.range3")
));
List<Range> ranges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range1")
));
List<Range> allRanges = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range3"),
- new Range("range1")
+ new Range("schema.range2"),
+ new Range("schema.range3"),
+ new Range("schema.range1")
));
rows.add(createNewRow(new ArrayList<>(range1),"",false));
MILLISECONDS.sleep(10);
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
index eb01bcd..f2fbd1f 100644
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java
@@ -34,7 +34,6 @@ import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -46,8 +45,10 @@ import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.lockingservice.cassandra.CassaLockStore;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.MdbcTestUtils.DBType;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.StateManager;
+import org.onap.music.mdbc.MdbcTestUtils;
import org.onap.music.mdbc.TestUtils;
import org.onap.music.mdbc.mixins.LockResult;
import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
@@ -58,13 +59,8 @@ import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
public class OwnershipAndCheckpointTest {
- final private static int sqlPort = 13350;
- final private static String keyspace="metricmusictest";
- final private static String mriTableName = "musicrangeinformation";
- final private static String mtdTableName = "musictxdigest";
- final private static String mdbcServerName = "name";
- public static final String DATABASE = "mdbcTest";
- public static final String TABLE= "PERSONS";
+ public static final String DATABASE = MdbcTestUtils.mariaDBDatabaseName;
+ public static final String TABLE= MdbcTestUtils.mariaDBDatabaseName+".PERSONS";
public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" +
" PersonID int,\n" +
" LastName varchar(255),\n" +
@@ -75,11 +71,7 @@ public class OwnershipAndCheckpointTest {
");";
public static final String DROP_TABLE = "DROP TABLE IF EXISTS " + TABLE + ";";
//Properties used to connect to music
- private static Cluster cluster;
- private static Session session;
- private static String cassaHost = "localhost";
private static MusicMixin musicMixin = null;
- private static DB db;
Connection conn;
MySQLMixin mysqlMixin;
OwnershipAndCheckpoint ownAndCheck;
@@ -90,37 +82,18 @@ public class OwnershipAndCheckpointTest {
@BeforeClass
public static void init() throws MusicServiceException, ClassNotFoundException, ManagedProcessException {
- try {
- EmbeddedCassandraServerHelper.startEmbeddedCassandra();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- cluster=EmbeddedCassandraServerHelper.getCluster();
- //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build();
- cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000);
- assertNotNull("Invalid configuration for cassandra", cluster);
- session = EmbeddedCassandraServerHelper.getSession();
- assertNotNull("Invalid configuration for cassandra", session);
+ MdbcTestUtils.initCassandra();
Class.forName("org.mariadb.jdbc.Driver");
- MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session);
- CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle);
- assertNotNull("Invalid configuration for music", store);
//start embedded mariadb
- db = DB.newEmbeddedDB(sqlPort);
- db.start();
- db.createDB(DATABASE);
+ MdbcTestUtils.startMariaDb();
}
@AfterClass
public static void close() throws MusicServiceException, MusicQueryException, ManagedProcessException {
//TODO: shutdown cassandra
musicMixin=null;
- db.stop();
- try {
- EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
- }
- catch(NullPointerException e){
- }
+ MdbcTestUtils.cleanDatabase(DBType.MySQL);
+ MdbcTestUtils.stopCassandra();
}
private void dropTable() throws SQLException {
@@ -144,25 +117,34 @@ public class OwnershipAndCheckpointTest {
@Before
public void initTest() throws SQLException {
- session.execute("DROP KEYSPACE IF EXISTS "+keyspace);
+ MdbcTestUtils.getSession().execute("DROP KEYSPACE IF EXISTS "+MdbcTestUtils.getKeyspace());
try {
Properties properties = new Properties();
+/*
properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName);
properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace);
properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1");
//StateManager stateManager = new StateManager("dbUrl", properties, "serverName", "dbName");
ownAndCheck = new OwnershipAndCheckpoint();
musicMixin =new MusicMixin(stateManager, mdbcServerName,properties);
+*/
+ properties.setProperty(MusicMixin.KEY_MY_ID,MdbcTestUtils.getServerName());
+ properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,MdbcTestUtils.getKeyspace());
+ properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1");
+ properties.setProperty(MusicMixin.KEY_MUSIC_ADDRESS,MdbcTestUtils.getCassandraUrl());
+ ownAndCheck = new OwnershipAndCheckpoint();
+ musicMixin =new MusicMixin(stateManager, MdbcTestUtils.getServerName(), properties);
} catch (MDBCServiceException e) {
fail("error creating music musicMixin " + e.getMessage());
}
- this.conn = DriverManager.getConnection("jdbc:mariadb://localhost:"+sqlPort+"/"+DATABASE, "root", "");
- this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+sqlPort+"/"+DATABASE, conn, null);
+ this.conn = MdbcTestUtils.getConnection(DBType.MySQL);
+ Properties info = new Properties();
+ this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+MdbcTestUtils.getMariaDbPort()+"/"+DATABASE, conn, info);
dropAndCreateTable();
}
private void initDatabase(Range range) throws MDBCServiceException, SQLException {
- final DatabasePartition partition = TestUtils.createBasicRow(range, musicMixin, mdbcServerName);
+ final DatabasePartition partition = TestUtils.createBasicRow(range, musicMixin, MdbcTestUtils.getServerName());
String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+
"(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');";
StagingTable stagingTable = new StagingTable();
@@ -171,13 +153,15 @@ public class OwnershipAndCheckpointTest {
executeStatement.execute(sqlOperation);
this.conn.commit();
mysqlMixin.postStatementHook(sqlOperation,stagingTable);
+ mysqlMixin.preCommitHook();
executeStatement.close();
String id = MDBCUtils.generateUniqueKey().toString();
TxCommitProgress progressKeeper = new TxCommitProgress();
progressKeeper.createNewTransactionTracker(id ,this.conn);
musicMixin.commitLog(partition, null, stagingTable, id, progressKeeper);
try {
- TestUtils.unlockRow(keyspace, mriTableName, partition);
+// TestUtils.unlockRow(keyspace, mriTableName, partition);
+ TestUtils.unlockRow(MdbcTestUtils.getKeyspace(), MdbcTestUtils.getMriTableName(), partition);
}
catch(Exception e){
fail(e.getMessage());