aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main/java
diff options
context:
space:
mode:
authorBrendan Tschaen <ctschaen@att.com>2019-05-15 15:08:05 +0000
committerGerrit Code Review <gerrit@onap.org>2019-05-15 15:08:05 +0000
commit96933528fdd51e606e48d79941ce43249ffb48b6 (patch)
treeefec05eb72232c030e442eb12afa2260d6b6bd67 /mdbc-server/src/main/java
parent8a3db86e697de59a29e712d23e771c8718f9778a (diff)
parent52159a510dc31ee1dacf612a1e05cc8612a117e5 (diff)
Merge "Implement postgres, fixes to eventual, and many bug fixes"
Diffstat (limited to 'mdbc-server/src/main/java')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java4
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java19
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java55
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java9
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Range.java21
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java61
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java213
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java80
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java16
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java9
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json11
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json8
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java100
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java10
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java5
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java2
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java55
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java156
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java1066
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java26
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java35
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java8
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java11
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java6
25 files changed, 1797 insertions, 191 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
index ced5745..91b13f3 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
@@ -28,6 +28,8 @@ public class Configuration {
public static final String KEY_DB_MIXIN_NAME = "MDBC_DB_MIXIN";
/** The property name to use to select the MUSIC 'mixin'. */
public static final String KEY_MUSIC_MIXIN_NAME = "MDBC_MUSIC_MIXIN";
+ /** The property name to select if async staging table update is used */
+ public static final String KEY_ASYNC_STAGING_TABLE_UPDATE = "ASYNC_STAGING_TABLE_UPDATE";
/** The name of the default mixin to use for the DBInterface. */
public static final String DB_MIXIN_DEFAULT = "mysql";//"h2";
/** The name of the default mixin to use for the MusicInterface. */
@@ -44,4 +46,6 @@ public class Configuration {
public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours
/** The property name to provide comma separated list of ranges to warmup */
public static final String KEY_WARMUPRANGES = "warmupranges";
+ /** Default async staging table update o ption*/
+ public static final String ASYNC_STAGING_TABLE_UPDATE = "false";
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
index 32d456e..ae2b869 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
@@ -98,10 +98,25 @@ public class MDBCUtils {
}
public static List<Range> getTables(Map<String,List<SQLOperation>> queryParsed){
+ return getTables(null, queryParsed);
+ }
+
+ public static List<Range> getTables(String defaultDatabaseName, Map<String,List<SQLOperation>> queryParsed){
List<Range> ranges = new ArrayList<>();
for(String table: queryParsed.keySet()){
- ranges.add(new Range(table));
- }
+ String[] parts = table.split("\\.");
+ if(parts.length==2){
+ ranges.add(new Range(table));
+ }
+ else if(parts.length==1 && defaultDatabaseName!=null){
+ ranges.add(new Range(defaultDatabaseName+"."+table));
+ }
+ else{
+ throw new IllegalArgumentException("Table should either have only one '.' or none at all, the table "
+ + "received is "+table);
+ }
+
+ }
return ranges;
}
} \ No newline at end of file
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index cb2df7f..b4d7bb9 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -160,18 +160,7 @@ public class MdbcConnection implements Connection {
if(progressKeeper!=null) progressKeeper.commitRequested(id);
logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
if (b) {
- // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
- if(id == null || id.isEmpty()) {
- logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- throw new SQLException("tx id is null");
- }
- try {
- mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper);
- } catch (MDBCServiceException e) {
- // TODO Auto-generated catch block
- logger.error("Cannot commit log to music" + e.getStackTrace());
- throw new SQLException(e.getMessage(), e);
- }
+ musicCommit();
}
if(progressKeeper!=null) {
progressKeeper.setMusicDone(id);
@@ -191,13 +180,7 @@ public class MdbcConnection implements Connection {
return jdbcConn.getAutoCommit();
}
- /**
- * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
- * they are performed now and copied into MUSIC.
- * @throws SQLException
- */
- @Override
- public void commit() throws SQLException {
+ private void musicCommit() throws SQLException {
if(progressKeeper.isComplete(id)) {
return;
}
@@ -205,6 +188,7 @@ public class MdbcConnection implements Connection {
progressKeeper.commitRequested(id);
}
+ dbi.preCommitHook();
try {
logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
// transaction was committed -- add all the updates into the REDO-Log in MUSIC
@@ -214,6 +198,16 @@ public class MdbcConnection implements Connection {
logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
throw new SQLException("Failure commiting to MUSIC", e);
}
+ }
+
+ /**
+ * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are performed now and copied into MUSIC.
+ * @throws SQLException
+ */
+ @Override
+ public void commit() throws SQLException {
+ musicCommit();
if(progressKeeper != null) {
progressKeeper.setMusicDone(id);
@@ -273,9 +267,10 @@ public class MdbcConnection implements Connection {
} catch (MDBCServiceException e) {
throw new SQLException("Failure during relinquish of partition",e);
}
- // Warning! Make sure this call remains AFTER the call to jdbcConn.close(),
- // otherwise you're going to get stuck in an infinite loop.
- statemanager.closeConnection(id);
+
+ // Warning! Make sure this call remains AFTER the call to jdbcConn.close(),
+ // otherwise you're going to get stuck in an infinite loop.
+ statemanager.closeConnection(id);
}
@Override
@@ -507,7 +502,8 @@ public class MdbcConnection implements Connection {
//Parse tables from the sql query
Map<String, List<SQLOperation>> tableToInstruction = QueryProcessor.parseSqlQuery(sql, table_set);
//Check ownership of keys
- List<Range> queryTables = MDBCUtils.getTables(tableToInstruction);
+ String defaultSchema = dbi.getSchema();
+ List<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToInstruction);
if (this.partition!=null) {
List<Range> snapshot = this.partition.getSnapshot();
if(snapshot!=null){
@@ -550,18 +546,15 @@ public class MdbcConnection implements Connection {
logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
for (String tableName : set1) {
// This map will be filled in if this table was previously discovered
- tableName = tableName.toUpperCase();
- if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
+ if (!table_set.contains(tableName.toUpperCase()) && !dbi.getReservedTblNames().contains(tableName.toUpperCase())) {
logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
try {
TableInfo ti = dbi.getTableInfo(tableName);
- mi.initializeMusicForTable(ti,tableName);
- //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
- ti = dbi.getTableInfo(tableName);
- mi.createDirtyRowTable(ti,tableName);
+ //mi.initializeMusicForTable(ti,tableName);
+ //mi.createDirtyRowTable(ti,tableName);
dbi.createSQLTriggers(tableName);
- table_set.add(tableName);
- dbi.synchronizeData(tableName);
+ table_set.add(tableName.toUpperCase());
+ //dbi.synchronizeData(tableName);
logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
table_set.size() + "/" + set1.size() + "tables uploaded");
} catch (Exception e) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
index 42b9710..500ed81 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
@@ -80,6 +80,15 @@ public class MdbcServer {
Properties connectionProps = new Properties();
connectionProps.put("user", user);
connectionProps.put("password", password);
+ String defaultMusicMixin = Utils.getDefaultMusicMixin();
+ if(defaultMusicMixin!=null){
+ connectionProps.put(Configuration.KEY_MUSIC_MIXIN_NAME,defaultMusicMixin);
+ }
+ String defaultDBMixin = Utils.getDefaultDBMixin();
+ if(defaultMusicMixin!=null){
+ connectionProps.put(Configuration.KEY_DB_MIXIN_NAME,defaultDBMixin);
+ }
+ Utils.registerDefaultDrivers();
meta = new MdbcServerLogic(url,connectionProps,config);
LocalService service = new LocalService(meta);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
index 16e7170..41aed26 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
@@ -23,6 +23,9 @@ import java.io.Serializable;
import java.util.List;
import java.util.Objects;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.mixins.MusicMixin;
+
/**
* This class represent a range of the whole database
@@ -32,13 +35,21 @@ import java.util.Objects;
*/
public class Range implements Cloneable{
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Range.class);
+
private String table;
public Range(String table) {
- this.table = table.toUpperCase();
+ final String[] split = table.split("\\.");
+ if(split.length!=2){
+ logger.debug("Table should contain schema, received in constructor: " + table);
+// throw new IllegalArgumentException("Table should always contain the schema, table received in "
+// + "constructor is "+table);
+ }
+ this.table = table;
}
- public String toString(){return table.toUpperCase();}
+ public String toString(){return table;}
/**
* Compares to Range types
@@ -55,7 +66,7 @@ public class Range implements Cloneable{
@Override
public int hashCode(){
- return table.hashCode();
+ return table.toUpperCase().hashCode();
}
@Override
@@ -74,11 +85,11 @@ public class Range implements Cloneable{
public static boolean overlaps(List<Range> ranges, String table){
//\TODO check if parallel stream makes sense here
- return ranges.stream().map((Range r) -> r.table.equals(table)).anyMatch((Boolean b) -> b);
+ return ranges.stream().map((Range r) -> r.table.toUpperCase().equals(table.toUpperCase())).anyMatch((Boolean b) -> b);
}
public boolean overlaps(Range other) {
- return table.equals(other.table);
+ return table.toUpperCase().equals(other.table.toUpperCase());
}
public String getTable() {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index e4c4a24..18bc0db 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -21,7 +21,6 @@ package org.onap.music.mdbc;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
@@ -47,6 +46,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -102,10 +102,10 @@ public class StateManager {
public StateManager() {
}
- public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
+ public StateManager(String sqlDBUrl, Properties newInfo, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
this.sqlDBName = sqlDBName;
- this.sqlDBUrl = sqlDBUrl;
- this.info = info;
+ this.sqlDBUrl = cleanSqlUrl(sqlDBUrl);
+ this.info = new Properties();
this.mdbcServerName = mdbcServerName;
this.connectionRanges = new ConcurrentHashMap<>();
@@ -117,6 +117,7 @@ public class StateManager {
} catch (IOException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
}
+ info.putAll(newInfo);
cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
@@ -129,6 +130,16 @@ public class StateManager {
ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
}
+ protected String cleanSqlUrl(String url){
+ if(url!=null) {
+ url = url.trim();
+ if (url.length() > 0 && url.charAt(url.length() - 1) == '/') {
+ url= url.substring(0, url.length() - 1);
+ }
+ }
+ return url;
+ }
+
protected void initTxDaemonThread(){
txDaemon = new Thread(
new MusicTxDigestDaemon(Integer.parseInt(
@@ -149,27 +160,21 @@ public class StateManager {
}
protected void initSqlDatabase() throws MDBCServiceException {
- try {
- //\TODO: pass the driver as a variable
- Class.forName("org.mariadb.jdbc.Driver");
- }
- catch (ClassNotFoundException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
- ErrorTypes.GENERALSERVICEERROR);
- return;
- }
- try {
- Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
- StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ")
+ if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) {
+ try {
+ Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
+ StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ")
.append(sqlDBName)
.append(";");
- Statement stmt = sqlConnection.createStatement();
- stmt.execute(sql.toString());
- sqlConnection.close();
- } catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
+ Statement stmt = sqlConnection.createStatement();
+ stmt.execute(sql.toString());
+ sqlConnection.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR,
+ ErrorSeverity.CRITICAL,
ErrorTypes.GENERALSERVICEERROR);
- throw new MDBCServiceException(e.getMessage(), e);
+ throw new MDBCServiceException(e.getMessage(), e);
+ }
}
}
@@ -306,17 +311,9 @@ public class StateManager {
public Connection openConnection(String id) {
Connection sqlConnection;
MdbcConnection newConnection;
- try {
- //TODO: pass the driver as a variable
- Class.forName("org.mariadb.jdbc.Driver");
- }
- catch (ClassNotFoundException e) {
- // TODO Auto-generated catch block
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL,
- ErrorTypes.QUERYERROR);
- }
-
+ Utils.registerDefaultDrivers();
//Create connection to local SQL DB
+
try {
sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info);
} catch (SQLException e) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
new file mode 100755
index 0000000..e5a3252
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java
@@ -0,0 +1,213 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc;
+
+import com.datastax.driver.core.*;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.exceptions.MusicLockingException;
+import org.onap.music.lockingservice.cassandra.MusicLockState;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.main.MusicCore;
+import org.onap.music.main.MusicUtil;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+
+public class TestUtils {
+
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TestUtils.class);
+
+ public static DatabasePartition createBasicRow(Range range, MusicInterface mixin, String mdbcServerName)
+ throws MDBCServiceException {
+ final UUID uuid = MDBCUtils.generateTimebasedUniqueKey();
+ List<Range> ranges = new ArrayList<>();
+ ranges.add(range);
+ DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null);
+ MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "",
+ mdbcServerName, true);
+ DatabasePartition partition=null;
+ partition = mixin.createMusicRangeInformation(newRow);
+ return partition;
+ }
+
+ public static void unlockRow(String keyspace, String mriTableName, DatabasePartition partition)
+ throws MusicLockingException {
+ String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString();
+ MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId());
+ }
+
+ public static void createKeyspace(String keyspace, Session session) {
+ String queryOp = "CREATE KEYSPACE " +
+ keyspace +
+ " WITH REPLICATION " +
+ "= {'class':'SimpleStrategy', 'replication_factor':1}; ";
+ ResultSet res=null;
+ res = session.execute(queryOp);
+ }
+
+ public static void deleteKeyspace(String keyspace, Session session){
+ String queryBuilder = "DROP KEYSPACE " +
+ keyspace +
+ ";";
+ ResultSet res = session.execute(queryBuilder);
+ }
+
+ public static HashSet<String> getMriColNames(){
+ return new HashSet<>(
+ Arrays.asList("rangeid","keys","txredolog","ownerid","metricprocessid")
+ );
+ }
+
+ public static HashSet<String> getMtdColNames(){
+ return new HashSet<>(
+ Arrays.asList("txid","transactiondigest")
+ );
+ }
+
+ public static HashMap<String, DataType> getMriColTypes(Cluster cluster) throws Exception {
+ HashMap<String, DataType> expectedTypes = new HashMap<>();
+ expectedTypes.put("rangeid",DataType.uuid());
+ expectedTypes.put("keys",DataType.set(DataType.text()));
+ ProtocolVersion currentVer = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
+ if(currentVer != null) {
+ throw new Exception("Protocol version for cluster is invalid");
+ }
+ CodecRegistry registry = cluster.getConfiguration().getCodecRegistry();
+ if(registry!= null) {
+ throw new Exception("Codec registry for cluster is invalid");
+ }
+ expectedTypes.put("txredolog",DataType.list(TupleType.of(currentVer,registry,DataType.text(),DataType.uuid())));
+ expectedTypes.put("ownerid",DataType.text());
+ expectedTypes.put("metricprocessid",DataType.text());
+ return expectedTypes;
+ }
+
+ public static HashMap<String, DataType> getMtdColTypes(){
+ HashMap<String,DataType> expectedTypes = new HashMap<>();
+ expectedTypes.put("txid",DataType.uuid());
+ expectedTypes.put("transactiondigest",DataType.text());
+ return expectedTypes;
+ }
+
+ public static void checkDataTypeForTable(List<ColumnMetadata> columnsMeta, HashSet<String> expectedColumns,
+ HashMap<String,DataType> expectedTypes) throws Exception {
+ for(ColumnMetadata cMeta : columnsMeta){
+ String columnName = cMeta.getName();
+ DataType type = cMeta.getType();
+ if(!expectedColumns.contains(columnName)){
+ throw new Exception("Invalid column name: ");
+ }
+ if(!expectedTypes.containsKey(columnName)){
+ throw new Exception("Fix the contents of expectedtypes for column: "+columnName);
+ }
+ if(expectedTypes.get(columnName)!=type) {
+ throw new Exception("Invalid type for column: "+columnName);
+ }
+ }
+ }
+
+ public static void readPropertiesFile(Properties prop) {
+ try {
+ String fileLocation = MusicUtil.getMusicPropertiesFilePath();
+ InputStream fstream = new FileInputStream(fileLocation);
+ prop.load(fstream);
+ fstream.close();
+ } catch (FileNotFoundException e) {
+ logger.error("Configuration file not found");
+
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ logger.error("Exception when reading file: "+e.toString());
+ }
+ }
+
+
+ public static void populateMusicUtilsWithProperties(Properties prop){
+ //TODO: Learn how to do this properly within music
+ String[] propKeys = MusicUtil.getPropkeys();
+ for (int k = 0; k < propKeys.length; k++) {
+ String key = propKeys[k];
+ if (prop.containsKey(key) && prop.get(key) != null) {
+ switch (key) {
+ case "cassandra.host":
+ MusicUtil.setMyCassaHost(prop.getProperty(key));
+ break;
+ case "music.ip":
+ MusicUtil.setDefaultMusicIp(prop.getProperty(key));
+ break;
+ case "debug":
+ MusicUtil.setDebug(Boolean
+ .getBoolean(prop.getProperty(key).toLowerCase()));
+ break;
+ case "version":
+ MusicUtil.setVersion(prop.getProperty(key));
+ break;
+ case "music.rest.ip":
+ MusicUtil.setMusicRestIp(prop.getProperty(key));
+ break;
+ case "music.properties":
+ MusicUtil.setMusicPropertiesFilePath(prop.getProperty(key));
+ break;
+ case "lock.lease.period":
+ MusicUtil.setDefaultLockLeasePeriod(
+ Long.parseLong(prop.getProperty(key)));
+ break;
+ case "my.id":
+ MusicUtil.setMyId(Integer.parseInt(prop.getProperty(key)));
+ break;
+ case "all.ids":
+ String[] ids = prop.getProperty(key).split(":");
+ MusicUtil.setAllIds(new ArrayList<String>(Arrays.asList(ids)));
+ break;
+ case "public.ip":
+ MusicUtil.setPublicIp(prop.getProperty(key));
+ break;
+ case "all.public.ips":
+ String[] ips = prop.getProperty(key).split(":");
+ if (ips.length== 1) {
+ // Future use
+ } else if (ips.length > 1) {
+ MusicUtil.setAllPublicIps(
+ new ArrayList<String>(Arrays.asList(ips)));
+ }
+ break;
+ case "cassandra.user":
+ MusicUtil.setCassName(prop.getProperty(key));
+ break;
+ case "cassandra.password":
+ MusicUtil.setCassPwd(prop.getProperty(key));
+ break;
+ case "aaf.endpoint.url":
+ MusicUtil.setAafEndpointUrl(prop.getProperty(key));
+ break;
+ default:
+ System.out.println("No case found for " + key);
+ }
+ }
+ }
+
+
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
new file mode 100644
index 0000000..7a09dca
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
@@ -0,0 +1,80 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.onap.music.logging.EELFLoggerDelegate;
+
+public class Utils {
+
+
+ public static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Utils.class);
+
+ static Properties retrieveMdbcProperties() {
+ Properties pr = null;
+ try {
+ pr = new Properties();
+ pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
+ } catch (IOException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Could not load property file: " + e.getMessage());
+ }
+ return pr;
+ }
+
+ public static String getDefaultMusicMixin() {
+ Properties pr = retrieveMdbcProperties();
+ if (pr == null)
+ return null;
+ String defaultMusicMixin = pr.getProperty("DEFAULT_MUSIC_MIXIN");
+ return defaultMusicMixin;
+ }
+
+ public static String getDefaultDBMixin() {
+ Properties pr = retrieveMdbcProperties();
+ if (pr == null)
+ return null;
+ String defaultMusicMixin = pr.getProperty("DEFAULT_DB_MIXIN");
+ return defaultMusicMixin;
+ }
+
+ public static void registerDefaultDrivers() {
+ Properties pr = null;
+ try {
+ pr = new Properties();
+ pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
+ } catch (IOException e) {
+ logger.error("Could not load property file > " + e.getMessage());
+ }
+
+ String drivers = pr.getProperty("DEFAULT_DRIVERS");
+ for (String driver : drivers.split("[ ,]")) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'");
+ try {
+ Class.forName(driver.trim());
+ } catch (ClassNotFoundException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Driver class " + driver + " not found.");
+ }
+ }
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
index 38309d5..391ee1a 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
@@ -41,7 +41,7 @@ public class NodeConfiguration {
public String nodeName;
public String sqlDatabaseName;
- public NodeConfiguration(String tables, String eventualTables, UUID mriIndex, String sqlDatabaseName, String node){
+ public NodeConfiguration(String tables, List<String> eventualTables, UUID mriIndex, String sqlDatabaseName, String node){
// public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) {
partition = new DatabasePartition(toRanges(tables), mriIndex, null) ;
eventual = new Eventual(toRanges(eventualTables));
@@ -49,16 +49,20 @@ public class NodeConfiguration {
this.sqlDatabaseName = sqlDatabaseName;
}
+ protected List<Range> toRanges(List<String> tables){
+ List<Range> newRange = new ArrayList<>();
+ for(String table: tables) {
+ newRange.add(new Range(table));
+ }
+ return newRange;
+ }
+
protected List<Range> toRanges(String tables){
if(tables.isEmpty()){
return new ArrayList<>();
}
- List<Range> newRange = new ArrayList<>();
String[] tablesArray=tables.split(",");
- for(String table: tablesArray) {
- newRange.add(new Range(table));
- }
- return newRange;
+ return toRanges(new ArrayList<>(Arrays.asList(tablesArray)));
}
public String toJson() {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
index 343a8b8..0598271 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
@@ -26,6 +26,7 @@ import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.mixins.MusicMixin;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
import com.google.gson.Gson;
import org.onap.music.datastore.PreparedQueryObject;
@@ -46,6 +47,7 @@ public class TablesConfiguration {
private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class);
private List<PartitionInformation> partitions;
+ private List<String> eventual;
String tableToPartitionName;
private String musicNamespace;
private String partitionInformationTableName;
@@ -83,16 +85,17 @@ public class TablesConfiguration {
logger.info("Creating empty row with id "+partitionId);
MusicMixin.createEmptyMriRow(musicNamespace,partitionInfo.mriTableName,UUID.fromString(partitionId),
- partitionInfo.owner,null,partitionInfo.getTables(),true);
+ partitionInfo.owner,null,
+ partitionInfo.getTables(),true);
//3) Create config for this node
StringBuilder newStr = new StringBuilder();
for(Range r: partitionInfo.tables){
- newStr.append(r.toString().toUpperCase()).append(",");
+ newStr.append(r.toString()).append(",");
}
logger.info("Appending row to configuration "+partitionId);
- nodeConfigs.add(new NodeConfiguration(newStr.toString(),"",UUID.fromString(partitionId),
+ nodeConfigs.add(new NodeConfiguration(newStr.toString(),eventual,UUID.fromString(partitionId),
sqlDatabaseName, partitionInfo.owner));
}
return nodeConfigs;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json
new file mode 100644
index 0000000..430525f
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json
@@ -0,0 +1,11 @@
+{
+ "internalNamespace": "music_internal",
+ "internalReplicationFactor": 1,
+ "musicNamespace": "namespace",
+ "musicReplicationFactor": 1,
+ "mriTableName": "musicrangeinformation",
+ "mtxdTableName": "musictxdigest",
+ "eventualMtxdTableName":"musicevetxdigest",
+ "nodeInfoTableName":"nodeinfo",
+ "rangeDependencyTableName":"musicrangedependency"
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
index 8cbbfec..559d597 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
@@ -3,21 +3,19 @@
{
"tables": [
{
- "table": "PERSONS"
+ "table": "public.PERSONS"
}
],
"owner": "",
"mriTableName": "musicrangeinformation",
- "mtxdTableName": "musictxdigest",
"partitionId": ""
}
],
- "internalNamespace": "music_internal",
- "internalReplicationFactor": 1,
+ "eventual": ["public.eventualPERSONS"],
"musicNamespace": "namespace",
- "musicReplicationFactor": 1,
"tableToPartitionName": "tabletopartition",
"partitionInformationTableName": "partitioninfo",
"redoHistoryTableName": "redohistory",
"sqlDatabaseName": "test"
}
+
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java
new file mode 100644
index 0000000..543cbd0
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.mixins;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.websocket.RemoteEndpoint.Async;
+import org.onap.music.logging.EELFLoggerDelegate;
+
+public class AsyncUpdateHandler {
+
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(AsyncUpdateHandler.class);
+
+ Object finishMonitor;
+ Object taskMonitor;
+ AtomicBoolean pendingUpdate;
+ Runnable handler;
+
+ AsyncUpdateHandler(Runnable handlerToRun){
+ handler=handlerToRun;
+ finishMonitor=new Object();
+ taskMonitor = new Object();
+ pendingUpdate=new AtomicBoolean(false);
+ createUpdateExecutor();
+ }
+
+ void createUpdateExecutor(){
+ Runnable newRunnable = ()-> UpdateExecutor();
+ Thread newThread = new Thread(newRunnable);
+ newThread.setDaemon(true);
+ newThread.start();
+ }
+
+ public void processNewUpdate(){
+ pendingUpdate.set(true);
+ synchronized (taskMonitor) {
+ taskMonitor.notifyAll();
+ }
+ }
+
+ void UpdateExecutor(){
+ while(true) {
+ synchronized (taskMonitor) {
+ try {
+ if(!pendingUpdate.get()){
+ taskMonitor.wait();
+ }
+ } catch (InterruptedException e) {
+ logger.error("Update Executor received an interrup exception");
+ }
+ }
+ startUpdate();
+ }
+ }
+
+ void startUpdate(){
+ synchronized (finishMonitor) {
+ //Keep running until there are no more requests
+ while (pendingUpdate.getAndSet(false)) {
+ if(handler!=null){
+ handler.run();
+ }
+ }
+ finishMonitor.notifyAll();
+ }
+
+ }
+
+ public void waitForAllPendingUpdates(){
+ //Wait until there are no more requests and the thread is not longer running
+ //We could use a join, but given that the thread could be temporally null, then it is easier to
+ //separate the wait from the thread that is running
+ synchronized(finishMonitor){
+ while(pendingUpdate.get()) {
+ try {
+ finishMonitor.wait();
+ } catch (InterruptedException e) {
+ logger.error("waitForAllPendingUpdate received exception");
+ }
+ }
+ }
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index a594918..dd71d97 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
@@ -111,6 +112,13 @@ public interface DBInterface {
* @return a ResultSet containing the rows returned from the query
*/
ResultSet executeSQLRead(String sql);
+
+ /**
+ * This method is used to verify that all data structures needed for commit are ready for commit
+ * Either for the MusicMixin (e.g. staging table) or internally for the DBMixin
+ * Important this is not for actual commit operation, it should be treated as a signal.
+ */
+ void preCommitHook();
void synchronizeData(String tableName);
@@ -133,4 +141,6 @@ public interface DBInterface {
void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException;
Connection getSQLConnection();
+
+ String getSchema();
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
index d822615..324b0f6 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
@@ -61,12 +61,13 @@ public class MixinFactory {
con = cl.getConstructor(MusicInterface.class, String.class, Connection.class, Properties.class);
if (con != null) {
logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname);
- return (DBInterface) con.newInstance(mi, url, conn, info);
+ DBInterface newdbi = (DBInterface) con.newInstance(mi, url, conn, info);
+ return newdbi;
}
}
}
} catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface: "+e);
+ logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface error for "+cl.getName(),e);
}
}
return null;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 71f1b8b..fc8897f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -303,8 +303,8 @@ public interface MusicInterface {
void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException;
List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
-
+
void deleteMriRow(MusicRangeInformationRow row) throws MDBCServiceException;
void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index cb37a55..5a322f3 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -61,7 +61,6 @@ import org.onap.music.mdbc.StateManager;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.DagNode;
-import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
@@ -99,6 +98,8 @@ public class MusicMixin implements MusicInterface {
public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
/** The property name to use to provide the replication factor for Cassandra. */
public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
+ /** The property name to use to provide a timeout to mdbc (ownership) */
+ public static final String KEY_TIMEOUT = "mdbc_timeout";
/** The property name to use to provide a flag indicating if compression is required */
public static final String KEY_COMPRESSION = "mdbc_compression";
/** Namespace for the tables in MUSIC (Cassandra) */
@@ -106,13 +107,15 @@ public class MusicMixin implements MusicInterface {
/** The default property value to use for the Cassandra IP address. */
public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
/** The default property value to use for the Cassandra replication factor. */
- public static final int DEFAULT_MUSIC_RFACTOR = 1;
+ public static final int DEFAULT_MUSIC_RFACTOR = 3;
+ /** The default property value to use for the MDBC timeout */
+ public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
/** The default primary string column, if none is provided. */
public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
/** Type of the primary key, if none is defined by the user */
public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
public static final boolean DEFAULT_COMPRESSION = true;
-
+ //TODO: Control network topology strategy with a configuration file entry
public static final boolean ENABLE_NETWORK_TOPOLOGY_STRATEGY = false;
//\TODO Add logic to change the names when required and create the tables when necessary
@@ -254,6 +257,7 @@ public class MusicMixin implements MusicInterface {
public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException {
Map<String,Object> replicationInfo = new HashMap<>();
replicationInfo.put("'class'", "'NetworkTopologyStrategy'");
+
if (ENABLE_NETWORK_TOPOLOGY_STRATEGY && replicationFactor==3) {
replicationInfo.put("'dc1'", 1);
replicationInfo.put("'dc2'", 1);
@@ -673,7 +677,7 @@ public class MusicMixin implements MusicInterface {
}
if(rs!=null) {
for (Row row : rs) {
- set.add(row.getString("TABLE_NAME").toUpperCase());
+ set.add(row.getString("TABLE_NAME"));
}
}
return set;
@@ -1295,15 +1299,13 @@ public class MusicMixin implements MusicInterface {
logger.warn("Trying tcommit log with null partition");
return;
}
+
List<Range> snapshot = partition.getSnapshot();
if(snapshot==null || snapshot.isEmpty()){
logger.warn("Trying to commit log with empty ranges");
return;
}
- // first deal with commit for eventually consistent tables
- filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
-
UUID mriIndex = partition.getMRIIndex();
String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
//0. See if reference to lock was already created
@@ -1475,7 +1477,7 @@ public class MusicMixin implements MusicInterface {
for(TupleValue t: log){
//final String tableName = t.getString(0);
final UUID id = t.getUUID(1);
- digestIds.add(new MusicTxDigestId(id,index++));
+ digestIds.add(new MusicTxDigestId(partitionIndex,id,index++));
}
List<Range> partitions = new ArrayList<>();
Set<String> tables = newRow.getSet("keys",String.class);
@@ -1526,11 +1528,14 @@ public class MusicMixin implements MusicInterface {
pQueryObject.appendQueryString(cql);
pQueryObject.addValue(baseRange.getTable());
Row newRow;
+ //TODO Change this when music fix the "." problem in the primary key
+ final String table = baseRange.getTable();
+ final String tableWithoutDot = table.replaceAll("\\.","");
try {
- newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.getTable(),null);
+ newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,tableWithoutDot,null);
} catch (MDBCServiceException e) {
- logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
+ logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName+" trying for table "+tableWithoutDot);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information for table "+tableWithoutDot, e);
}
return getRangeDependenciesFromCassandraRow(newRow);
}
@@ -1838,7 +1843,6 @@ public class MusicMixin implements MusicInterface {
PreparedQueryObject query = new PreparedQueryObject();
int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
-
String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns,
this.musicEventualTxDigestTableName);
query.appendQueryString(cql);
@@ -1856,6 +1860,7 @@ public class MusicMixin implements MusicInterface {
}
}
+
@Override
public StagingTable getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
@@ -1883,8 +1888,7 @@ public class MusicMixin implements MusicInterface {
}
return changes;
}
-
-
+
@Override
public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException {
int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
@@ -1901,7 +1905,7 @@ public class MusicMixin implements MusicInterface {
LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>();
UUID musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName);
PreparedQueryObject pQueryObject = new PreparedQueryObject();
-
+
if (musicevetxdigestNodeinfoTimeID != null) {
// this will fetch only few records based on the time-stamp condition.
cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
@@ -1912,7 +1916,7 @@ public class MusicMixin implements MusicInterface {
cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
pQueryObject.appendQueryString(cql);
}
-
+
// I need to get a ResultSet of all the records and give each row to the below HashMap.
ResultSet rs = executeMusicRead(pQueryObject);
while (!rs.isExhausted()) {
@@ -1920,8 +1924,8 @@ public class MusicMixin implements MusicInterface {
ByteBuffer digest = row.getBytes("transactiondigest");
Boolean compressed = row.getBool("compressed");
//String txTimeId = row.getString("txtimeid"); //???
- UUID txTimeId = row.getUUID("txtimeid");
-
+ UUID txTimeId = row.getUUID("txtimeid");
+
try {
if(compressed){
digest=StagingTable.Decompress(digest);
@@ -1932,7 +1936,7 @@ public class MusicMixin implements MusicInterface {
throw e;
}
ecDigestInformation.put(txTimeId, changes);
- }
+ }
return ecDigestInformation;
}
@@ -2218,7 +2222,7 @@ public class MusicMixin implements MusicInterface {
* @param cql the CQL to be sent to Cassandra
*/
private static void executeMusicWriteQuery(String keyspace, String table, String cql)
- throws MDBCServiceException {
+ throws MDBCServiceException {
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
ResultType rt = null;
@@ -2254,6 +2258,9 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException("Error executing atomic get", e);
}
}
+ if(result==null){
+ throw new MDBCServiceException("Error executing atomic get for primary key: "+primaryKey);
+ }
if(result.isExhausted()){
return null;
}
@@ -2418,11 +2425,17 @@ public class MusicMixin implements MusicInterface {
pQueryObject.addValue(row.getPartitionIndex());
ReturnType rt ;
try {
- rt = MusicCore.atomicPut(music_ns, musicRangeDependencyTableName, row.getPartitionIndex().toString(),
+ rt = MusicCore.atomicPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(),
pQueryObject, null);
} catch (MusicLockingException|MusicQueryException|MusicServiceException e) {
logger.error("Failure when deleting mri row");
new MDBCServiceException("Error deleting mri row",e);
}
}
+
+ public StateManager getStateManager() {
+ return stateManager;
+ }
+
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 6d6e691..15caf3f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -40,6 +40,7 @@ import org.json.JSONTokener;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Configuration;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
@@ -75,7 +76,7 @@ public class MySQLMixin implements DBInterface {
public static final String TRANS_TBL = "MDBC_TRANSLOG";
private static final String CREATE_TBL_SQL =
"CREATE TABLE IF NOT EXISTS "+TRANS_TBL+
- " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " +
+ " (IX INT AUTO_INCREMENT, OP CHAR(1), SCHEMANAME VARCHAR(255), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " +
"CONNECTION_ID INT, PRIMARY KEY (IX));";
private final MusicInterface mi;
@@ -85,6 +86,10 @@ public class MySQLMixin implements DBInterface {
private final Map<String, TableInfo> tables;
private PreparedStatement deleteStagingStatement;
private boolean server_tbl_created = false;
+ private boolean useAsyncStagingUpdate = false;
+ private Object stagingHandlerLock = new Object();
+ private AsyncUpdateHandler stagingHandler = null;
+ private StagingTable currentStaging=null;
public MySQLMixin() {
this.mi = null;
@@ -100,12 +105,35 @@ public class MySQLMixin implements DBInterface {
this.dbName = getDBName(conn);
this.jdbcConn = conn;
this.tables = new HashMap<String, TableInfo>();
+ useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
+ Configuration.ASYNC_STAGING_TABLE_UPDATE));
+ }
+
+ class StagingTableUpdateRunnable implements Runnable{
+
+ private MySQLMixin mixin;
+ private StagingTable staging;
+
+ StagingTableUpdateRunnable(MySQLMixin mixin, StagingTable staging){
+ this.mixin=mixin;
+ this.staging=staging;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.mixin.updateStagingTable(staging);
+ } catch (NoSuchFieldException|MDBCServiceException e) {
+ this.mixin.logger.error("Error when updating the staging table");
+ }
+ }
}
private PreparedStatement getStagingDeletePreparedStatement() throws SQLException {
return jdbcConn.prepareStatement("DELETE FROM "+TRANS_TBL+" WHERE (IX BETWEEN ? AND ? ) AND " +
"CONNECTION_ID = ?;");
}
+
// This is used to generate a unique connId for this connection to the DB.
private int generateConnID(Connection conn) {
int rv = (int) System.currentTimeMillis(); // random-ish
@@ -156,12 +184,20 @@ public class MySQLMixin implements DBInterface {
}
return dbname;
}
-
+
+ @Override
public String getDatabaseName() {
return this.dbName;
}
@Override
+ public String getSchema() {return this.dbName;}
+
+ /**
+ * Get a set of the table names in the database.
+ * @return the set
+ */
+ @Override
public Set<String> getSQLTableSet() {
Set<String> set = new TreeSet<String>();
String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
@@ -183,7 +219,7 @@ public class MySQLMixin implements DBInterface {
@Override
public Set<Range> getSQLRangeSet() {
Set<String> set = new TreeSet<String>();
- String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+ String sql = "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
try {
Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
@@ -244,9 +280,19 @@ mysql> describe tables;
TableInfo ti = tables.get(tableName);
if (ti == null) {
try {
- String tbl = tableName;//.toUpperCase();
- String sql = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME='"+tbl+"'";
- ResultSet rs = executeSQLRead(sql);
+ final String[] split = tableName.split("\\.");
+ String tbl = (split.length==2)?split[1]:tableName;
+ String localSchema = (split.length==2)?split[0]:getSchema();
+ StringBuilder sql=new StringBuilder();
+ sql.append("SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=");
+ if(localSchema==null) {
+ sql.append("DATABASE() AND TABLE_NAME='");
+ }
+ else {
+ sql.append("'").append(localSchema).append("' AND TABLE_NAME='");
+ }
+ sql.append(tbl).append("';");
+ ResultSet rs = executeSQLRead(sql.toString());
if (rs != null) {
ti = new TableInfo();
while (rs.next()) {
@@ -296,9 +342,13 @@ mysql> describe tables;
}
}
@Override
- public void createSQLTriggers(String tableName) {
- // Don't create triggers for the table the triggers write into!!!
+ public void createSQLTriggers(String table) {
+ final String[] split = table.split("\\.");
+ String schemaName = (split.length==2)?split[0]:getSchema();
+ String tableName = (split.length==2)?split[1]:table;
+
if (tableName.equals(TRANS_TBL))
+ // Don't create triggers for the table the triggers write into!!!
return;
try {
if (!server_tbl_created) {
@@ -321,12 +371,12 @@ mysql> describe tables;
//msm.register(name);
}
// No SELECT trigger
- executeSQLWrite(generateTrigger(tableName, "INSERT"));
- executeSQLWrite(generateTrigger(tableName, "UPDATE"));
+ executeSQLWrite(generateTrigger(schemaName,tableName, "INSERT"));
+ executeSQLWrite(generateTrigger(schemaName,tableName, "UPDATE"));
//\TODO: save key row instead of the whole row for delete
- executeSQLWrite(generateTrigger(tableName, "DELETE"));
+ executeSQLWrite(generateTrigger(schemaName,tableName, "DELETE"));
} catch (SQLException e) {
- if (e.getMessage().equals("Trigger already exists")) {
+ if (e.getMessage().equals("Trigger already exists") || e.getMessage().endsWith("already exists")){
//only warn if trigger already exists
logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e);
} else {
@@ -343,7 +393,7 @@ END;
OLD.field refers to the old value
NEW.field refers to the new value
*/
- private String generateTrigger(String tableName, String op) {
+ private String generateTrigger(String schema, String tableName, String op) {
boolean isdelete = op.equals("DELETE");
boolean isinsert = op.equals("INSERT");
boolean isupdate = op.equals("UPDATE");
@@ -371,25 +421,27 @@ NEW.field refers to the new value
//\TODO check if using mysql driver, so instead check the exception
//\TODO add conditional for update, if primary key is still the same, use null in the KEYDATA col
StringBuilder sb = new StringBuilder()
- .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL!
- .append(String.format("%s_%s", op.substring(0, 1), tableName))
- .append(" AFTER ")
- .append(op)
- .append(" ON ")
- .append(tableName.toUpperCase())
- .append(" FOR EACH ROW INSERT INTO ")
- .append(TRANS_TBL)
- .append(" (TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('")
- .append(tableName.toUpperCase())
- .append("', ")
- .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'"))
- .append(", ")
- .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL")
- .append(", ")
- .append(newJson.toString())
- .append(", ")
- .append("CONNECTION_ID()")
- .append(")");
+ .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL!
+ .append(String.format("%s_%s", op.substring(0, 1), tableName))
+ .append(" AFTER ")
+ .append(op)
+ .append(" ON ")
+ .append(tableName)
+ .append(" FOR EACH ROW INSERT INTO ")
+ .append(TRANS_TBL)
+ .append(" (SCHEMANAME, TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('")
+ .append( (schema==null)?this.getSchema():schema )
+ .append("', '")
+ .append(tableName)
+ .append("', ")
+ .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'"))
+ .append(", ")
+ .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL")
+ .append(", ")
+ .append(newJson.toString())
+ .append(", ")
+ .append("CONNECTION_ID()")
+ .append(")");
return sb.toString();
}
private String[] getTriggerNames(String tableName) {
@@ -521,6 +573,16 @@ NEW.field refers to the new value
return rs;
}
+ @Override
+ public void preCommitHook() {
+ synchronized (stagingHandlerLock){
+ //\TODO check if this can potentially block forever in certain scenarios
+ if(stagingHandler!=null){
+ stagingHandler.waitForAllPendingUpdates();
+ }
+ }
+ }
+
/**
* This method executes a write query in the sql database.
* @param sql the SQL to be sent to MySQL
@@ -569,11 +631,24 @@ NEW.field refers to the new value
String[] parts = sql.trim().split(" ");
String cmd = parts[0].toLowerCase();
if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
- try {
- this.updateStagingTable(transactionDigest);
- } catch (NoSuchFieldException|MDBCServiceException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ if (useAsyncStagingUpdate) {
+ synchronized (stagingHandlerLock){
+ if(stagingHandler==null||currentStaging!=transactionDigest){
+ Runnable newRunnable = new StagingTableUpdateRunnable(this, transactionDigest);
+ currentStaging=transactionDigest;
+ stagingHandler=new AsyncUpdateHandler(newRunnable);
+ }
+ //else we can keep using the current staging Handler
+ }
+ stagingHandler.processNewUpdate();
+ } else {
+
+ try {
+ this.updateStagingTable(transactionDigest);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ // TODO Auto-generated catch block
+ this.logger.error("Error updating the staging table");
+ }
}
}
}
@@ -604,7 +679,7 @@ NEW.field refers to the new value
throws NoSuchFieldException, MDBCServiceException {
// copy from DB.MDBC_TRANSLOG where connid == myconnid
// then delete from MDBC_TRANSLOG
- String sql2 = "SELECT IX, TABLENAME, OP, ROWDATA,KEYDATA FROM "+TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId;
+ String sql2 = "SELECT IX, SCHEMANAME, TABLENAME, OP, ROWDATA, KEYDATA FROM " + TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId;
Integer biggestIx = Integer.MIN_VALUE;
Integer smallestIx = Integer.MAX_VALUE;
try {
@@ -616,10 +691,11 @@ NEW.field refers to the new value
smallestIx = Integer.min(smallestIx,ix);
String op = rs.getString("OP");
SQLOperation opType = toOpEnum(op);
+ String schema= rs.getString("SCHEMANAME");
String tbl = rs.getString("TABLENAME");
String newRowStr = rs.getString("ROWDATA");
String rowStr = rs.getString("KEYDATA");
- Range range = new Range(tbl);
+ Range range = new Range(schema+"."+tbl);
transactionDigests.addOperation(range,opType,newRowStr,rowStr);
rows.add(ix);
}
@@ -1056,7 +1132,7 @@ NEW.field refers to the new value
private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
logger.info("Clearing replayed operations");
String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId;
- jdbcStmt.executeQuery(sql);
+ jdbcStmt.executeUpdate(sql);
}
@Override
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
new file mode 100755
index 0000000..0f66731
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
@@ -0,0 +1,1066 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.mixins;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.*;
+import java.util.Map.Entry;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.delete.Delete;
+import net.sf.jsqlparser.statement.insert.Insert;
+import net.sf.jsqlparser.statement.update.Update;
+import org.json.JSONObject;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Configuration;
+import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.mixins.MySQLMixin.StagingTableUpdateRunnable;
+import org.onap.music.mdbc.tables.Operation;
+import org.onap.music.mdbc.query.SQLOperation;
+import org.onap.music.mdbc.tables.StagingTable;
+import org.postgresql.util.PGInterval;
+import org.postgresql.util.PGobject;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * This class provides the methods that MDBC needs in order to mirror data to/from a
+ * <a href="https://dev.mysql.com/">MySQL</a> or <a href="http://mariadb.org/">MariaDB</a> database instance. This class
+ * uses the <code>JSON_OBJECT()</code> database function, which means it requires the following minimum versions of
+ * either database:
+ * <table summary="">
+ * <tr>
+ * <th>DATABASE</th>
+ * <th>VERSION</th>
+ * </tr>
+ * <tr>
+ * <td>MySQL</td>
+ * <td>5.7.8</td>
+ * </tr>
+ * <tr>
+ * <td>MariaDB</td>
+ * <td>10.2.3 (Note: 10.2.3 is currently (July 2017) a <i>beta</i> release)</td>
+ * </tr>
+ * </table>
+ *
+ * @author Robert P. Eby
+ */
+public class PostgresMixin implements DBInterface {
+
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(PostgresMixin.class);
+
+ public static final String MIXIN_NAME = "postgres";
+ public static final String TRANS_TBL_SCHEMA = "audit";
+ public static final String TRANS_TBL = "mdbc_translog";
+
+ private final MusicInterface mi;
+ private final String connId;
+ private final String dbName;
+ private final String schema;
+ private final Connection jdbcConn;
+ private final Map<String, TableInfo> tables;
+ private PreparedStatement deleteStagingStatement;
+ private boolean useAsyncStagingUpdate = false;
+ private Object stagingHandlerLock = new Object();
+ private AsyncUpdateHandler stagingHandler = null;
+ private StagingTable currentStaging = null;
+
+ public PostgresMixin() {
+ this.mi = null;
+ this.connId = "";
+ this.dbName = null;
+ this.schema = null;
+ this.jdbcConn = null;
+ this.tables = null;
+ this.deleteStagingStatement = null;
+ }
+
+ private void initializeDeleteStatement() throws SQLException {
+ deleteStagingStatement = jdbcConn.prepareStatement("DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL
+ + " WHERE (ix BETWEEN ? AND ? ) AND " + "connection_id = ?;");
+ }
+
+ public PostgresMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException {
+ this.mi = mi;
+ this.connId = generateConnID(conn);
+ this.dbName = getDBName(conn);
+ this.schema = getSchema(conn);
+ this.jdbcConn = conn;
+ this.tables = new HashMap<>();
+ useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
+ Configuration.ASYNC_STAGING_TABLE_UPDATE));
+ initializePostgresTriggersStructures();
+ initializeDeleteStatement();
+ }
+
+ class StagingTableUpdateRunnable implements Runnable {
+
+ private PostgresMixin mixin;
+ private StagingTable staging;
+
+ StagingTableUpdateRunnable(PostgresMixin mixin, StagingTable staging) {
+ this.mixin = mixin;
+ this.staging = staging;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.mixin.updateStagingTable(staging);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ this.mixin.logger.error("Error when updating the staging table");
+ }
+ }
+ }
+
+
+ private void createTriggerTable() throws SQLException {
+ final String createSchemaSQL = "CREATE SCHEMA IF NOT EXISTS " + TRANS_TBL_SCHEMA + ";";
+ final String revokeCreatePrivilegesSQL = "REVOKE CREATE ON schema " + TRANS_TBL_SCHEMA + " FROM public;";
+ final String createTableSQL = "CREATE TABLE IF NOT EXISTS " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " ("
+ + "ix serial," + "op TEXT NOT NULL CHECK (op IN ('I','D','U'))," + "schema_name text NOT NULL,"
+ + "table_name text NOT NULL," + "original_data json," + "new_data json," + "connection_id text,"
+ + "PRIMARY KEY (connection_id,ix)" + ") WITH (fillfactor=100);";
+ final String revokeSQL = "REVOKE INSERT,UPDATE,DELETE,TRUNCATE,REFERENCES,TRIGGER ON " + TRANS_TBL_SCHEMA + "."
+ + TRANS_TBL + " FROM public;";
+ final String grantSelectSQL = "GRANT SELECT ON " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " TO public;";
+ final String createIndexSQL = "CREATE INDEX IF NOT EXISTS logged_actions_connection_id_idx" + " ON "
+ + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " (connection_id);";
+ Map<String, String> sqlStatements = new LinkedHashMap<String, String>() {
+ {
+ put("create_schema", createSchemaSQL);
+ put("revoke_privileges", revokeCreatePrivilegesSQL);
+ put("create_table", createTableSQL);
+ put("revoke_sql", revokeSQL);
+ put("grant_select", grantSelectSQL);
+ put("create_index", createIndexSQL);
+ }
+ };
+ for (Entry<String, String> query : sqlStatements.entrySet()) {
+ int retryCount = 0;
+ boolean ready = false;
+ while (retryCount < 3 && !ready) {
+ try {
+ Statement statement = jdbcConn.createStatement();
+ statement.executeUpdate(query.getValue());
+ if (!jdbcConn.getAutoCommit()) {
+ jdbcConn.commit();
+ }
+ statement.close();
+ ready = true;
+ } catch (SQLException e) {
+ if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated") || e.getMessage()
+ .toLowerCase().startsWith("error: duplicate key value violates unique constraint")) {
+ logger.warn("Error creating schema, retrying. for " + query.getKey(), e);
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ }
+ } else {
+ logger.error("Error executing " + query.getKey(), e);
+ throw e;
+ }
+ }
+ retryCount++;
+ }
+ }
+ }
+
+ private String updateTriggerSection() {
+ return "IF (TG_OP = 'UPDATE') THEN\n" + "v_old_data := row_to_json(OLD);\n"
+ + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + "." + TRANS_TBL
+ + " (op,schema_name,table_name,original_data,new_data,connection_id)\n"
+ + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,v_new_data,pg_backend_pid());\n"
+ + "RETURN NEW; ";
+ }
+
+ private String insertTriggerSection() {
+ // \TODO add additinoal conditional on change "IF NEW IS DISTINCT FROM OLD THEN"
+ return "IF (TG_OP = 'INSERT') THEN\n" + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA
+ + "." + TRANS_TBL + " (op,schema_name,table_name,new_data,connection_id)\n"
+ + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_new_data,pg_backend_pid());\n"
+ + "RETURN NEW; ";
+ }
+
+ private String deleteTriggerSection() {
+ return "IF (TG_OP = 'DELETE') THEN\n" + "v_old_data := row_to_json(OLD);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA
+ + "." + TRANS_TBL + " (op,schema_name,table_name,original_data,connection_id)\n"
+ + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,pg_backend_pid());\n"
+ + "RETURN OLD; ";
+ }
+
+ private String functionName(SQLOperation type) {
+ final String functionName = (type.equals(SQLOperation.UPDATE)) ? "if_updated_func"
+ : (type.equals(SQLOperation.INSERT)) ? "if_inserted_func" : "if_deleted_func";
+ return "audit." + functionName + "()";
+ }
+
+ private void createTriggerFunctions(SQLOperation type) throws SQLException {
+ StringBuilder functionSQL =
+ new StringBuilder("CREATE OR REPLACE FUNCTION " + functionName(type) + " RETURNS TRIGGER AS $body$\n"
+ + "DECLARE\n" + "v_old_data json;\n" + "v_new_data json;\n" + "BEGIN\n");
+ switch (type) {
+ case UPDATE:
+ functionSQL.append(updateTriggerSection());
+ break;
+ case INSERT:
+ functionSQL.append(insertTriggerSection());
+ break;
+ case DELETE:
+ functionSQL.append(deleteTriggerSection());
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid operation type for creation of trigger functions");
+ }
+ functionSQL.append("ELSE\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - Other action occurred: %, at %',TG_OP,now();\n"
+ + "RETURN NULL;\n" + "END IF;\n" + "EXCEPTION\n" + "WHEN data_exception THEN\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ + "RETURN NULL;\n" + "WHEN unique_violation THEN\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ + "RETURN NULL;\n" + "WHEN OTHERS THEN\n"
+ + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ + "RETURN NULL;\n" + "END;\n" + "$body$\n" + "LANGUAGE plpgsql\n" + "SECURITY DEFINER\n"
+ + "SET search_path = pg_catalog, audit;");
+ int retryCount = 0;
+ boolean ready = false;
+ while (retryCount < 3 && !ready) {
+ try {
+ executeSQLWrite(functionSQL.toString());
+ ready = true;
+ } catch (SQLException e) {
+ if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated")) {
+ logger.warn("Error creating schema, retrying. ", e);
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e1) {
+ }
+ } else {
+ logger.error("Error executing creation of trigger function", e);
+ throw e;
+ }
+ }
+ retryCount++;
+ }
+ }
+
+ private void initializePostgresTriggersStructures() throws SQLException {
+ try {
+ createTriggerTable();
+ } catch (SQLException e) {
+ logger.error("Error creating the trigger tables in postgres", e);
+ throw e;
+ }
+ try {
+ createTriggerFunctions(SQLOperation.INSERT);
+ createTriggerFunctions(SQLOperation.UPDATE);
+ createTriggerFunctions(SQLOperation.DELETE);
+ } catch (SQLException e) {
+ logger.error("Error creating the trigger functions in postgres", e);
+ throw e;
+ }
+ }
+
+ // This is used to generate a unique connId for this connection to the DB.
+ private String generateConnID(Connection conn) {
+ String rv = Integer.toString((int) System.currentTimeMillis()); // random-ish
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT pg_backend_pid() AS IX");
+ if (rs.next()) {
+ rv = rs.getString("IX");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "generateConnID: problem generating a connection ID!");
+ }
+ return rv;
+ }
+
+ /**
+ * Get the name of this DBnterface mixin object.
+ *
+ * @return the name
+ */
+ @Override
+ public String getMixinName() {
+ return MIXIN_NAME;
+ }
+
+ @Override
+ public void close() {
+ // nothing yet
+ }
+
+ /**
+ * Determines the db name associated with the connection This is the private/internal method that actually
+ * determines the name
+ *
+ * @param conn
+ * @return
+ */
+ private String getDBName(Connection conn) {
+ String dbname = "mdbc"; // default name
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT current_database();");
+ if (rs.next()) {
+ dbname = rs.getString("current_database");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
+ }
+ return dbname;
+ }
+
+ /**
+ * Determines the db name associated with the connection This is the private/internal method that actually
+ * determines the name
+ *
+ * @param conn
+ * @return
+ */
+ private String getSchema(Connection conn) {
+ String schema = "public"; // default name
+ try {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT current_schema();");
+ if (rs.next()) {
+ schema = rs.getString("current_schema");
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
+ }
+ return schema;
+ }
+
+ @Override
+ public String getSchema() {
+ return schema;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return this.dbName;
+ }
+
+ /**
+ * Get a set of the table names in the database.
+ *
+ * @return the set
+ */
+ @Override
+ public Set<Range> getSQLRangeSet() {
+ Set<String> set = new TreeSet<String>();
+ String sql =
+ "SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE' AND table_schema=current_schema();";
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ while (rs.next()) {
+ String s = rs.getString("table_name");
+ set.add(s);
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
+ }
+ logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
+ Set<Range> rangeSet = new HashSet<>();
+ for (String table : set) {
+ rangeSet.add(new Range(table));
+ }
+ return rangeSet;
+ }
+
+ /**
+ * Return a TableInfo object for the specified table. This method first looks in a cache of previously constructed
+ * TableInfo objects for the table. If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the
+ * column names, types, and indexes of the table. It creates a new TableInfo object with the results.
+ *
+ * @param tableName the table to look up
+ * @return a TableInfo object containing the info we need, or null if the table does not exist
+ */
+ @Override
+ public TableInfo getTableInfo(String tableName) {
+ // \TODO: invalidate "tables" when a table schema is modified (uncommon), but needs to be handled
+ TableInfo ti = tables.get(tableName);
+ if (ti == null) {
+ try {
+ String tbl, localSchema;
+ final String[] split = tableName.split("\\.");
+ if (split.length == 2) {
+ localSchema = split[0];
+ tbl = split[1];
+ } else {
+ tbl = tableName;
+ localSchema = this.schema;
+ }
+ String sql;
+ if (schema == null) {
+ sql = "select column_name, data_type from information_schema.columns where table_schema=current_schema() and table_name='"
+ + tbl + "';";
+ } else {
+ sql = "select column_name, data_type from information_schema.columns where table_schema='"
+ + localSchema + "' and table_name='" + tbl + "';";
+ }
+ ResultSet rs = executeSQLRead(sql);
+ if (rs != null) {
+ ti = new TableInfo();
+ while (rs.next()) {
+ String name = rs.getString("column_name");
+ String type = rs.getString("data_type");
+ ti.columns.add(name);
+ ti.coltype.add(mapDatatypeNameToType(type));
+ }
+ rs.getStatement().close();
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from POSTGRES.");
+ return null;
+ }
+ final String keysql =
+ "SELECT a.attname as column_name FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid"
+ + " AND a.attnum = ANY(i.indkey) WHERE i.indrelid = '" + tbl + "'::regclass "
+ + " AND i.indisprimary;";
+ ResultSet rs2 = executeSQLRead(keysql);
+ Set<String> keycols = new HashSet<>();
+ if (rs2 != null) {
+ while (rs2.next()) {
+ String name = rs2.getString("column_name");
+ keycols.add(name);
+ }
+ rs2.getStatement().close();
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from MySQL.");
+ }
+ for (String col : ti.columns) {
+ if (keycols.contains(col)) {
+ ti.iskey.add(true);
+ } else {
+ ti.iskey.add(false);
+ }
+ }
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,
+ "Cannot retrieve table info for table " + tableName + " from MySQL: " + e);
+ return null;
+ }
+ tables.put(tableName, ti);
+ }
+ return ti;
+ }
+
+ // Map Postgres data type names to the java.sql.Types equivalent
+ private int mapDatatypeNameToType(String nm) {
+ switch (nm) {
+ case "character":
+ return Types.CHAR;
+ case "national character":
+ return Types.NCHAR;
+ case "character varying":
+ return Types.VARCHAR;
+ case "national character varying":
+ return Types.NVARCHAR;
+ case "text":
+ return Types.VARCHAR;
+ case "bytea":
+ return Types.BINARY;
+ case "smallint":
+ return Types.SMALLINT;
+ case "integer":
+ return Types.INTEGER;
+ case "bigint":
+ return Types.BIGINT;
+ case "smallserial":
+ return Types.SMALLINT;
+ case "serial":
+ return Types.INTEGER;
+ case "bigserial":
+ return Types.BIGINT;
+ case "real":
+ return Types.REAL;
+ case "double precision":
+ return Types.DOUBLE;
+ case "numeric":
+ return Types.NUMERIC;
+ case "decimal":
+ return Types.DECIMAL;
+ case "date":
+ return Types.DATE;
+ case "time with time zone":
+ return Types.TIME_WITH_TIMEZONE;
+ case "time without time zone":
+ return Types.TIME;
+ case "timestamp without time zone":
+ return Types.TIMESTAMP;
+ case "timestamp with time zone":
+ return Types.TIMESTAMP_WITH_TIMEZONE;
+ case "boolean":
+ return Types.BIT;
+ case "bit":
+ return Types.BIT;
+ case "oid":
+ return Types.BIGINT;
+ case "xml":
+ return Types.SQLXML;
+ case "array":
+ return Types.ARRAY;
+ case "tinyint":
+ return Types.TINYINT;
+ case "uuid":
+ case "money":
+ case "interval":
+ case "bit varying":
+ case "box":
+ case "point":
+ case "lseg":
+ case "path":
+ case "polygon":
+ case "circle":
+ case "json":
+ case "inet":
+ case "cidr":
+ case "macaddr":
+ case "tsvector":
+ case "tsquery":
+ return Types.OTHER;
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "unrecognized and/or unsupported data type " + nm);
+ return Types.VARCHAR;
+ }
+ }
+
+ @Override
+ public void createSQLTriggers(String tableName) {
+ // Don't create triggers for the table the triggers write into!!!
+ if (tableName.equals(TRANS_TBL) || tableName.equals(TRANS_TBL_SCHEMA + "." + TRANS_TBL))
+ return;
+ try {
+ // No SELECT trigger
+ executeSQLWrite(generateTrigger(tableName, SQLOperation.INSERT));
+ executeSQLWrite(generateTrigger(tableName, SQLOperation.DELETE));
+ executeSQLWrite(generateTrigger(tableName, SQLOperation.UPDATE));
+ } catch (SQLException e) {
+ if (e.getMessage().trim().endsWith("already exists")) {
+ // only warn if trigger already exists
+ logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e);
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger, "createSQLTriggers: " + e);
+ }
+ }
+ }
+
+ private String generateTrigger(String tableName, SQLOperation op) {
+ StringBuilder triggerSql = new StringBuilder("CREATE TRIGGER ").append(getTriggerName(tableName, op))
+ .append(" AFTER " + op + " ON ").append(tableName).append(" FOR EACH ROW EXECUTE PROCEDURE ")
+ .append(functionName(op)).append(';');
+ return triggerSql.toString();
+ }
+
+ private String getTriggerName(String tableName, SQLOperation op) {
+ switch (op) {
+ case DELETE:
+ return "D_" + tableName;
+ case UPDATE:
+ return "U_" + tableName;
+ case INSERT:
+ return "I_" + tableName;
+ default:
+ throw new IllegalArgumentException("Invalid option in trigger operation type");
+ }
+ }
+
+ private String[] getTriggerNames(String tableName) {
+ return new String[] {getTriggerName(tableName, SQLOperation.INSERT),
+ getTriggerName(tableName, SQLOperation.DELETE), getTriggerName(tableName, SQLOperation.UPDATE),};
+ }
+
+ @Override
+ public void dropSQLTriggers(String tableName) {
+ try {
+ for (String name : getTriggerNames(tableName)) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "REMOVE trigger " + name + " from postgres");
+ executeSQLWrite("DROP TRIGGER IF EXISTS " + name + ";");
+ }
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "dropSQLTriggers: " + e);
+ }
+ }
+
+ @Override
+ public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) {
+ throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres");
+ }
+
+ @Override
+ public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) {
+ throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres");
+ }
+
+ /**
+ * This method executes a read query in the SQL database. Methods that call this method should be sure to call
+ * resultset.getStatement().close() when done in order to free up resources.
+ *
+ * @param sql the query to run
+ * @return a ResultSet containing the rows returned from the query
+ */
+ @Override
+ public ResultSet executeSQLRead(String sql) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing sql read in postgres");
+ logger.debug("Executing SQL read:" + sql);
+ ResultSet rs;
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ rs = stmt.executeQuery(sql);
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "executeSQLRead" + e);
+ return null;
+ }
+ return rs;
+ }
+
+ /**
+ * This method executes a write query in the sql database.
+ *
+ * @param sql the SQL to be sent to MySQL
+ * @throws SQLException if an underlying JDBC method throws an exception
+ */
+ protected void executeSQLWrite(String sql) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:" + sql);
+ Statement stmt = jdbcConn.createStatement();
+ stmt.execute(sql);
+ stmt.close();
+ }
+
+ @Override
+ public void preCommitHook() {
+ synchronized (stagingHandlerLock) {
+ // \TODO check if this can potentially block forever in certain scenarios
+ if (stagingHandler != null) {
+ stagingHandler.waitForAllPendingUpdates();
+ }
+ }
+ }
+
+ /**
+ * Code to be run within the DB driver before a SQL statement is executed. This is where tables can be synchronized
+ * before a SELECT, for those databases that do not support SELECT triggers.
+ *
+ * @param sql the SQL statement that is about to be executed
+ * @return list of keys that will be updated, if they can't be determined afterwards (i.e. sql table doesn't have
+ * primary key)
+ */
+ @Override
+ public void preStatementHook(final String sql) {
+ if (sql == null) {
+ return;
+ }
+ // \TODO: check if anything needs to be executed here for postgres
+ }
+
+ /**
+ * Code to be run within the DB driver after a SQL statement has been executed. This is where remote statement
+ * actions can be copied back to Cassandra/MUSIC.
+ *
+ * @param sql the SQL statement that was executed
+ */
+ @Override
+ public void postStatementHook(final String sql, StagingTable transactionDigest) {
+ if (sql != null) {
+ String[] parts = sql.trim().split(" ");
+ String cmd = parts[0].toLowerCase();
+ if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
+ if (useAsyncStagingUpdate) {
+ synchronized (stagingHandlerLock) {
+ if (stagingHandler == null || currentStaging != transactionDigest) {
+ Runnable newRunnable =
+ new PostgresMixin.StagingTableUpdateRunnable(this, transactionDigest);
+ currentStaging = transactionDigest;
+ stagingHandler = new AsyncUpdateHandler(newRunnable);
+ }
+ // else we can keep using the current staging Handler
+ }
+ stagingHandler.processNewUpdate();
+ } else {
+
+ try {
+ this.updateStagingTable(transactionDigest);
+ } catch (NoSuchFieldException | MDBCServiceException e) {
+ // TODO Auto-generated catch block
+ this.logger.error("Error updating the staging table");
+ }
+ }
+ }
+ }
+ }
+
+ private SQLOperation toOpEnum(String operation) throws NoSuchFieldException {
+ switch (operation.toLowerCase()) {
+ case "i":
+ return SQLOperation.INSERT;
+ case "d":
+ return SQLOperation.DELETE;
+ case "u":
+ return SQLOperation.UPDATE;
+ case "s":
+ return SQLOperation.SELECT;
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "Invalid operation selected: [" + operation + "]");
+ throw new NoSuchFieldException("Invalid operation enum");
+ }
+
+ }
+
+ /**
+ * Copy data that is in transaction table into music interface
+ *
+ * @param transactionDigests
+ * @throws NoSuchFieldException
+ */
+ private void updateStagingTable(StagingTable transactionDigests) throws NoSuchFieldException, MDBCServiceException {
+ String selectSql = "select ix, op, schema_name, table_name, original_data,new_data FROM " + TRANS_TBL_SCHEMA
+ + "." + TRANS_TBL + " where connection_id = '" + this.connId + "';";
+ Integer biggestIx = Integer.MIN_VALUE;
+ Integer smallestIx = Integer.MAX_VALUE;
+ try {
+ ResultSet rs = executeSQLRead(selectSql);
+ Set<Integer> rows = new TreeSet<Integer>();
+ while (rs.next()) {
+ int ix = rs.getInt("ix");
+ biggestIx = Integer.max(biggestIx, ix);
+ smallestIx = Integer.min(smallestIx, ix);
+ String op = rs.getString("op");
+ SQLOperation opType = toOpEnum(op);
+ String schema = rs.getString("schema_name");
+ String tbl = rs.getString("table_name");
+ String original = rs.getString("original_data");
+ String newData = rs.getString("new_data");
+ Range range = new Range(schema + "." + tbl);
+ transactionDigests.addOperation(range, opType, newData, original);
+ rows.add(ix);
+ }
+ rs.getStatement().close();
+ if (rows.size() > 0) {
+ logger.debug("Staging delete: Executing with vals [" + smallestIx + "," + biggestIx + "," + this.connId
+ + "]");
+ this.deleteStagingStatement.setInt(1, smallestIx);
+ this.deleteStagingStatement.setInt(2, biggestIx);
+ this.deleteStagingStatement.setString(3, this.connId);
+ this.deleteStagingStatement.execute();
+ }
+ } catch (SQLException e) {
+ logger.warn("Exception in postStatementHook: " + e);
+ e.printStackTrace();
+ }
+ }
+
+
+
+ /**
+ * Update music with data from MySQL table
+ *
+ * @param tableName - name of table to update in music
+ */
+ @Override
+ public void synchronizeData(String tableName) {}
+
+ /**
+ * Return a list of "reserved" names, that should not be used by MySQL client/MUSIC These are reserved for mdbc
+ */
+ @Override
+ public List<String> getReservedTblNames() {
+ ArrayList<String> rsvdTables = new ArrayList<String>();
+ rsvdTables.add(TRANS_TBL_SCHEMA + "." + TRANS_TBL);
+ rsvdTables.add(TRANS_TBL);
+ // Add others here as necessary
+ return rsvdTables;
+ }
+
+ @Override
+ public String getPrimaryKey(String sql, String tableName) {
+ return null;
+ }
+
+ /**
+ * Parse the transaction digest into individual events
+ *
+ * @param transaction - base 64 encoded, serialized digest
+ */
+ public void replayTransaction(StagingTable transaction, List<Range> ranges)
+ throws SQLException, MDBCServiceException {
+ boolean autocommit = jdbcConn.getAutoCommit();
+ jdbcConn.setAutoCommit(false);
+ Statement jdbcStmt = jdbcConn.createStatement();
+ final ArrayList<Operation> opList = transaction.getOperationList();
+
+ for (Operation op : opList) {
+ if (Range.overlaps(ranges, op.getTable())) {
+ try {
+ replayOperationIntoDB(jdbcStmt, op);
+ } catch (SQLException | MDBCServiceException e) {
+ // rollback transaction
+ logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "."
+ + "Rolling back the entire digest replay.");
+ jdbcConn.rollback();
+ throw e;
+ }
+ }
+ }
+
+ clearReplayedOperations(jdbcStmt);
+ jdbcConn.commit();
+ jdbcStmt.close();
+ jdbcConn.setAutoCommit(autocommit);
+ }
+
+ @Override
+ public void disableForeignKeyChecks() throws SQLException {
+ Statement disable = jdbcConn.createStatement();
+ disable.execute("SET session_replication_role = 'replica';");
+ disable.closeOnCompletion();
+ }
+
+ @Override
+ public void enableForeignKeyChecks() throws SQLException {
+ Statement enable = jdbcConn.createStatement();
+ enable.execute("SET session_replication_role = 'origin';");
+ enable.closeOnCompletion();
+ }
+
+ @Override
+ public void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException {
+ replayTransaction(txDigest, ranges);
+ }
+
+ /**
+ * Replays operation into database, usually from txDigest
+ *
+ * @param jdbcStmt: Connection used to perform the replay
+ * @param op: operation to be replayed
+ * @throws SQLException
+ * @throws MDBCServiceException
+ */
+ private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException {
+ logger.debug("Replaying Operation: " + op.getOperationType() + "->" + op.getVal());
+ JSONObject newVal = op.getVal();
+ JSONObject oldVal = null;
+ try {
+ oldVal = op.getKey();
+ } catch (MDBCServiceException e) {
+ // Ignore exception, in postgres the structure of the operation is different
+ }
+
+
+ TableInfo ti = getTableInfo(op.getTable());
+ final List<String> keyColumns = ti.getKeyColumns();
+
+ // build and replay the queries
+ String sql = constructSQL(op, keyColumns, newVal, oldVal);
+ if (sql == null)
+ return;
+
+ try {
+ logger.debug("Replaying operation: " + sql);
+ int updated = jdbcStmt.executeUpdate(sql);
+
+ if (updated == 0) {
+ // This applies only for replaying transactions involving Eventually Consistent tables
+ logger.warn(
+ "Error Replaying operation: " + sql + "; Replacing insert/replace/viceversa and replaying ");
+
+ buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal);
+ }
+ } catch (SQLException sqlE) {
+ // This applies for replaying transactions involving Eventually Consistent tables
+ // or transactions that replay on top of existing keys
+ logger.warn(
+ "Error Replaying operation: " + sql + ";" + "Replacing insert/replace/viceversa and replaying ");
+
+ buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal);
+
+ }
+ }
+
+ protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, List<String> keyColumns,
+ JSONObject newVals, JSONObject oldVals) throws SQLException, MDBCServiceException {
+ String sqlInverse = constructSQLInverse(op, keyColumns, newVals, oldVals);
+ if (sqlInverse == null)
+ return;
+ logger.debug("Replaying operation: " + sqlInverse);
+ jdbcStmt.executeUpdate(sqlInverse);
+ }
+
+ protected String constructSQLInverse(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+ throws MDBCServiceException {
+ String sqlInverse = null;
+ switch (op.getOperationType()) {
+ case INSERT:
+ sqlInverse = constructUpdate(op.getTable(), keyColumns, newVals, oldVals);
+ break;
+ case UPDATE:
+ sqlInverse = constructInsert(op.getTable(), newVals);
+ break;
+ default:
+ break;
+ }
+ return sqlInverse;
+ }
+
+ protected String constructSQL(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+ throws MDBCServiceException {
+ String sql = null;
+ switch (op.getOperationType()) {
+ case INSERT:
+ sql = constructInsert(op.getTable(), newVals);
+ break;
+ case UPDATE:
+ sql = constructUpdate(op.getTable(), keyColumns, newVals, oldVals);
+ break;
+ case DELETE:
+ sql = constructDelete(op.getTable(), keyColumns, oldVals);
+ break;
+ case SELECT:
+ // no update happened, do nothing
+ break;
+ default:
+ logger.error(op.getOperationType() + "not implemented for replay");
+ }
+ return sql;
+ }
+
+ private String constructDelete(String tableName, List<String> keyColumns, JSONObject oldVals)
+ throws MDBCServiceException {
+ if (oldVals == null) {
+ throw new MDBCServiceException("Trying to update row with an empty old val exception");
+ }
+ StringBuilder sql = new StringBuilder();
+ sql.append("DELETE FROM ");
+ sql.append(tableName + " WHERE ");
+ sql.append(getPrimaryKeyConditional(keyColumns, oldVals));
+ sql.append(";");
+ return sql.toString();
+ }
+
+ private String constructInsert(String tableName, JSONObject newVals) {
+ StringBuilder keys = new StringBuilder();
+ StringBuilder vals = new StringBuilder();
+ String sep = "";
+ for (String col : newVals.keySet()) {
+ keys.append(sep + col);
+ vals.append(sep + "'" + newVals.get(col) + "'");
+ sep = ", ";
+ }
+ StringBuilder sql = new StringBuilder();
+ sql.append("INSERT INTO ").append(tableName + " (").append(keys).append(") VALUES (").append(vals).append(");");
+ return sql.toString();
+ }
+
+ private String constructUpdate(String tableName, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+ throws MDBCServiceException {
+ if (oldVals == null) {
+ throw new MDBCServiceException("Trying to update row with an empty old val exception");
+ }
+ StringBuilder sql = new StringBuilder();
+ sql.append("UPDATE ").append(tableName).append(" SET ");
+ String sep = "";
+ for (String key : newVals.keySet()) {
+ sql.append(sep).append(key).append("=\"").append(newVals.get(key)).append("\"");
+ sep = ", ";
+ }
+ sql.append(" WHERE ");
+ sql.append(getPrimaryKeyConditional(keyColumns, oldVals));
+ sql.append(";");
+ return sql.toString();
+ }
+
+ /**
+ * Create an SQL string for AND'ing all of the primary keys
+ *
+ * @param keyColumns list with the name of the columns that are key
+ * @param vals json with the contents of the old row
+ * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3
+ */
+ private String getPrimaryKeyConditional(List<String> keyColumns, JSONObject vals) {
+ StringBuilder keyCondStmt = new StringBuilder();
+ String and = "";
+ for (String key : keyColumns) {
+ // We cannot use the default primary key for the sql table and operations
+ if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
+ Object val = vals.get(key);
+ keyCondStmt.append(and + key + "=\"" + val + "\"");
+ and = " AND ";
+ }
+ }
+ return keyCondStmt.toString();
+ }
+
+ /**
+ * Cleans out the transaction table, removing the replayed operations
+ *
+ * @param jdbcStmt
+ * @throws SQLException
+ */
+ private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
+ logger.info("Clearing replayed operations");
+ String sql =
+ "DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " WHERE CONNECTION_ID = '" + this.connId + "';";
+ jdbcStmt.executeUpdate(sql);
+ }
+
+ @Override
+ public Connection getSQLConnection() {
+ return jdbcConn;
+ }
+
+ @Override
+ public Set<String> getSQLTableSet() {
+ Set<String> set = new TreeSet<String>();
+ String sql =
+ "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=CURRENT_SCHEMA() AND TABLE_TYPE='BASE TABLE'";
+ try {
+ Statement stmt = jdbcConn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ while (rs.next()) {
+ String s = rs.getString("TABLE_NAME");
+ set.add(s);
+ }
+ stmt.close();
+ } catch (SQLException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
+ System.out.println("getSQLTableSet: " + e);
+ e.printStackTrace();
+ }
+ logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
+ System.out.println("getSQLTableSet returning: " + set);
+ return set;
+ }
+
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
index 86088f9..171ad79 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
@@ -190,28 +190,6 @@ public class Utils {
}
return list;
}
-
- public static void registerDefaultDrivers() {
- Properties pr = null;
- try {
- pr = new Properties();
- pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
- }
- catch (IOException e) {
- logger.error("Could not load property file > " + e.getMessage());
- }
-
- @SuppressWarnings("unused")
- List<Class<?>> list = new ArrayList<Class<?>>();
- String drivers = pr.getProperty("DEFAULT_DRIVERS");
- for (String driver: drivers.split("[ ,]")) {
- logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'");
- try {
- @SuppressWarnings("unused")
- Class<?> cl = Class.forName(driver.trim());
- } catch (ClassNotFoundException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"Driver class "+driver+" not found.");
- }
- }
- }
+
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
index 4f3a3bf..5c6fae4 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
@@ -89,9 +89,10 @@ public class MusicTxDigestDaemon implements Runnable {
logger.error("Music interface or DB interface is null in background daemon");
return;
}
+ MdbcConnection conn = null;
while (true) {
try {
- MdbcConnection conn = (MdbcConnection) stateManager.getConnection("daemon");
+ conn = (MdbcConnection) stateManager.getConnection("daemon");
if (conn == null) {
logger.error("Connection created is null in background daemon");
return;
@@ -111,18 +112,20 @@ public class MusicTxDigestDaemon implements Runnable {
}
//2) for each partition I don't own
final Set<Range> warmupRanges = stateManager.getRangesToWarmup();
- final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
- List<Range> missingRanges = new ArrayList<>();
- if (currentPartitions.size() != 0) {
- for (DatabasePartition part : currentPartitions) {
- List<Range> partitionRanges = part.getSnapshot();
- warmupRanges.removeAll(partitionRanges);
- }
- try {
- stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
- } catch (MDBCServiceException e) {
- logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
- continue;
+ if (warmupRanges!=null) {
+ final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
+ List<Range> missingRanges = new ArrayList<>();
+ if (currentPartitions.size() != 0) {
+ for (DatabasePartition part : currentPartitions) {
+ List<Range> partitionRanges = part.getSnapshot();
+ warmupRanges.removeAll(partitionRanges);
+ }
+ try {
+ stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
+ continue;
+ }
}
}
@@ -138,6 +141,12 @@ public class MusicTxDigestDaemon implements Runnable {
} catch (InterruptedException | SQLException e) {
logger.error("MusicTxDigest background daemon stopped " + e.getMessage(), e);
Thread.currentThread().interrupt();
+ } finally {
+ try {
+ if (conn!=null && !conn.isClosed()) conn.close();
+ } catch (SQLException e) {
+ logger.error("MusicTxDigest background daemon error closing" + e.getMessage(), e);
+ }
}
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
index 8fa49a9..db9e455 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
@@ -22,10 +22,18 @@ package org.onap.music.mdbc.tables;
import java.util.UUID;
public final class MusicTxDigestId {
+ public final UUID mriId;
public final UUID transactionId;
public final int index;
+ public MusicTxDigestId(UUID mriRowId, UUID digestId, int index) {
+ this.mriId=mriRowId;
+ this.transactionId= digestId;
+ this.index=index;
+ }
+
public MusicTxDigestId(UUID digestId, int index) {
+ this.mriId = null;
this.transactionId= digestId;
this.index=index;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
index d406ad3..3258d0f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
@@ -66,7 +66,7 @@ public final class Operation implements Serializable{
}
public JSONObject getKey() throws MDBCServiceException {
- if(KEY==null){
+ if(KEY==null||KEY.isEmpty()){
throw new MDBCServiceException("This operation ["+TYPE.toString()+"] doesn't contain a key");
}
JSONObject keys = new JSONObject(new JSONTokener(KEY));
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
index 4fdd7ac..4ef9d30 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
@@ -186,7 +186,10 @@ public class StagingTable {
}
OpType newType = (type==SQLOperation.INSERT)?OpType.INSERT:(type==SQLOperation.DELETE)?
OpType.DELETE:OpType.UPDATE;
- Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType).setVal(newVal);
+ Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType);
+ if(newVal!=null) {
+ rowBuilder.setVal(newVal);
+ }
if(keys!=null){
rowBuilder.setKey(keys);
}
@@ -252,10 +255,10 @@ public class StagingTable {
}
synchronized public boolean isEventualEmpty() {
- return (eventuallyBuilder.getRowsCount()==0);
+ return (eventuallyBuilder!=null) && (eventuallyBuilder.getRowsCount()==0);
}
-
- synchronized public void clear() throws MDBCServiceException {
+
+ synchronized public void clear() throws MDBCServiceException {
if(!builderInitialized){
throw new MDBCServiceException("This type of staging table is unmutable, please use the constructor"
+ "with no parameters");
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
index 7624201..f6239d1 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
@@ -24,6 +24,8 @@ import org.onap.music.mdbc.configurations.NodeConfiguration;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.UUID;
public class CreatePartition {
@@ -51,7 +53,9 @@ public class CreatePartition {
}
public void convert(){
- config = new NodeConfiguration(tables, eventual,UUID.fromString(mriIndex),"test","");
+ String[] tablesArray=eventual.split(",");
+ ArrayList<String> eventualTables = (new ArrayList<>(Arrays.asList(tablesArray)));
+ config = new NodeConfiguration(tables, eventualTables,UUID.fromString(mriIndex),"test","");
}
public void saveToFile(){