diff options
author | Brendan Tschaen <ctschaen@att.com> | 2019-05-15 15:08:05 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-05-15 15:08:05 +0000 |
commit | 96933528fdd51e606e48d79941ce43249ffb48b6 (patch) | |
tree | efec05eb72232c030e442eb12afa2260d6b6bd67 | |
parent | 8a3db86e697de59a29e712d23e771c8718f9778a (diff) | |
parent | 52159a510dc31ee1dacf612a1e05cc8612a117e5 (diff) |
Merge "Implement postgres, fixes to eventual, and many bug fixes"
34 files changed, 2320 insertions, 431 deletions
diff --git a/mdbc-server/pom.xml b/mdbc-server/pom.xml index cdf7cff..7a46120 100755 --- a/mdbc-server/pom.xml +++ b/mdbc-server/pom.xml @@ -89,9 +89,9 @@ <version>20160810</version> </dependency> <dependency> - <groupId>org.mariadb.jdbc</groupId> - <artifactId>mariadb-java-client</artifactId> - <version>2.3.0</version> + <groupId>org.mariadb.jdbc</groupId> + <artifactId>mariadb-java-client</artifactId> + <version>2.3.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --> <dependency> @@ -200,6 +200,19 @@ <artifactId>protobuf-java</artifactId> <version>3.6.1</version> </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>42.2.5</version> + </dependency> + <dependency> + <groupId>com.opentable.components</groupId> + <artifactId>otj-pg-embedded</artifactId> + <version>0.13.1</version> + <scope>test</scope> + </dependency> + + </dependencies> <build> diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java index ced5745..91b13f3 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java @@ -28,6 +28,8 @@ public class Configuration { public static final String KEY_DB_MIXIN_NAME = "MDBC_DB_MIXIN"; /** The property name to use to select the MUSIC 'mixin'. */ public static final String KEY_MUSIC_MIXIN_NAME = "MDBC_MUSIC_MIXIN"; + /** The property name to select if async staging table update is used */ + public static final String KEY_ASYNC_STAGING_TABLE_UPDATE = "ASYNC_STAGING_TABLE_UPDATE"; /** The name of the default mixin to use for the DBInterface. */ public static final String DB_MIXIN_DEFAULT = "mysql";//"h2"; /** The name of the default mixin to use for the MusicInterface. */ @@ -44,4 +46,6 @@ public class Configuration { public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours /** The property name to provide comma separated list of ranges to warmup */ public static final String KEY_WARMUPRANGES = "warmupranges"; + /** Default async staging table update o ption*/ + public static final String ASYNC_STAGING_TABLE_UPDATE = "false"; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java index 32d456e..ae2b869 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java @@ -98,10 +98,25 @@ public class MDBCUtils { } public static List<Range> getTables(Map<String,List<SQLOperation>> queryParsed){ + return getTables(null, queryParsed); + } + + public static List<Range> getTables(String defaultDatabaseName, Map<String,List<SQLOperation>> queryParsed){ List<Range> ranges = new ArrayList<>(); for(String table: queryParsed.keySet()){ - ranges.add(new Range(table)); - } + String[] parts = table.split("\\."); + if(parts.length==2){ + ranges.add(new Range(table)); + } + else if(parts.length==1 && defaultDatabaseName!=null){ + ranges.add(new Range(defaultDatabaseName+"."+table)); + } + else{ + throw new IllegalArgumentException("Table should either have only one '.' or none at all, the table " + + "received is "+table); + } + + } return ranges; } }
\ No newline at end of file diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index cb2df7f..b4d7bb9 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -160,18 +160,7 @@ public class MdbcConnection implements Connection { if(progressKeeper!=null) progressKeeper.commitRequested(id); logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b); if (b) { - // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction - if(id == null || id.isEmpty()) { - logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - throw new SQLException("tx id is null"); - } - try { - mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); - } catch (MDBCServiceException e) { - // TODO Auto-generated catch block - logger.error("Cannot commit log to music" + e.getStackTrace()); - throw new SQLException(e.getMessage(), e); - } + musicCommit(); } if(progressKeeper!=null) { progressKeeper.setMusicDone(id); @@ -191,13 +180,7 @@ public class MdbcConnection implements Connection { return jdbcConn.getAutoCommit(); } - /** - * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, - * they are performed now and copied into MUSIC. - * @throws SQLException - */ - @Override - public void commit() throws SQLException { + private void musicCommit() throws SQLException { if(progressKeeper.isComplete(id)) { return; } @@ -205,6 +188,7 @@ public class MdbcConnection implements Connection { progressKeeper.commitRequested(id); } + dbi.preCommitHook(); try { logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); // transaction was committed -- add all the updates into the REDO-Log in MUSIC @@ -214,6 +198,16 @@ public class MdbcConnection implements Connection { logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); throw new SQLException("Failure commiting to MUSIC", e); } + } + + /** + * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, + * they are performed now and copied into MUSIC. + * @throws SQLException + */ + @Override + public void commit() throws SQLException { + musicCommit(); if(progressKeeper != null) { progressKeeper.setMusicDone(id); @@ -273,9 +267,10 @@ public class MdbcConnection implements Connection { } catch (MDBCServiceException e) { throw new SQLException("Failure during relinquish of partition",e); } - // Warning! Make sure this call remains AFTER the call to jdbcConn.close(), - // otherwise you're going to get stuck in an infinite loop. - statemanager.closeConnection(id); + + // Warning! Make sure this call remains AFTER the call to jdbcConn.close(), + // otherwise you're going to get stuck in an infinite loop. + statemanager.closeConnection(id); } @Override @@ -507,7 +502,8 @@ public class MdbcConnection implements Connection { //Parse tables from the sql query Map<String, List<SQLOperation>> tableToInstruction = QueryProcessor.parseSqlQuery(sql, table_set); //Check ownership of keys - List<Range> queryTables = MDBCUtils.getTables(tableToInstruction); + String defaultSchema = dbi.getSchema(); + List<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToInstruction); if (this.partition!=null) { List<Range> snapshot = this.partition.getSnapshot(); if(snapshot!=null){ @@ -550,18 +546,15 @@ public class MdbcConnection implements Connection { logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); for (String tableName : set1) { // This map will be filled in if this table was previously discovered - tableName = tableName.toUpperCase(); - if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) { + if (!table_set.contains(tableName.toUpperCase()) && !dbi.getReservedTblNames().contains(tableName.toUpperCase())) { logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName); try { TableInfo ti = dbi.getTableInfo(tableName); - mi.initializeMusicForTable(ti,tableName); - //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted - ti = dbi.getTableInfo(tableName); - mi.createDirtyRowTable(ti,tableName); + //mi.initializeMusicForTable(ti,tableName); + //mi.createDirtyRowTable(ti,tableName); dbi.createSQLTriggers(tableName); - table_set.add(tableName); - dbi.synchronizeData(tableName); + table_set.add(tableName.toUpperCase()); + //dbi.synchronizeData(tableName); logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" + table_set.size() + "/" + set1.size() + "tables uploaded"); } catch (Exception e) { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java index 42b9710..500ed81 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java @@ -80,6 +80,15 @@ public class MdbcServer { Properties connectionProps = new Properties(); connectionProps.put("user", user); connectionProps.put("password", password); + String defaultMusicMixin = Utils.getDefaultMusicMixin(); + if(defaultMusicMixin!=null){ + connectionProps.put(Configuration.KEY_MUSIC_MIXIN_NAME,defaultMusicMixin); + } + String defaultDBMixin = Utils.getDefaultDBMixin(); + if(defaultMusicMixin!=null){ + connectionProps.put(Configuration.KEY_DB_MIXIN_NAME,defaultDBMixin); + } + Utils.registerDefaultDrivers(); meta = new MdbcServerLogic(url,connectionProps,config); LocalService service = new LocalService(meta); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java index 16e7170..41aed26 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java @@ -23,6 +23,9 @@ import java.io.Serializable; import java.util.List; import java.util.Objects; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.mixins.MusicMixin; + /** * This class represent a range of the whole database @@ -32,13 +35,21 @@ import java.util.Objects; */ public class Range implements Cloneable{ + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Range.class); + private String table; public Range(String table) { - this.table = table.toUpperCase(); + final String[] split = table.split("\\."); + if(split.length!=2){ + logger.debug("Table should contain schema, received in constructor: " + table); +// throw new IllegalArgumentException("Table should always contain the schema, table received in " +// + "constructor is "+table); + } + this.table = table; } - public String toString(){return table.toUpperCase();} + public String toString(){return table;} /** * Compares to Range types @@ -55,7 +66,7 @@ public class Range implements Cloneable{ @Override public int hashCode(){ - return table.hashCode(); + return table.toUpperCase().hashCode(); } @Override @@ -74,11 +85,11 @@ public class Range implements Cloneable{ public static boolean overlaps(List<Range> ranges, String table){ //\TODO check if parallel stream makes sense here - return ranges.stream().map((Range r) -> r.table.equals(table)).anyMatch((Boolean b) -> b); + return ranges.stream().map((Range r) -> r.table.toUpperCase().equals(table.toUpperCase())).anyMatch((Boolean b) -> b); } public boolean overlaps(Range other) { - return table.equals(other.table); + return table.toUpperCase().equals(other.table.toUpperCase()); } public String getTable() { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index e4c4a24..18bc0db 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -21,7 +21,6 @@ package org.onap.music.mdbc; import java.util.ArrayList; import java.util.List; -import java.util.Set; import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; @@ -47,6 +46,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -102,10 +102,10 @@ public class StateManager { public StateManager() { } - public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException { + public StateManager(String sqlDBUrl, Properties newInfo, String mdbcServerName, String sqlDBName) throws MDBCServiceException { this.sqlDBName = sqlDBName; - this.sqlDBUrl = sqlDBUrl; - this.info = info; + this.sqlDBUrl = cleanSqlUrl(sqlDBUrl); + this.info = new Properties(); this.mdbcServerName = mdbcServerName; this.connectionRanges = new ConcurrentHashMap<>(); @@ -117,6 +117,7 @@ public class StateManager { } catch (IOException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); } + info.putAll(newInfo); cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT); musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT); @@ -129,6 +130,16 @@ public class StateManager { ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout); } + protected String cleanSqlUrl(String url){ + if(url!=null) { + url = url.trim(); + if (url.length() > 0 && url.charAt(url.length() - 1) == '/') { + url= url.substring(0, url.length() - 1); + } + } + return url; + } + protected void initTxDaemonThread(){ txDaemon = new Thread( new MusicTxDigestDaemon(Integer.parseInt( @@ -149,27 +160,21 @@ public class StateManager { } protected void initSqlDatabase() throws MDBCServiceException { - try { - //\TODO: pass the driver as a variable - Class.forName("org.mariadb.jdbc.Driver"); - } - catch (ClassNotFoundException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, - ErrorTypes.GENERALSERVICEERROR); - return; - } - try { - Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info); - StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ") + if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) { + try { + Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info); + StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ") .append(sqlDBName) .append(";"); - Statement stmt = sqlConnection.createStatement(); - stmt.execute(sql.toString()); - sqlConnection.close(); - } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, + Statement stmt = sqlConnection.createStatement(); + stmt.execute(sql.toString()); + sqlConnection.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR, + ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR); - throw new MDBCServiceException(e.getMessage(), e); + throw new MDBCServiceException(e.getMessage(), e); + } } } @@ -306,17 +311,9 @@ public class StateManager { public Connection openConnection(String id) { Connection sqlConnection; MdbcConnection newConnection; - try { - //TODO: pass the driver as a variable - Class.forName("org.mariadb.jdbc.Driver"); - } - catch (ClassNotFoundException e) { - // TODO Auto-generated catch block - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, - ErrorTypes.QUERYERROR); - } - + Utils.registerDefaultDrivers(); //Create connection to local SQL DB + try { sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info); } catch (SQLException e) { diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java index e5a3252..e5a3252 100755 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java new file mode 100644 index 0000000..7a09dca --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java @@ -0,0 +1,80 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.onap.music.logging.EELFLoggerDelegate; + +public class Utils { + + + public static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Utils.class); + + static Properties retrieveMdbcProperties() { + Properties pr = null; + try { + pr = new Properties(); + pr.load(Utils.class.getResourceAsStream("/mdbc.properties")); + } catch (IOException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Could not load property file: " + e.getMessage()); + } + return pr; + } + + public static String getDefaultMusicMixin() { + Properties pr = retrieveMdbcProperties(); + if (pr == null) + return null; + String defaultMusicMixin = pr.getProperty("DEFAULT_MUSIC_MIXIN"); + return defaultMusicMixin; + } + + public static String getDefaultDBMixin() { + Properties pr = retrieveMdbcProperties(); + if (pr == null) + return null; + String defaultMusicMixin = pr.getProperty("DEFAULT_DB_MIXIN"); + return defaultMusicMixin; + } + + public static void registerDefaultDrivers() { + Properties pr = null; + try { + pr = new Properties(); + pr.load(Utils.class.getResourceAsStream("/mdbc.properties")); + } catch (IOException e) { + logger.error("Could not load property file > " + e.getMessage()); + } + + String drivers = pr.getProperty("DEFAULT_DRIVERS"); + for (String driver : drivers.split("[ ,]")) { + logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'"); + try { + Class.forName(driver.trim()); + } catch (ClassNotFoundException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Driver class " + driver + " not found."); + } + } + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java index 38309d5..391ee1a 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java @@ -41,7 +41,7 @@ public class NodeConfiguration { public String nodeName; public String sqlDatabaseName; - public NodeConfiguration(String tables, String eventualTables, UUID mriIndex, String sqlDatabaseName, String node){ + public NodeConfiguration(String tables, List<String> eventualTables, UUID mriIndex, String sqlDatabaseName, String node){ // public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) { partition = new DatabasePartition(toRanges(tables), mriIndex, null) ; eventual = new Eventual(toRanges(eventualTables)); @@ -49,16 +49,20 @@ public class NodeConfiguration { this.sqlDatabaseName = sqlDatabaseName; } + protected List<Range> toRanges(List<String> tables){ + List<Range> newRange = new ArrayList<>(); + for(String table: tables) { + newRange.add(new Range(table)); + } + return newRange; + } + protected List<Range> toRanges(String tables){ if(tables.isEmpty()){ return new ArrayList<>(); } - List<Range> newRange = new ArrayList<>(); String[] tablesArray=tables.split(","); - for(String table: tablesArray) { - newRange.add(new Range(table)); - } - return newRange; + return toRanges(new ArrayList<>(Arrays.asList(tablesArray))); } public String toJson() { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java index 343a8b8..0598271 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java @@ -26,6 +26,7 @@ import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.mixins.MusicMixin; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestDaemon; import com.google.gson.Gson; import org.onap.music.datastore.PreparedQueryObject; @@ -46,6 +47,7 @@ public class TablesConfiguration { private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class); private List<PartitionInformation> partitions; + private List<String> eventual; String tableToPartitionName; private String musicNamespace; private String partitionInformationTableName; @@ -83,16 +85,17 @@ public class TablesConfiguration { logger.info("Creating empty row with id "+partitionId); MusicMixin.createEmptyMriRow(musicNamespace,partitionInfo.mriTableName,UUID.fromString(partitionId), - partitionInfo.owner,null,partitionInfo.getTables(),true); + partitionInfo.owner,null, + partitionInfo.getTables(),true); //3) Create config for this node StringBuilder newStr = new StringBuilder(); for(Range r: partitionInfo.tables){ - newStr.append(r.toString().toUpperCase()).append(","); + newStr.append(r.toString()).append(","); } logger.info("Appending row to configuration "+partitionId); - nodeConfigs.add(new NodeConfiguration(newStr.toString(),"",UUID.fromString(partitionId), + nodeConfigs.add(new NodeConfiguration(newStr.toString(),eventual,UUID.fromString(partitionId), sqlDatabaseName, partitionInfo.owner)); } return nodeConfigs; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json new file mode 100644 index 0000000..430525f --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json @@ -0,0 +1,11 @@ +{ + "internalNamespace": "music_internal", + "internalReplicationFactor": 1, + "musicNamespace": "namespace", + "musicReplicationFactor": 1, + "mriTableName": "musicrangeinformation", + "mtxdTableName": "musictxdigest", + "eventualMtxdTableName":"musicevetxdigest", + "nodeInfoTableName":"nodeinfo", + "rangeDependencyTableName":"musicrangedependency" +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json index 8cbbfec..559d597 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json @@ -3,21 +3,19 @@ { "tables": [ { - "table": "PERSONS" + "table": "public.PERSONS" } ], "owner": "", "mriTableName": "musicrangeinformation", - "mtxdTableName": "musictxdigest", "partitionId": "" } ], - "internalNamespace": "music_internal", - "internalReplicationFactor": 1, + "eventual": ["public.eventualPERSONS"], "musicNamespace": "namespace", - "musicReplicationFactor": 1, "tableToPartitionName": "tabletopartition", "partitionInformationTableName": "partitioninfo", "redoHistoryTableName": "redohistory", "sqlDatabaseName": "test" } + diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java new file mode 100644 index 0000000..543cbd0 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java @@ -0,0 +1,100 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.mixins; + +import java.util.concurrent.atomic.AtomicBoolean; +import javax.websocket.RemoteEndpoint.Async; +import org.onap.music.logging.EELFLoggerDelegate; + +public class AsyncUpdateHandler { + + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(AsyncUpdateHandler.class); + + Object finishMonitor; + Object taskMonitor; + AtomicBoolean pendingUpdate; + Runnable handler; + + AsyncUpdateHandler(Runnable handlerToRun){ + handler=handlerToRun; + finishMonitor=new Object(); + taskMonitor = new Object(); + pendingUpdate=new AtomicBoolean(false); + createUpdateExecutor(); + } + + void createUpdateExecutor(){ + Runnable newRunnable = ()-> UpdateExecutor(); + Thread newThread = new Thread(newRunnable); + newThread.setDaemon(true); + newThread.start(); + } + + public void processNewUpdate(){ + pendingUpdate.set(true); + synchronized (taskMonitor) { + taskMonitor.notifyAll(); + } + } + + void UpdateExecutor(){ + while(true) { + synchronized (taskMonitor) { + try { + if(!pendingUpdate.get()){ + taskMonitor.wait(); + } + } catch (InterruptedException e) { + logger.error("Update Executor received an interrup exception"); + } + } + startUpdate(); + } + } + + void startUpdate(){ + synchronized (finishMonitor) { + //Keep running until there are no more requests + while (pendingUpdate.getAndSet(false)) { + if(handler!=null){ + handler.run(); + } + } + finishMonitor.notifyAll(); + } + + } + + public void waitForAllPendingUpdates(){ + //Wait until there are no more requests and the thread is not longer running + //We could use a join, but given that the thread could be temporally null, then it is easier to + //separate the wait from the thread that is running + synchronized(finishMonitor){ + while(pendingUpdate.get()) { + try { + finishMonitor.wait(); + } catch (InterruptedException e) { + logger.error("waitForAllPendingUpdate received exception"); + } + } + } + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index a594918..dd71d97 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; + import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; @@ -111,6 +112,13 @@ public interface DBInterface { * @return a ResultSet containing the rows returned from the query */ ResultSet executeSQLRead(String sql); + + /** + * This method is used to verify that all data structures needed for commit are ready for commit + * Either for the MusicMixin (e.g. staging table) or internally for the DBMixin + * Important this is not for actual commit operation, it should be treated as a signal. + */ + void preCommitHook(); void synchronizeData(String tableName); @@ -133,4 +141,6 @@ public interface DBInterface { void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException; Connection getSQLConnection(); + + String getSchema(); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java index d822615..324b0f6 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java @@ -61,12 +61,13 @@ public class MixinFactory { con = cl.getConstructor(MusicInterface.class, String.class, Connection.class, Properties.class); if (con != null) { logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname); - return (DBInterface) con.newInstance(mi, url, conn, info); + DBInterface newdbi = (DBInterface) con.newInstance(mi, url, conn, info); + return newdbi; } } } } catch (Exception e) { - logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface: "+e); + logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface error for "+cl.getName(),e); } } return null; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 71f1b8b..fc8897f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -303,8 +303,8 @@ public interface MusicInterface { void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException; List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException; - + void deleteMriRow(MusicRangeInformationRow row) throws MDBCServiceException; void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index cb37a55..5a322f3 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -61,7 +61,6 @@ import org.onap.music.mdbc.StateManager; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.ownership.Dag; import org.onap.music.mdbc.ownership.DagNode; -import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; @@ -99,6 +98,8 @@ public class MusicMixin implements MusicInterface { public static final String KEY_MUSIC_RFACTOR = "music_rfactor"; /** The property name to use to provide the replication factor for Cassandra. */ public static final String KEY_MUSIC_NAMESPACE = "music_namespace"; + /** The property name to use to provide a timeout to mdbc (ownership) */ + public static final String KEY_TIMEOUT = "mdbc_timeout"; /** The property name to use to provide a flag indicating if compression is required */ public static final String KEY_COMPRESSION = "mdbc_compression"; /** Namespace for the tables in MUSIC (Cassandra) */ @@ -106,13 +107,15 @@ public class MusicMixin implements MusicInterface { /** The default property value to use for the Cassandra IP address. */ public static final String DEFAULT_MUSIC_ADDRESS = "localhost"; /** The default property value to use for the Cassandra replication factor. */ - public static final int DEFAULT_MUSIC_RFACTOR = 1; + public static final int DEFAULT_MUSIC_RFACTOR = 3; + /** The default property value to use for the MDBC timeout */ + public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours /** The default primary string column, if none is provided. */ public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid"; /** Type of the primary key, if none is defined by the user */ public static final String MDBC_PRIMARYKEY_TYPE = "uuid"; public static final boolean DEFAULT_COMPRESSION = true; - + //TODO: Control network topology strategy with a configuration file entry public static final boolean ENABLE_NETWORK_TOPOLOGY_STRATEGY = false; //\TODO Add logic to change the names when required and create the tables when necessary @@ -254,6 +257,7 @@ public class MusicMixin implements MusicInterface { public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException { Map<String,Object> replicationInfo = new HashMap<>(); replicationInfo.put("'class'", "'NetworkTopologyStrategy'"); + if (ENABLE_NETWORK_TOPOLOGY_STRATEGY && replicationFactor==3) { replicationInfo.put("'dc1'", 1); replicationInfo.put("'dc2'", 1); @@ -673,7 +677,7 @@ public class MusicMixin implements MusicInterface { } if(rs!=null) { for (Row row : rs) { - set.add(row.getString("TABLE_NAME").toUpperCase()); + set.add(row.getString("TABLE_NAME")); } } return set; @@ -1295,15 +1299,13 @@ public class MusicMixin implements MusicInterface { logger.warn("Trying tcommit log with null partition"); return; } + List<Range> snapshot = partition.getSnapshot(); if(snapshot==null || snapshot.isEmpty()){ logger.warn("Trying to commit log with empty ranges"); return; } - // first deal with commit for eventually consistent tables - filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); - UUID mriIndex = partition.getMRIIndex(); String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex; //0. See if reference to lock was already created @@ -1475,7 +1477,7 @@ public class MusicMixin implements MusicInterface { for(TupleValue t: log){ //final String tableName = t.getString(0); final UUID id = t.getUUID(1); - digestIds.add(new MusicTxDigestId(id,index++)); + digestIds.add(new MusicTxDigestId(partitionIndex,id,index++)); } List<Range> partitions = new ArrayList<>(); Set<String> tables = newRow.getSet("keys",String.class); @@ -1526,11 +1528,14 @@ public class MusicMixin implements MusicInterface { pQueryObject.appendQueryString(cql); pQueryObject.addValue(baseRange.getTable()); Row newRow; + //TODO Change this when music fix the "." problem in the primary key + final String table = baseRange.getTable(); + final String tableWithoutDot = table.replaceAll("\\.",""); try { - newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.getTable(),null); + newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,tableWithoutDot,null); } catch (MDBCServiceException e) { - logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e); + logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName+" trying for table "+tableWithoutDot); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information for table "+tableWithoutDot, e); } return getRangeDependenciesFromCassandraRow(newRow); } @@ -1838,7 +1843,6 @@ public class MusicMixin implements MusicInterface { PreparedQueryObject query = new PreparedQueryObject(); int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR); - String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns, this.musicEventualTxDigestTableName); query.appendQueryString(cql); @@ -1856,6 +1860,7 @@ public class MusicMixin implements MusicInterface { } } + @Override public StagingTable getTxDigest(MusicTxDigestId id) throws MDBCServiceException { String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName); @@ -1883,8 +1888,7 @@ public class MusicMixin implements MusicInterface { } return changes; } - - + @Override public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException { int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR); @@ -1901,7 +1905,7 @@ public class MusicMixin implements MusicInterface { LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>(); UUID musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName); PreparedQueryObject pQueryObject = new PreparedQueryObject(); - + if (musicevetxdigestNodeinfoTimeID != null) { // this will fetch only few records based on the time-stamp condition. cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString()); @@ -1912,7 +1916,7 @@ public class MusicMixin implements MusicInterface { cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString()); pQueryObject.appendQueryString(cql); } - + // I need to get a ResultSet of all the records and give each row to the below HashMap. ResultSet rs = executeMusicRead(pQueryObject); while (!rs.isExhausted()) { @@ -1920,8 +1924,8 @@ public class MusicMixin implements MusicInterface { ByteBuffer digest = row.getBytes("transactiondigest"); Boolean compressed = row.getBool("compressed"); //String txTimeId = row.getString("txtimeid"); //??? - UUID txTimeId = row.getUUID("txtimeid"); - + UUID txTimeId = row.getUUID("txtimeid"); + try { if(compressed){ digest=StagingTable.Decompress(digest); @@ -1932,7 +1936,7 @@ public class MusicMixin implements MusicInterface { throw e; } ecDigestInformation.put(txTimeId, changes); - } + } return ecDigestInformation; } @@ -2218,7 +2222,7 @@ public class MusicMixin implements MusicInterface { * @param cql the CQL to be sent to Cassandra */ private static void executeMusicWriteQuery(String keyspace, String table, String cql) - throws MDBCServiceException { + throws MDBCServiceException { PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); ResultType rt = null; @@ -2254,6 +2258,9 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException("Error executing atomic get", e); } } + if(result==null){ + throw new MDBCServiceException("Error executing atomic get for primary key: "+primaryKey); + } if(result.isExhausted()){ return null; } @@ -2418,11 +2425,17 @@ public class MusicMixin implements MusicInterface { pQueryObject.addValue(row.getPartitionIndex()); ReturnType rt ; try { - rt = MusicCore.atomicPut(music_ns, musicRangeDependencyTableName, row.getPartitionIndex().toString(), + rt = MusicCore.atomicPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(), pQueryObject, null); } catch (MusicLockingException|MusicQueryException|MusicServiceException e) { logger.error("Failure when deleting mri row"); new MDBCServiceException("Error deleting mri row",e); } } + + public StateManager getStateManager() { + return stateManager; + } + + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 6d6e691..15caf3f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -40,6 +40,7 @@ import org.json.JSONTokener; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.Configuration; import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; @@ -75,7 +76,7 @@ public class MySQLMixin implements DBInterface { public static final String TRANS_TBL = "MDBC_TRANSLOG"; private static final String CREATE_TBL_SQL = "CREATE TABLE IF NOT EXISTS "+TRANS_TBL+ - " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " + + " (IX INT AUTO_INCREMENT, OP CHAR(1), SCHEMANAME VARCHAR(255), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " + "CONNECTION_ID INT, PRIMARY KEY (IX));"; private final MusicInterface mi; @@ -85,6 +86,10 @@ public class MySQLMixin implements DBInterface { private final Map<String, TableInfo> tables; private PreparedStatement deleteStagingStatement; private boolean server_tbl_created = false; + private boolean useAsyncStagingUpdate = false; + private Object stagingHandlerLock = new Object(); + private AsyncUpdateHandler stagingHandler = null; + private StagingTable currentStaging=null; public MySQLMixin() { this.mi = null; @@ -100,12 +105,35 @@ public class MySQLMixin implements DBInterface { this.dbName = getDBName(conn); this.jdbcConn = conn; this.tables = new HashMap<String, TableInfo>(); + useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE, + Configuration.ASYNC_STAGING_TABLE_UPDATE)); + } + + class StagingTableUpdateRunnable implements Runnable{ + + private MySQLMixin mixin; + private StagingTable staging; + + StagingTableUpdateRunnable(MySQLMixin mixin, StagingTable staging){ + this.mixin=mixin; + this.staging=staging; + } + + @Override + public void run() { + try { + this.mixin.updateStagingTable(staging); + } catch (NoSuchFieldException|MDBCServiceException e) { + this.mixin.logger.error("Error when updating the staging table"); + } + } } private PreparedStatement getStagingDeletePreparedStatement() throws SQLException { return jdbcConn.prepareStatement("DELETE FROM "+TRANS_TBL+" WHERE (IX BETWEEN ? AND ? ) AND " + "CONNECTION_ID = ?;"); } + // This is used to generate a unique connId for this connection to the DB. private int generateConnID(Connection conn) { int rv = (int) System.currentTimeMillis(); // random-ish @@ -156,12 +184,20 @@ public class MySQLMixin implements DBInterface { } return dbname; } - + + @Override public String getDatabaseName() { return this.dbName; } @Override + public String getSchema() {return this.dbName;} + + /** + * Get a set of the table names in the database. + * @return the set + */ + @Override public Set<String> getSQLTableSet() { Set<String> set = new TreeSet<String>(); String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; @@ -183,7 +219,7 @@ public class MySQLMixin implements DBInterface { @Override public Set<Range> getSQLRangeSet() { Set<String> set = new TreeSet<String>(); - String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + String sql = "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; try { Statement stmt = jdbcConn.createStatement(); ResultSet rs = stmt.executeQuery(sql); @@ -244,9 +280,19 @@ mysql> describe tables; TableInfo ti = tables.get(tableName); if (ti == null) { try { - String tbl = tableName;//.toUpperCase(); - String sql = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME='"+tbl+"'"; - ResultSet rs = executeSQLRead(sql); + final String[] split = tableName.split("\\."); + String tbl = (split.length==2)?split[1]:tableName; + String localSchema = (split.length==2)?split[0]:getSchema(); + StringBuilder sql=new StringBuilder(); + sql.append("SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA="); + if(localSchema==null) { + sql.append("DATABASE() AND TABLE_NAME='"); + } + else { + sql.append("'").append(localSchema).append("' AND TABLE_NAME='"); + } + sql.append(tbl).append("';"); + ResultSet rs = executeSQLRead(sql.toString()); if (rs != null) { ti = new TableInfo(); while (rs.next()) { @@ -296,9 +342,13 @@ mysql> describe tables; } } @Override - public void createSQLTriggers(String tableName) { - // Don't create triggers for the table the triggers write into!!! + public void createSQLTriggers(String table) { + final String[] split = table.split("\\."); + String schemaName = (split.length==2)?split[0]:getSchema(); + String tableName = (split.length==2)?split[1]:table; + if (tableName.equals(TRANS_TBL)) + // Don't create triggers for the table the triggers write into!!! return; try { if (!server_tbl_created) { @@ -321,12 +371,12 @@ mysql> describe tables; //msm.register(name); } // No SELECT trigger - executeSQLWrite(generateTrigger(tableName, "INSERT")); - executeSQLWrite(generateTrigger(tableName, "UPDATE")); + executeSQLWrite(generateTrigger(schemaName,tableName, "INSERT")); + executeSQLWrite(generateTrigger(schemaName,tableName, "UPDATE")); //\TODO: save key row instead of the whole row for delete - executeSQLWrite(generateTrigger(tableName, "DELETE")); + executeSQLWrite(generateTrigger(schemaName,tableName, "DELETE")); } catch (SQLException e) { - if (e.getMessage().equals("Trigger already exists")) { + if (e.getMessage().equals("Trigger already exists") || e.getMessage().endsWith("already exists")){ //only warn if trigger already exists logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e); } else { @@ -343,7 +393,7 @@ END; OLD.field refers to the old value NEW.field refers to the new value */ - private String generateTrigger(String tableName, String op) { + private String generateTrigger(String schema, String tableName, String op) { boolean isdelete = op.equals("DELETE"); boolean isinsert = op.equals("INSERT"); boolean isupdate = op.equals("UPDATE"); @@ -371,25 +421,27 @@ NEW.field refers to the new value //\TODO check if using mysql driver, so instead check the exception //\TODO add conditional for update, if primary key is still the same, use null in the KEYDATA col StringBuilder sb = new StringBuilder() - .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL! - .append(String.format("%s_%s", op.substring(0, 1), tableName)) - .append(" AFTER ") - .append(op) - .append(" ON ") - .append(tableName.toUpperCase()) - .append(" FOR EACH ROW INSERT INTO ") - .append(TRANS_TBL) - .append(" (TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('") - .append(tableName.toUpperCase()) - .append("', ") - .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'")) - .append(", ") - .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL") - .append(", ") - .append(newJson.toString()) - .append(", ") - .append("CONNECTION_ID()") - .append(")"); + .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL! + .append(String.format("%s_%s", op.substring(0, 1), tableName)) + .append(" AFTER ") + .append(op) + .append(" ON ") + .append(tableName) + .append(" FOR EACH ROW INSERT INTO ") + .append(TRANS_TBL) + .append(" (SCHEMANAME, TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('") + .append( (schema==null)?this.getSchema():schema ) + .append("', '") + .append(tableName) + .append("', ") + .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'")) + .append(", ") + .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL") + .append(", ") + .append(newJson.toString()) + .append(", ") + .append("CONNECTION_ID()") + .append(")"); return sb.toString(); } private String[] getTriggerNames(String tableName) { @@ -521,6 +573,16 @@ NEW.field refers to the new value return rs; } + @Override + public void preCommitHook() { + synchronized (stagingHandlerLock){ + //\TODO check if this can potentially block forever in certain scenarios + if(stagingHandler!=null){ + stagingHandler.waitForAllPendingUpdates(); + } + } + } + /** * This method executes a write query in the sql database. * @param sql the SQL to be sent to MySQL @@ -569,11 +631,24 @@ NEW.field refers to the new value String[] parts = sql.trim().split(" "); String cmd = parts[0].toLowerCase(); if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) { - try { - this.updateStagingTable(transactionDigest); - } catch (NoSuchFieldException|MDBCServiceException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + if (useAsyncStagingUpdate) { + synchronized (stagingHandlerLock){ + if(stagingHandler==null||currentStaging!=transactionDigest){ + Runnable newRunnable = new StagingTableUpdateRunnable(this, transactionDigest); + currentStaging=transactionDigest; + stagingHandler=new AsyncUpdateHandler(newRunnable); + } + //else we can keep using the current staging Handler + } + stagingHandler.processNewUpdate(); + } else { + + try { + this.updateStagingTable(transactionDigest); + } catch (NoSuchFieldException | MDBCServiceException e) { + // TODO Auto-generated catch block + this.logger.error("Error updating the staging table"); + } } } } @@ -604,7 +679,7 @@ NEW.field refers to the new value throws NoSuchFieldException, MDBCServiceException { // copy from DB.MDBC_TRANSLOG where connid == myconnid // then delete from MDBC_TRANSLOG - String sql2 = "SELECT IX, TABLENAME, OP, ROWDATA,KEYDATA FROM "+TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId; + String sql2 = "SELECT IX, SCHEMANAME, TABLENAME, OP, ROWDATA, KEYDATA FROM " + TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId; Integer biggestIx = Integer.MIN_VALUE; Integer smallestIx = Integer.MAX_VALUE; try { @@ -616,10 +691,11 @@ NEW.field refers to the new value smallestIx = Integer.min(smallestIx,ix); String op = rs.getString("OP"); SQLOperation opType = toOpEnum(op); + String schema= rs.getString("SCHEMANAME"); String tbl = rs.getString("TABLENAME"); String newRowStr = rs.getString("ROWDATA"); String rowStr = rs.getString("KEYDATA"); - Range range = new Range(tbl); + Range range = new Range(schema+"."+tbl); transactionDigests.addOperation(range,opType,newRowStr,rowStr); rows.add(ix); } @@ -1056,7 +1132,7 @@ NEW.field refers to the new value private void clearReplayedOperations(Statement jdbcStmt) throws SQLException { logger.info("Clearing replayed operations"); String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId; - jdbcStmt.executeQuery(sql); + jdbcStmt.executeUpdate(sql); } @Override diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java new file mode 100755 index 0000000..0f66731 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java @@ -0,0 +1,1066 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.mixins; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.*; +import java.util.Map.Entry; +import net.sf.jsqlparser.JSQLParserException; +import net.sf.jsqlparser.parser.CCJSqlParserUtil; +import net.sf.jsqlparser.statement.delete.Delete; +import net.sf.jsqlparser.statement.insert.Insert; +import net.sf.jsqlparser.statement.update.Update; +import org.json.JSONObject; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.Configuration; +import org.onap.music.mdbc.MDBCUtils; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.TableInfo; +import org.onap.music.mdbc.mixins.MySQLMixin.StagingTableUpdateRunnable; +import org.onap.music.mdbc.tables.Operation; +import org.onap.music.mdbc.query.SQLOperation; +import org.onap.music.mdbc.tables.StagingTable; +import org.postgresql.util.PGInterval; +import org.postgresql.util.PGobject; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +/** + * This class provides the methods that MDBC needs in order to mirror data to/from a + * <a href="https://dev.mysql.com/">MySQL</a> or <a href="http://mariadb.org/">MariaDB</a> database instance. This class + * uses the <code>JSON_OBJECT()</code> database function, which means it requires the following minimum versions of + * either database: + * <table summary=""> + * <tr> + * <th>DATABASE</th> + * <th>VERSION</th> + * </tr> + * <tr> + * <td>MySQL</td> + * <td>5.7.8</td> + * </tr> + * <tr> + * <td>MariaDB</td> + * <td>10.2.3 (Note: 10.2.3 is currently (July 2017) a <i>beta</i> release)</td> + * </tr> + * </table> + * + * @author Robert P. Eby + */ +public class PostgresMixin implements DBInterface { + + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(PostgresMixin.class); + + public static final String MIXIN_NAME = "postgres"; + public static final String TRANS_TBL_SCHEMA = "audit"; + public static final String TRANS_TBL = "mdbc_translog"; + + private final MusicInterface mi; + private final String connId; + private final String dbName; + private final String schema; + private final Connection jdbcConn; + private final Map<String, TableInfo> tables; + private PreparedStatement deleteStagingStatement; + private boolean useAsyncStagingUpdate = false; + private Object stagingHandlerLock = new Object(); + private AsyncUpdateHandler stagingHandler = null; + private StagingTable currentStaging = null; + + public PostgresMixin() { + this.mi = null; + this.connId = ""; + this.dbName = null; + this.schema = null; + this.jdbcConn = null; + this.tables = null; + this.deleteStagingStatement = null; + } + + private void initializeDeleteStatement() throws SQLException { + deleteStagingStatement = jdbcConn.prepareStatement("DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + + " WHERE (ix BETWEEN ? AND ? ) AND " + "connection_id = ?;"); + } + + public PostgresMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException { + this.mi = mi; + this.connId = generateConnID(conn); + this.dbName = getDBName(conn); + this.schema = getSchema(conn); + this.jdbcConn = conn; + this.tables = new HashMap<>(); + useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE, + Configuration.ASYNC_STAGING_TABLE_UPDATE)); + initializePostgresTriggersStructures(); + initializeDeleteStatement(); + } + + class StagingTableUpdateRunnable implements Runnable { + + private PostgresMixin mixin; + private StagingTable staging; + + StagingTableUpdateRunnable(PostgresMixin mixin, StagingTable staging) { + this.mixin = mixin; + this.staging = staging; + } + + @Override + public void run() { + try { + this.mixin.updateStagingTable(staging); + } catch (NoSuchFieldException | MDBCServiceException e) { + this.mixin.logger.error("Error when updating the staging table"); + } + } + } + + + private void createTriggerTable() throws SQLException { + final String createSchemaSQL = "CREATE SCHEMA IF NOT EXISTS " + TRANS_TBL_SCHEMA + ";"; + final String revokeCreatePrivilegesSQL = "REVOKE CREATE ON schema " + TRANS_TBL_SCHEMA + " FROM public;"; + final String createTableSQL = "CREATE TABLE IF NOT EXISTS " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " (" + + "ix serial," + "op TEXT NOT NULL CHECK (op IN ('I','D','U'))," + "schema_name text NOT NULL," + + "table_name text NOT NULL," + "original_data json," + "new_data json," + "connection_id text," + + "PRIMARY KEY (connection_id,ix)" + ") WITH (fillfactor=100);"; + final String revokeSQL = "REVOKE INSERT,UPDATE,DELETE,TRUNCATE,REFERENCES,TRIGGER ON " + TRANS_TBL_SCHEMA + "." + + TRANS_TBL + " FROM public;"; + final String grantSelectSQL = "GRANT SELECT ON " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " TO public;"; + final String createIndexSQL = "CREATE INDEX IF NOT EXISTS logged_actions_connection_id_idx" + " ON " + + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " (connection_id);"; + Map<String, String> sqlStatements = new LinkedHashMap<String, String>() { + { + put("create_schema", createSchemaSQL); + put("revoke_privileges", revokeCreatePrivilegesSQL); + put("create_table", createTableSQL); + put("revoke_sql", revokeSQL); + put("grant_select", grantSelectSQL); + put("create_index", createIndexSQL); + } + }; + for (Entry<String, String> query : sqlStatements.entrySet()) { + int retryCount = 0; + boolean ready = false; + while (retryCount < 3 && !ready) { + try { + Statement statement = jdbcConn.createStatement(); + statement.executeUpdate(query.getValue()); + if (!jdbcConn.getAutoCommit()) { + jdbcConn.commit(); + } + statement.close(); + ready = true; + } catch (SQLException e) { + if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated") || e.getMessage() + .toLowerCase().startsWith("error: duplicate key value violates unique constraint")) { + logger.warn("Error creating schema, retrying. for " + query.getKey(), e); + try { + Thread.sleep(100); + } catch (InterruptedException e1) { + } + } else { + logger.error("Error executing " + query.getKey(), e); + throw e; + } + } + retryCount++; + } + } + } + + private String updateTriggerSection() { + return "IF (TG_OP = 'UPDATE') THEN\n" + "v_old_data := row_to_json(OLD);\n" + + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + + " (op,schema_name,table_name,original_data,new_data,connection_id)\n" + + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,v_new_data,pg_backend_pid());\n" + + "RETURN NEW; "; + } + + private String insertTriggerSection() { + // \TODO add additinoal conditional on change "IF NEW IS DISTINCT FROM OLD THEN" + return "IF (TG_OP = 'INSERT') THEN\n" + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + + "." + TRANS_TBL + " (op,schema_name,table_name,new_data,connection_id)\n" + + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_new_data,pg_backend_pid());\n" + + "RETURN NEW; "; + } + + private String deleteTriggerSection() { + return "IF (TG_OP = 'DELETE') THEN\n" + "v_old_data := row_to_json(OLD);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + + "." + TRANS_TBL + " (op,schema_name,table_name,original_data,connection_id)\n" + + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,pg_backend_pid());\n" + + "RETURN OLD; "; + } + + private String functionName(SQLOperation type) { + final String functionName = (type.equals(SQLOperation.UPDATE)) ? "if_updated_func" + : (type.equals(SQLOperation.INSERT)) ? "if_inserted_func" : "if_deleted_func"; + return "audit." + functionName + "()"; + } + + private void createTriggerFunctions(SQLOperation type) throws SQLException { + StringBuilder functionSQL = + new StringBuilder("CREATE OR REPLACE FUNCTION " + functionName(type) + " RETURNS TRIGGER AS $body$\n" + + "DECLARE\n" + "v_old_data json;\n" + "v_new_data json;\n" + "BEGIN\n"); + switch (type) { + case UPDATE: + functionSQL.append(updateTriggerSection()); + break; + case INSERT: + functionSQL.append(insertTriggerSection()); + break; + case DELETE: + functionSQL.append(deleteTriggerSection()); + break; + default: + throw new IllegalArgumentException("Invalid operation type for creation of trigger functions"); + } + functionSQL.append("ELSE\n" + + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - Other action occurred: %, at %',TG_OP,now();\n" + + "RETURN NULL;\n" + "END IF;\n" + "EXCEPTION\n" + "WHEN data_exception THEN\n" + + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n" + + "RETURN NULL;\n" + "WHEN unique_violation THEN\n" + + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n" + + "RETURN NULL;\n" + "WHEN OTHERS THEN\n" + + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n" + + "RETURN NULL;\n" + "END;\n" + "$body$\n" + "LANGUAGE plpgsql\n" + "SECURITY DEFINER\n" + + "SET search_path = pg_catalog, audit;"); + int retryCount = 0; + boolean ready = false; + while (retryCount < 3 && !ready) { + try { + executeSQLWrite(functionSQL.toString()); + ready = true; + } catch (SQLException e) { + if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated")) { + logger.warn("Error creating schema, retrying. ", e); + try { + Thread.sleep(200); + } catch (InterruptedException e1) { + } + } else { + logger.error("Error executing creation of trigger function", e); + throw e; + } + } + retryCount++; + } + } + + private void initializePostgresTriggersStructures() throws SQLException { + try { + createTriggerTable(); + } catch (SQLException e) { + logger.error("Error creating the trigger tables in postgres", e); + throw e; + } + try { + createTriggerFunctions(SQLOperation.INSERT); + createTriggerFunctions(SQLOperation.UPDATE); + createTriggerFunctions(SQLOperation.DELETE); + } catch (SQLException e) { + logger.error("Error creating the trigger functions in postgres", e); + throw e; + } + } + + // This is used to generate a unique connId for this connection to the DB. + private String generateConnID(Connection conn) { + String rv = Integer.toString((int) System.currentTimeMillis()); // random-ish + try { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT pg_backend_pid() AS IX"); + if (rs.next()) { + rv = rs.getString("IX"); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "generateConnID: problem generating a connection ID!"); + } + return rv; + } + + /** + * Get the name of this DBnterface mixin object. + * + * @return the name + */ + @Override + public String getMixinName() { + return MIXIN_NAME; + } + + @Override + public void close() { + // nothing yet + } + + /** + * Determines the db name associated with the connection This is the private/internal method that actually + * determines the name + * + * @param conn + * @return + */ + private String getDBName(Connection conn) { + String dbname = "mdbc"; // default name + try { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT current_database();"); + if (rs.next()) { + dbname = rs.getString("current_database"); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql"); + } + return dbname; + } + + /** + * Determines the db name associated with the connection This is the private/internal method that actually + * determines the name + * + * @param conn + * @return + */ + private String getSchema(Connection conn) { + String schema = "public"; // default name + try { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT current_schema();"); + if (rs.next()) { + schema = rs.getString("current_schema"); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql"); + } + return schema; + } + + @Override + public String getSchema() { + return schema; + } + + @Override + public String getDatabaseName() { + return this.dbName; + } + + /** + * Get a set of the table names in the database. + * + * @return the set + */ + @Override + public Set<Range> getSQLRangeSet() { + Set<String> set = new TreeSet<String>(); + String sql = + "SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE' AND table_schema=current_schema();"; + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery(sql); + while (rs.next()) { + String s = rs.getString("table_name"); + set.add(s); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e); + } + logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set); + Set<Range> rangeSet = new HashSet<>(); + for (String table : set) { + rangeSet.add(new Range(table)); + } + return rangeSet; + } + + /** + * Return a TableInfo object for the specified table. This method first looks in a cache of previously constructed + * TableInfo objects for the table. If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the + * column names, types, and indexes of the table. It creates a new TableInfo object with the results. + * + * @param tableName the table to look up + * @return a TableInfo object containing the info we need, or null if the table does not exist + */ + @Override + public TableInfo getTableInfo(String tableName) { + // \TODO: invalidate "tables" when a table schema is modified (uncommon), but needs to be handled + TableInfo ti = tables.get(tableName); + if (ti == null) { + try { + String tbl, localSchema; + final String[] split = tableName.split("\\."); + if (split.length == 2) { + localSchema = split[0]; + tbl = split[1]; + } else { + tbl = tableName; + localSchema = this.schema; + } + String sql; + if (schema == null) { + sql = "select column_name, data_type from information_schema.columns where table_schema=current_schema() and table_name='" + + tbl + "';"; + } else { + sql = "select column_name, data_type from information_schema.columns where table_schema='" + + localSchema + "' and table_name='" + tbl + "';"; + } + ResultSet rs = executeSQLRead(sql); + if (rs != null) { + ti = new TableInfo(); + while (rs.next()) { + String name = rs.getString("column_name"); + String type = rs.getString("data_type"); + ti.columns.add(name); + ti.coltype.add(mapDatatypeNameToType(type)); + } + rs.getStatement().close(); + } else { + logger.error(EELFLoggerDelegate.errorLogger, + "Cannot retrieve table info for table " + tableName + " from POSTGRES."); + return null; + } + final String keysql = + "SELECT a.attname as column_name FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid" + + " AND a.attnum = ANY(i.indkey) WHERE i.indrelid = '" + tbl + "'::regclass " + + " AND i.indisprimary;"; + ResultSet rs2 = executeSQLRead(keysql); + Set<String> keycols = new HashSet<>(); + if (rs2 != null) { + while (rs2.next()) { + String name = rs2.getString("column_name"); + keycols.add(name); + } + rs2.getStatement().close(); + } else { + logger.error(EELFLoggerDelegate.errorLogger, + "Cannot retrieve table info for table " + tableName + " from MySQL."); + } + for (String col : ti.columns) { + if (keycols.contains(col)) { + ti.iskey.add(true); + } else { + ti.iskey.add(false); + } + } + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, + "Cannot retrieve table info for table " + tableName + " from MySQL: " + e); + return null; + } + tables.put(tableName, ti); + } + return ti; + } + + // Map Postgres data type names to the java.sql.Types equivalent + private int mapDatatypeNameToType(String nm) { + switch (nm) { + case "character": + return Types.CHAR; + case "national character": + return Types.NCHAR; + case "character varying": + return Types.VARCHAR; + case "national character varying": + return Types.NVARCHAR; + case "text": + return Types.VARCHAR; + case "bytea": + return Types.BINARY; + case "smallint": + return Types.SMALLINT; + case "integer": + return Types.INTEGER; + case "bigint": + return Types.BIGINT; + case "smallserial": + return Types.SMALLINT; + case "serial": + return Types.INTEGER; + case "bigserial": + return Types.BIGINT; + case "real": + return Types.REAL; + case "double precision": + return Types.DOUBLE; + case "numeric": + return Types.NUMERIC; + case "decimal": + return Types.DECIMAL; + case "date": + return Types.DATE; + case "time with time zone": + return Types.TIME_WITH_TIMEZONE; + case "time without time zone": + return Types.TIME; + case "timestamp without time zone": + return Types.TIMESTAMP; + case "timestamp with time zone": + return Types.TIMESTAMP_WITH_TIMEZONE; + case "boolean": + return Types.BIT; + case "bit": + return Types.BIT; + case "oid": + return Types.BIGINT; + case "xml": + return Types.SQLXML; + case "array": + return Types.ARRAY; + case "tinyint": + return Types.TINYINT; + case "uuid": + case "money": + case "interval": + case "bit varying": + case "box": + case "point": + case "lseg": + case "path": + case "polygon": + case "circle": + case "json": + case "inet": + case "cidr": + case "macaddr": + case "tsvector": + case "tsquery": + return Types.OTHER; + default: + logger.error(EELFLoggerDelegate.errorLogger, "unrecognized and/or unsupported data type " + nm); + return Types.VARCHAR; + } + } + + @Override + public void createSQLTriggers(String tableName) { + // Don't create triggers for the table the triggers write into!!! + if (tableName.equals(TRANS_TBL) || tableName.equals(TRANS_TBL_SCHEMA + "." + TRANS_TBL)) + return; + try { + // No SELECT trigger + executeSQLWrite(generateTrigger(tableName, SQLOperation.INSERT)); + executeSQLWrite(generateTrigger(tableName, SQLOperation.DELETE)); + executeSQLWrite(generateTrigger(tableName, SQLOperation.UPDATE)); + } catch (SQLException e) { + if (e.getMessage().trim().endsWith("already exists")) { + // only warn if trigger already exists + logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e); + } else { + logger.error(EELFLoggerDelegate.errorLogger, "createSQLTriggers: " + e); + } + } + } + + private String generateTrigger(String tableName, SQLOperation op) { + StringBuilder triggerSql = new StringBuilder("CREATE TRIGGER ").append(getTriggerName(tableName, op)) + .append(" AFTER " + op + " ON ").append(tableName).append(" FOR EACH ROW EXECUTE PROCEDURE ") + .append(functionName(op)).append(';'); + return triggerSql.toString(); + } + + private String getTriggerName(String tableName, SQLOperation op) { + switch (op) { + case DELETE: + return "D_" + tableName; + case UPDATE: + return "U_" + tableName; + case INSERT: + return "I_" + tableName; + default: + throw new IllegalArgumentException("Invalid option in trigger operation type"); + } + } + + private String[] getTriggerNames(String tableName) { + return new String[] {getTriggerName(tableName, SQLOperation.INSERT), + getTriggerName(tableName, SQLOperation.DELETE), getTriggerName(tableName, SQLOperation.UPDATE),}; + } + + @Override + public void dropSQLTriggers(String tableName) { + try { + for (String name : getTriggerNames(tableName)) { + logger.debug(EELFLoggerDelegate.applicationLogger, "REMOVE trigger " + name + " from postgres"); + executeSQLWrite("DROP TRIGGER IF EXISTS " + name + ";"); + } + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "dropSQLTriggers: " + e); + } + } + + @Override + public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) { + throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres"); + } + + @Override + public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) { + throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres"); + } + + /** + * This method executes a read query in the SQL database. Methods that call this method should be sure to call + * resultset.getStatement().close() when done in order to free up resources. + * + * @param sql the query to run + * @return a ResultSet containing the rows returned from the query + */ + @Override + public ResultSet executeSQLRead(String sql) { + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing sql read in postgres"); + logger.debug("Executing SQL read:" + sql); + ResultSet rs; + try { + Statement stmt = jdbcConn.createStatement(); + rs = stmt.executeQuery(sql); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "executeSQLRead" + e); + return null; + } + return rs; + } + + /** + * This method executes a write query in the sql database. + * + * @param sql the SQL to be sent to MySQL + * @throws SQLException if an underlying JDBC method throws an exception + */ + protected void executeSQLWrite(String sql) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:" + sql); + Statement stmt = jdbcConn.createStatement(); + stmt.execute(sql); + stmt.close(); + } + + @Override + public void preCommitHook() { + synchronized (stagingHandlerLock) { + // \TODO check if this can potentially block forever in certain scenarios + if (stagingHandler != null) { + stagingHandler.waitForAllPendingUpdates(); + } + } + } + + /** + * Code to be run within the DB driver before a SQL statement is executed. This is where tables can be synchronized + * before a SELECT, for those databases that do not support SELECT triggers. + * + * @param sql the SQL statement that is about to be executed + * @return list of keys that will be updated, if they can't be determined afterwards (i.e. sql table doesn't have + * primary key) + */ + @Override + public void preStatementHook(final String sql) { + if (sql == null) { + return; + } + // \TODO: check if anything needs to be executed here for postgres + } + + /** + * Code to be run within the DB driver after a SQL statement has been executed. This is where remote statement + * actions can be copied back to Cassandra/MUSIC. + * + * @param sql the SQL statement that was executed + */ + @Override + public void postStatementHook(final String sql, StagingTable transactionDigest) { + if (sql != null) { + String[] parts = sql.trim().split(" "); + String cmd = parts[0].toLowerCase(); + if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) { + if (useAsyncStagingUpdate) { + synchronized (stagingHandlerLock) { + if (stagingHandler == null || currentStaging != transactionDigest) { + Runnable newRunnable = + new PostgresMixin.StagingTableUpdateRunnable(this, transactionDigest); + currentStaging = transactionDigest; + stagingHandler = new AsyncUpdateHandler(newRunnable); + } + // else we can keep using the current staging Handler + } + stagingHandler.processNewUpdate(); + } else { + + try { + this.updateStagingTable(transactionDigest); + } catch (NoSuchFieldException | MDBCServiceException e) { + // TODO Auto-generated catch block + this.logger.error("Error updating the staging table"); + } + } + } + } + } + + private SQLOperation toOpEnum(String operation) throws NoSuchFieldException { + switch (operation.toLowerCase()) { + case "i": + return SQLOperation.INSERT; + case "d": + return SQLOperation.DELETE; + case "u": + return SQLOperation.UPDATE; + case "s": + return SQLOperation.SELECT; + default: + logger.error(EELFLoggerDelegate.errorLogger, "Invalid operation selected: [" + operation + "]"); + throw new NoSuchFieldException("Invalid operation enum"); + } + + } + + /** + * Copy data that is in transaction table into music interface + * + * @param transactionDigests + * @throws NoSuchFieldException + */ + private void updateStagingTable(StagingTable transactionDigests) throws NoSuchFieldException, MDBCServiceException { + String selectSql = "select ix, op, schema_name, table_name, original_data,new_data FROM " + TRANS_TBL_SCHEMA + + "." + TRANS_TBL + " where connection_id = '" + this.connId + "';"; + Integer biggestIx = Integer.MIN_VALUE; + Integer smallestIx = Integer.MAX_VALUE; + try { + ResultSet rs = executeSQLRead(selectSql); + Set<Integer> rows = new TreeSet<Integer>(); + while (rs.next()) { + int ix = rs.getInt("ix"); + biggestIx = Integer.max(biggestIx, ix); + smallestIx = Integer.min(smallestIx, ix); + String op = rs.getString("op"); + SQLOperation opType = toOpEnum(op); + String schema = rs.getString("schema_name"); + String tbl = rs.getString("table_name"); + String original = rs.getString("original_data"); + String newData = rs.getString("new_data"); + Range range = new Range(schema + "." + tbl); + transactionDigests.addOperation(range, opType, newData, original); + rows.add(ix); + } + rs.getStatement().close(); + if (rows.size() > 0) { + logger.debug("Staging delete: Executing with vals [" + smallestIx + "," + biggestIx + "," + this.connId + + "]"); + this.deleteStagingStatement.setInt(1, smallestIx); + this.deleteStagingStatement.setInt(2, biggestIx); + this.deleteStagingStatement.setString(3, this.connId); + this.deleteStagingStatement.execute(); + } + } catch (SQLException e) { + logger.warn("Exception in postStatementHook: " + e); + e.printStackTrace(); + } + } + + + + /** + * Update music with data from MySQL table + * + * @param tableName - name of table to update in music + */ + @Override + public void synchronizeData(String tableName) {} + + /** + * Return a list of "reserved" names, that should not be used by MySQL client/MUSIC These are reserved for mdbc + */ + @Override + public List<String> getReservedTblNames() { + ArrayList<String> rsvdTables = new ArrayList<String>(); + rsvdTables.add(TRANS_TBL_SCHEMA + "." + TRANS_TBL); + rsvdTables.add(TRANS_TBL); + // Add others here as necessary + return rsvdTables; + } + + @Override + public String getPrimaryKey(String sql, String tableName) { + return null; + } + + /** + * Parse the transaction digest into individual events + * + * @param transaction - base 64 encoded, serialized digest + */ + public void replayTransaction(StagingTable transaction, List<Range> ranges) + throws SQLException, MDBCServiceException { + boolean autocommit = jdbcConn.getAutoCommit(); + jdbcConn.setAutoCommit(false); + Statement jdbcStmt = jdbcConn.createStatement(); + final ArrayList<Operation> opList = transaction.getOperationList(); + + for (Operation op : opList) { + if (Range.overlaps(ranges, op.getTable())) { + try { + replayOperationIntoDB(jdbcStmt, op); + } catch (SQLException | MDBCServiceException e) { + // rollback transaction + logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "." + + "Rolling back the entire digest replay."); + jdbcConn.rollback(); + throw e; + } + } + } + + clearReplayedOperations(jdbcStmt); + jdbcConn.commit(); + jdbcStmt.close(); + jdbcConn.setAutoCommit(autocommit); + } + + @Override + public void disableForeignKeyChecks() throws SQLException { + Statement disable = jdbcConn.createStatement(); + disable.execute("SET session_replication_role = 'replica';"); + disable.closeOnCompletion(); + } + + @Override + public void enableForeignKeyChecks() throws SQLException { + Statement enable = jdbcConn.createStatement(); + enable.execute("SET session_replication_role = 'origin';"); + enable.closeOnCompletion(); + } + + @Override + public void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException { + replayTransaction(txDigest, ranges); + } + + /** + * Replays operation into database, usually from txDigest + * + * @param jdbcStmt: Connection used to perform the replay + * @param op: operation to be replayed + * @throws SQLException + * @throws MDBCServiceException + */ + private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException { + logger.debug("Replaying Operation: " + op.getOperationType() + "->" + op.getVal()); + JSONObject newVal = op.getVal(); + JSONObject oldVal = null; + try { + oldVal = op.getKey(); + } catch (MDBCServiceException e) { + // Ignore exception, in postgres the structure of the operation is different + } + + + TableInfo ti = getTableInfo(op.getTable()); + final List<String> keyColumns = ti.getKeyColumns(); + + // build and replay the queries + String sql = constructSQL(op, keyColumns, newVal, oldVal); + if (sql == null) + return; + + try { + logger.debug("Replaying operation: " + sql); + int updated = jdbcStmt.executeUpdate(sql); + + if (updated == 0) { + // This applies only for replaying transactions involving Eventually Consistent tables + logger.warn( + "Error Replaying operation: " + sql + "; Replacing insert/replace/viceversa and replaying "); + + buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal); + } + } catch (SQLException sqlE) { + // This applies for replaying transactions involving Eventually Consistent tables + // or transactions that replay on top of existing keys + logger.warn( + "Error Replaying operation: " + sql + ";" + "Replacing insert/replace/viceversa and replaying "); + + buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal); + + } + } + + protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, List<String> keyColumns, + JSONObject newVals, JSONObject oldVals) throws SQLException, MDBCServiceException { + String sqlInverse = constructSQLInverse(op, keyColumns, newVals, oldVals); + if (sqlInverse == null) + return; + logger.debug("Replaying operation: " + sqlInverse); + jdbcStmt.executeUpdate(sqlInverse); + } + + protected String constructSQLInverse(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals) + throws MDBCServiceException { + String sqlInverse = null; + switch (op.getOperationType()) { + case INSERT: + sqlInverse = constructUpdate(op.getTable(), keyColumns, newVals, oldVals); + break; + case UPDATE: + sqlInverse = constructInsert(op.getTable(), newVals); + break; + default: + break; + } + return sqlInverse; + } + + protected String constructSQL(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals) + throws MDBCServiceException { + String sql = null; + switch (op.getOperationType()) { + case INSERT: + sql = constructInsert(op.getTable(), newVals); + break; + case UPDATE: + sql = constructUpdate(op.getTable(), keyColumns, newVals, oldVals); + break; + case DELETE: + sql = constructDelete(op.getTable(), keyColumns, oldVals); + break; + case SELECT: + // no update happened, do nothing + break; + default: + logger.error(op.getOperationType() + "not implemented for replay"); + } + return sql; + } + + private String constructDelete(String tableName, List<String> keyColumns, JSONObject oldVals) + throws MDBCServiceException { + if (oldVals == null) { + throw new MDBCServiceException("Trying to update row with an empty old val exception"); + } + StringBuilder sql = new StringBuilder(); + sql.append("DELETE FROM "); + sql.append(tableName + " WHERE "); + sql.append(getPrimaryKeyConditional(keyColumns, oldVals)); + sql.append(";"); + return sql.toString(); + } + + private String constructInsert(String tableName, JSONObject newVals) { + StringBuilder keys = new StringBuilder(); + StringBuilder vals = new StringBuilder(); + String sep = ""; + for (String col : newVals.keySet()) { + keys.append(sep + col); + vals.append(sep + "'" + newVals.get(col) + "'"); + sep = ", "; + } + StringBuilder sql = new StringBuilder(); + sql.append("INSERT INTO ").append(tableName + " (").append(keys).append(") VALUES (").append(vals).append(");"); + return sql.toString(); + } + + private String constructUpdate(String tableName, List<String> keyColumns, JSONObject newVals, JSONObject oldVals) + throws MDBCServiceException { + if (oldVals == null) { + throw new MDBCServiceException("Trying to update row with an empty old val exception"); + } + StringBuilder sql = new StringBuilder(); + sql.append("UPDATE ").append(tableName).append(" SET "); + String sep = ""; + for (String key : newVals.keySet()) { + sql.append(sep).append(key).append("=\"").append(newVals.get(key)).append("\""); + sep = ", "; + } + sql.append(" WHERE "); + sql.append(getPrimaryKeyConditional(keyColumns, oldVals)); + sql.append(";"); + return sql.toString(); + } + + /** + * Create an SQL string for AND'ing all of the primary keys + * + * @param keyColumns list with the name of the columns that are key + * @param vals json with the contents of the old row + * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3 + */ + private String getPrimaryKeyConditional(List<String> keyColumns, JSONObject vals) { + StringBuilder keyCondStmt = new StringBuilder(); + String and = ""; + for (String key : keyColumns) { + // We cannot use the default primary key for the sql table and operations + if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) { + Object val = vals.get(key); + keyCondStmt.append(and + key + "=\"" + val + "\""); + and = " AND "; + } + } + return keyCondStmt.toString(); + } + + /** + * Cleans out the transaction table, removing the replayed operations + * + * @param jdbcStmt + * @throws SQLException + */ + private void clearReplayedOperations(Statement jdbcStmt) throws SQLException { + logger.info("Clearing replayed operations"); + String sql = + "DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " WHERE CONNECTION_ID = '" + this.connId + "';"; + jdbcStmt.executeUpdate(sql); + } + + @Override + public Connection getSQLConnection() { + return jdbcConn; + } + + @Override + public Set<String> getSQLTableSet() { + Set<String> set = new TreeSet<String>(); + String sql = + "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=CURRENT_SCHEMA() AND TABLE_TYPE='BASE TABLE'"; + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery(sql); + while (rs.next()) { + String s = rs.getString("TABLE_NAME"); + set.add(s); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e); + System.out.println("getSQLTableSet: " + e); + e.printStackTrace(); + } + logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set); + System.out.println("getSQLTableSet returning: " + set); + return set; + } + +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java index 86088f9..171ad79 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java @@ -190,28 +190,6 @@ public class Utils { } return list; } - - public static void registerDefaultDrivers() { - Properties pr = null; - try { - pr = new Properties(); - pr.load(Utils.class.getResourceAsStream("/mdbc.properties")); - } - catch (IOException e) { - logger.error("Could not load property file > " + e.getMessage()); - } - - @SuppressWarnings("unused") - List<Class<?>> list = new ArrayList<Class<?>>(); - String drivers = pr.getProperty("DEFAULT_DRIVERS"); - for (String driver: drivers.split("[ ,]")) { - logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'"); - try { - @SuppressWarnings("unused") - Class<?> cl = Class.forName(driver.trim()); - } catch (ClassNotFoundException e) { - logger.error(EELFLoggerDelegate.errorLogger,"Driver class "+driver+" not found."); - } - } - } + + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java index 4f3a3bf..5c6fae4 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java @@ -89,9 +89,10 @@ public class MusicTxDigestDaemon implements Runnable { logger.error("Music interface or DB interface is null in background daemon"); return; } + MdbcConnection conn = null; while (true) { try { - MdbcConnection conn = (MdbcConnection) stateManager.getConnection("daemon"); + conn = (MdbcConnection) stateManager.getConnection("daemon"); if (conn == null) { logger.error("Connection created is null in background daemon"); return; @@ -111,18 +112,20 @@ public class MusicTxDigestDaemon implements Runnable { } //2) for each partition I don't own final Set<Range> warmupRanges = stateManager.getRangesToWarmup(); - final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); - List<Range> missingRanges = new ArrayList<>(); - if (currentPartitions.size() != 0) { - for (DatabasePartition part : currentPartitions) { - List<Range> partitionRanges = part.getSnapshot(); - warmupRanges.removeAll(partitionRanges); - } - try { - stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); - } catch (MDBCServiceException e) { - logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); - continue; + if (warmupRanges!=null) { + final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); + List<Range> missingRanges = new ArrayList<>(); + if (currentPartitions.size() != 0) { + for (DatabasePartition part : currentPartitions) { + List<Range> partitionRanges = part.getSnapshot(); + warmupRanges.removeAll(partitionRanges); + } + try { + stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); + } catch (MDBCServiceException e) { + logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); + continue; + } } } @@ -138,6 +141,12 @@ public class MusicTxDigestDaemon implements Runnable { } catch (InterruptedException | SQLException e) { logger.error("MusicTxDigest background daemon stopped " + e.getMessage(), e); Thread.currentThread().interrupt(); + } finally { + try { + if (conn!=null && !conn.isClosed()) conn.close(); + } catch (SQLException e) { + logger.error("MusicTxDigest background daemon error closing" + e.getMessage(), e); + } } } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java index 8fa49a9..db9e455 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java @@ -22,10 +22,18 @@ package org.onap.music.mdbc.tables; import java.util.UUID; public final class MusicTxDigestId { + public final UUID mriId; public final UUID transactionId; public final int index; + public MusicTxDigestId(UUID mriRowId, UUID digestId, int index) { + this.mriId=mriRowId; + this.transactionId= digestId; + this.index=index; + } + public MusicTxDigestId(UUID digestId, int index) { + this.mriId = null; this.transactionId= digestId; this.index=index; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java index d406ad3..3258d0f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java @@ -66,7 +66,7 @@ public final class Operation implements Serializable{ } public JSONObject getKey() throws MDBCServiceException { - if(KEY==null){ + if(KEY==null||KEY.isEmpty()){ throw new MDBCServiceException("This operation ["+TYPE.toString()+"] doesn't contain a key"); } JSONObject keys = new JSONObject(new JSONTokener(KEY)); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java index 4fdd7ac..4ef9d30 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java @@ -186,7 +186,10 @@ public class StagingTable { } OpType newType = (type==SQLOperation.INSERT)?OpType.INSERT:(type==SQLOperation.DELETE)? OpType.DELETE:OpType.UPDATE; - Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType).setVal(newVal); + Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType); + if(newVal!=null) { + rowBuilder.setVal(newVal); + } if(keys!=null){ rowBuilder.setKey(keys); } @@ -252,10 +255,10 @@ public class StagingTable { } synchronized public boolean isEventualEmpty() { - return (eventuallyBuilder.getRowsCount()==0); + return (eventuallyBuilder!=null) && (eventuallyBuilder.getRowsCount()==0); } - - synchronized public void clear() throws MDBCServiceException { + + synchronized public void clear() throws MDBCServiceException { if(!builderInitialized){ throw new MDBCServiceException("This type of staging table is unmutable, please use the constructor" + "with no parameters"); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java index 7624201..f6239d1 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java @@ -24,6 +24,8 @@ import org.onap.music.mdbc.configurations.NodeConfiguration; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import java.util.ArrayList; +import java.util.Arrays; import java.util.UUID; public class CreatePartition { @@ -51,7 +53,9 @@ public class CreatePartition { } public void convert(){ - config = new NodeConfiguration(tables, eventual,UUID.fromString(mriIndex),"test",""); + String[] tablesArray=eventual.split(","); + ArrayList<String> eventualTables = (new ArrayList<>(Arrays.asList(tablesArray))); + config = new NodeConfiguration(tables, eventualTables,UUID.fromString(mriIndex),"test",""); } public void saveToFile(){ diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties index 73e8f77..49fdfd2 100755 --- a/mdbc-server/src/main/resources/mdbc.properties +++ b/mdbc-server/src/main/resources/mdbc.properties @@ -4,10 +4,15 @@ MIXINS= \ org.onap.music.mdbc.mixins.MySQLMixin \ org.onap.music.mdbc.mixins.MusicMixin \ - org.onap.music.mdbc.mixins.Music2Mixin + org.onap.music.mdbc.mixins.Music2Mixin \ + org.onap.music.mdbc.mixins.PostgresMixin + +DEFAULT_DB_MIXIN= mysql + +DEFAULT_MUSIC_MIXIN= cassandra2 DEFAULT_DRIVERS=\ - org.h2.Driver \ - com.mysql.jdbc.Driver + org.mariadb.jdbc.Driver \ + org.postgresql.Driver -txdaemonsleeps=15
\ No newline at end of file +txdaemonsleeps=15 diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java index 4703d0e..87f8445 100755 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java @@ -36,7 +36,7 @@ public class MDBCUtilsTest { public void toStringTest1() { StagingTable table = new StagingTable(); try { - table.addOperation(new Range("TABLE1"),SQLOperation.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString(),null); + table.addOperation(new Range("TEST.TABLE1"),SQLOperation.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString(),null); } catch (MDBCServiceException e) { fail(); } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java new file mode 100644 index 0000000..72ec8d3 --- /dev/null +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java @@ -0,0 +1,280 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import ch.vorburger.exec.ManagedProcessException; +import ch.vorburger.mariadb4j.DB; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.opentable.db.postgres.embedded.EmbeddedPostgres; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; +import javax.sql.DataSource; +import org.cassandraunit.utils.EmbeddedCassandraServerHelper; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.onap.music.datastore.MusicDataStore; +import org.onap.music.datastore.MusicDataStoreHandle; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.lockingservice.cassandra.CassaLockStore; +import org.onap.music.mdbc.mixins.MusicMixin; +import org.onap.music.mdbc.mixins.PostgresMixin; + +public class MdbcTestUtils { + + // Postgres variables + static EmbeddedPostgres pg=null; + static DataSource postgresDatabase=null; + final private static int postgresPort = 13307; + @Rule + public static TemporaryFolder tf = new TemporaryFolder(); + + // Cassandra variables + //Properties used to connect to music + private static Cluster cluster; + private static Session session; + + //Mdbc variables + final private static String keyspace="metricmusictest"; + final private static String mdbcServerName = "name"; + final private static String mtdTableName = "musictxdigest"; + final private static String eventualMtxdTableName = "musicevetxdigest"; + final private static String mriTableName = "musicrangeinformation"; + final private static String rangeDependencyTableName = "musicrangedependency"; + final private static String nodeInfoTableName = "nodeinfo"; + //Mariadb variables + static DB db=null; + final public static String mariaDBDatabaseName="test"; + final static Integer mariaDbPort=13306; + + + + public enum DBType {POSTGRES, MySQL} + + public static String getCassandraUrl(){ + return cluster.getMetadata().getAllHosts().iterator().next().getAddress().toString(); + + } + + public static String getKeyspace(){ + return keyspace; + } + + public static String getServerName(){ + return mdbcServerName; + } + + public static String getMriTableName(){ + return mriTableName; + } + + public static String getMariaDbPort() { + return mariaDbPort.toString(); + } + + public static String getMariaDBDBName(){ + return mariaDBDatabaseName; + } + + static Connection getPostgreConnection() { + startPostgres(); + Connection conn=null; + try + { + conn = postgresDatabase.getConnection(); + } catch(SQLException e){ + e.printStackTrace(); + fail(); + } + return conn; + } + + static synchronized public void startPostgres(){ + if(pg==null) { + try { + tf.create(); + pg = EmbeddedPostgres.builder().setPort(postgresPort).setDataDirectory(tf.newFolder("tmp")+"/data-dir").start(); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } + if(postgresDatabase==null) { + postgresDatabase = pg.getPostgresDatabase(); + } + } + + static public String getPostgresUrl(){ + return getPostgresUrlWithoutDb()+"/postgres"; + } + + static public String getPostgresUrlWithoutDb(){ + return "jdbc:postgresql://localhost:"+Integer.toString(postgresPort); + } + + synchronized static Connection getMariadbConnection(){ + startMariaDb(); + Connection conn = null; + try { + conn = DriverManager + .getConnection(getMariadbUrlWithoutDatabase()+"/"+mariaDBDatabaseName, "root", ""); + } catch (SQLException e) { + e.printStackTrace(); + fail("Error creating mdbc connection"); + } + return conn; + } + + public synchronized static void startMariaDb(){ + if (db == null) { + try { + db=DB.newEmbeddedDB(mariaDbPort); + db.start(); + db.createDB(mariaDBDatabaseName); + } catch (ManagedProcessException e) { + e.printStackTrace(); + fail("error initializing embedded mariadb"); + } + } + } + + static String getMariadbUrlWithoutDatabase(){ + return "jdbc:mariadb://localhost:"+Integer.toString(mariaDbPort); + } + + public static Connection getConnection(DBType type){ + switch(type){ + case MySQL: + return getMariadbConnection(); + case POSTGRES: + return getPostgreConnection(); + default: + fail("Wrong type for creating connection"); + } + return null; + } + + synchronized static void stopPostgres(){ + postgresDatabase=null; + if(pg!=null) { + try { + pg.close(); + pg=null; + } catch (IOException e) { + e.printStackTrace(); + fail("Error closing postgres database"); + } + } + if(tf!=null){ + tf.delete(); + } + } + + static void stopMySql(){ + try { + db.stop(); + } catch (ManagedProcessException e) { + e.printStackTrace(); + fail("Error closing mysql"); + } + } + + public static void cleanDatabase(DBType type){ + switch(type) { + case MySQL: + stopMySql(); + break; + case POSTGRES: + stopPostgres(); + break; + default: + fail("Wrong type for creating connection"); + } + } + + public static void initCassandra(){ + try { + EmbeddedCassandraServerHelper.startEmbeddedCassandra(EmbeddedCassandraServerHelper.CASSANDRA_RNDPORT_YML_FILE); + } catch (Exception e) { + System.out.println(e); + fail("Error starting embedded cassandra"); + } + cluster=EmbeddedCassandraServerHelper.getCluster(); + //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build(); + cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000); + assertNotNull("Invalid configuration for cassandra", cluster); + session = EmbeddedCassandraServerHelper.getSession(); + assertNotNull("Invalid configuration for cassandra", session); + + MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session); + CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle); + assertNotNull("Invalid configuration for music", store); + } + + public static void stopCassandra(){ + try { + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra(); + } + catch(NullPointerException e){ + } + } + + public static Session getSession(){ + return session; + } + + public static MusicMixin getMusicMixin() throws MDBCServiceException { + initNamespaces(); + initTables(); + MusicMixin mixin=null; + try { + Properties properties = new Properties(); + properties.setProperty(MusicMixin.KEY_MY_ID,MdbcTestUtils.getServerName()); + properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,MdbcTestUtils.getKeyspace()); + properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1"); + properties.setProperty(MusicMixin.KEY_MUSIC_ADDRESS,MdbcTestUtils.getCassandraUrl()); + mixin =new MusicMixin(null, MdbcTestUtils.getServerName(),properties); + } catch (MDBCServiceException e) { + fail("error creating music mixin"); + } + return mixin; + } + + public static void initNamespaces() throws MDBCServiceException{ + MusicMixin.createKeyspace("music_internal",1); + MusicMixin.createKeyspace(keyspace,1); + } + + public static void initTables() throws MDBCServiceException{ + MusicMixin.createMusicRangeInformationTable(keyspace, mriTableName); + MusicMixin.createMusicTxDigest(mtdTableName,keyspace, -1); + MusicMixin.createMusicEventualTxDigest(eventualMtxdTableName,keyspace, -1); + MusicMixin.createMusicNodeInfoTable(nodeInfoTableName,keyspace,-1); + MusicMixin.createMusicRangeDependencyTable(keyspace,rangeDependencyTableName); + } + +} diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java index 2d31939..862e600 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java @@ -20,12 +20,14 @@ package org.onap.music.mdbc; +import java.util.Properties; import org.junit.*; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import org.onap.music.mdbc.MdbcTestUtils.DBType; import org.onap.music.mdbc.mixins.MySQLMixin; import ch.vorburger.mariadb4j.DB; @@ -33,8 +35,8 @@ import ch.vorburger.mariadb4j.DB; public class MySQLMixinTest { public static final String DATABASE = "mdbctest"; - public static final String TABLE= "Persons"; - public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" + + public static final String TABLE= MdbcTestUtils.getMariaDBDBName(); + public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + MdbcTestUtils.getMariaDBDBName()+ " (\n" + " PersonID int,\n" + " LastName varchar(255),\n" + " FirstName varchar(255),\n" + @@ -52,10 +54,8 @@ public class MySQLMixinTest { @BeforeClass public static void init() throws Exception { Class.forName("org.mariadb.jdbc.Driver"); - //start embedded mariadb - DB db = DB.newEmbeddedDB(13306); - db.start(); - db.createDB(DATABASE); + MdbcTestUtils.startMariaDb(); + } @AfterClass @@ -65,13 +65,14 @@ public class MySQLMixinTest { @Before public void beforeTest() throws SQLException { - this.conn = DriverManager.getConnection("jdbc:mariadb://localhost:13306/"+DATABASE, "root", ""); - this.mysqlMixin = new MySQLMixin(null, "localhost:13306/"+DATABASE, conn, null); + this.conn = MdbcTestUtils.getConnection(DBType.MySQL); + Properties info = new Properties(); + this.mysqlMixin = new MySQLMixin(null, null, conn, info); } @Test public void testGetDataBaseName() throws SQLException { - Assert.assertEquals(DATABASE, mysqlMixin.getDatabaseName()); + Assert.assertEquals(MdbcTestUtils.getMariaDBDBName(), mysqlMixin.getDatabaseName()); } } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java index 41a943e..c8d284e 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java @@ -31,23 +31,35 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.UUID; import java.util.function.Consumer; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.Assert.*; + +import com.datastax.driver.core.Session; + +import java.util.*; + + import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; + import org.onap.music.datastore.MusicDataStore; import org.onap.music.datastore.MusicDataStoreHandle; + import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; -import org.onap.music.lockingservice.cassandra.CassaLockStore; import org.onap.music.lockingservice.cassandra.MusicLockState; import org.onap.music.main.MusicCore; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; + import org.onap.music.mdbc.StateManager; import org.onap.music.mdbc.proto.ProtoDigest.Digest.CompleteDigest; import org.onap.music.mdbc.tables.MusicRangeInformationRow; @@ -57,37 +69,25 @@ import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Session; import com.google.protobuf.InvalidProtocolBufferException; +import org.onap.music.mdbc.MdbcTestUtils; +import org.onap.music.mdbc.TestUtils; +import org.onap.music.mdbc.ownership.Dag; +import org.onap.music.mdbc.ownership.DagNode; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; + public class MusicMixinTest { - final private static String keyspace="metricmusictest"; - final private static String mriTableName = "musicrangeinformation"; - final private static String mtdTableName = "musictxdigest"; - final private static String mdbcServerName = "name"; + //Properties used to connect to music - private static Cluster cluster; private static Session session; - private static String cassaHost = "localhost"; private static MusicMixin mixin = null; private StateManager stateManager; @BeforeClass - public static void init() throws MusicServiceException { - try { - EmbeddedCassandraServerHelper.startEmbeddedCassandra(); - } catch (Exception e) { - System.out.println(e); - } - cluster=EmbeddedCassandraServerHelper.getCluster(); - //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build(); - cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000); - assertNotNull("Invalid configuration for cassandra", cluster); - session = EmbeddedCassandraServerHelper.getSession(); - assertNotNull("Invalid configuration for cassandra", session); + public static void init() throws MDBCServiceException { + MdbcTestUtils.initCassandra(); - MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session); - CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle); - assertNotNull("Invalid configuration for music", store); } @AfterClass @@ -102,59 +102,52 @@ public class MusicMixinTest { } @Before - public void initTest(){ - session.execute("DROP KEYSPACE IF EXISTS "+keyspace); + public void initTest() throws MDBCServiceException { + session = MdbcTestUtils.getSession(); + session.execute("DROP KEYSPACE IF EXISTS "+ MdbcTestUtils.getKeyspace()); + mixin=MdbcTestUtils.getMusicMixin(); + } + + //@Test(timeout=10000) + @Ignore // TODO: Move ownership tests to OwnershipAndCheckpointTest + @Test + public void own() { + Range range = new Range("TEST.TABLE1"); + List<Range> ranges = new ArrayList<>(); + ranges.add(range); + DatabasePartition partition=null; try { - Properties properties = new Properties(); - properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace); - properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName); - properties.setProperty(MusicMixin.KEY_COMPRESSION, Boolean.toString(true)); - mixin=new MusicMixin(stateManager, mdbcServerName,properties); - } catch (MDBCServiceException e) { - fail("error creating music mixin"); + partition = TestUtils.createBasicRow(range, mixin, MdbcTestUtils.getServerName()); + } + catch(Exception e){ + fail("fail to create partition"); + } + try { + TestUtils.unlockRow(MdbcTestUtils.getKeyspace(),MdbcTestUtils.getMriTableName(),partition); + } catch (MusicLockingException e) { + fail(e.getMessage()); } + DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey()); + try { + mixin.getStateManager().getOwnAndCheck().own(mixin,ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey()); + } catch (MDBCServiceException e) { + fail("failure when running own function"); + } } - //Own has been removed from musicMixin -// @Test(timeout=10000) -// public void own() { -// Range range = new Range("TABLE1"); -// List<Range> ranges = new ArrayList<>(); -// ranges.add(range); -// DatabasePartition partition=null; -// try { -// partition = TestUtils.createBasicRow(range, mixin, mdbcServerName); -// } -// catch(Exception e){ -// fail(e.getMessage()); -// } -// try { -// TestUtils.unlockRow(keyspace,mriTableName,partition); -// } catch (MusicLockingException e) { -// fail(e.getMessage()); -// } -// -// DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey()); -// try { -// mixin.own(ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey()); -// } catch (MDBCServiceException e) { -// fail("failure when running own function"); -// } -// } - private DatabasePartition addRow(List<Range> ranges,boolean isLatest){ final UUID uuid = MDBCUtils.generateTimebasedUniqueKey(); DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "", - mdbcServerName, isLatest); + MdbcTestUtils.getServerName(), isLatest); DatabasePartition partition=null; try { partition = mixin.createMusicRangeInformation(newRow); } catch (MDBCServiceException e) { fail("failure when creating new row"); } - String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString(); + String fullyQualifiedMriKey = MdbcTestUtils.getKeyspace()+"."+ MdbcTestUtils.getMriTableName()+"."+partition.getMRIIndex().toString(); try { MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); } catch (MusicLockingException e) { @@ -163,81 +156,81 @@ public class MusicMixinTest { return partition; } - //Own has been removed from musicMixin -// @Test(timeout=10000) -// public void own2() throws InterruptedException, MDBCServiceException { -// List<Range> range12 = new ArrayList<>( Arrays.asList( -// new Range("RANGE1"), -// new Range("RANGE2") -// )); -// List<Range> range34 = new ArrayList<>( Arrays.asList( -// new Range("RANGE3"), -// new Range("RANGE4") -// )); -// List<Range> range24 = new ArrayList<>( Arrays.asList( -// new Range("RANGE2"), -// new Range("RANGE4") -// )); -// List<Range> range123 = new ArrayList<>( Arrays.asList( -// new Range("RANGE1"), -// new Range("RANGE2"), -// new Range("RANGE3") -// )); -// DatabasePartition db1 = addRow(range12, false); -// DatabasePartition db2 = addRow(range34, false); -// MILLISECONDS.sleep(10); -// DatabasePartition db3 = addRow(range12, true); -// DatabasePartition db4 = addRow(range34, true); -// MILLISECONDS.sleep(10); -// DatabasePartition db5 = addRow(range24, true); -// DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey()); -// MusicInterface.OwnershipReturn own = null; -// try { -// own = mixin.own(range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey()); -// } catch (MDBCServiceException e) { -// fail("failure when running own function"); -// } -// Dag dag = own.getDag(); -// -// DagNode node4 = dag.getNode(db4.getMRIIndex()); -// assertFalse(node4.hasNotIncomingEdges()); -// List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges()); -// assertEquals(1,outgoingEdges.size()); -// -// DagNode missing = outgoingEdges.get(0); -// Set<Range> missingRanges = missing.getRangeSet(); -// assertEquals(2,missingRanges.size()); -// assertTrue(missingRanges.contains(new Range("RANGE1"))); -// assertTrue(missingRanges.contains(new Range("RANGE3"))); -// List<DagNode> outgoingEdges1 = missing.getOutgoingEdges(); -// assertEquals(1,outgoingEdges1.size()); -// -// DagNode finalNode = outgoingEdges1.get(0); -// assertFalse(finalNode.hasNotIncomingEdges()); -// Set<Range> finalSet = finalNode.getRangeSet(); -// assertEquals(3,finalSet.size()); -// assertTrue(finalSet.contains(new Range("RANGE1"))); -// assertTrue(finalSet.contains(new Range("RANGE2"))); -// assertTrue(finalSet.contains(new Range("RANGE3"))); -// -// DagNode node5 = dag.getNode(db5.getMRIIndex()); -// List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges(); -// assertEquals(1,toRemoveOutEdges.size()); -// toRemoveOutEdges.remove(finalNode); -// assertEquals(0,toRemoveOutEdges.size()); -// -// MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId()); -// assertTrue(row.getIsLatest()); -// DatabasePartition dbPartition = row.getDBPartition(); -// List<Range> snapshot = dbPartition.getSnapshot(); -// assertEquals(3,snapshot.size()); -// MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId()); -// assertFalse(node5row.getIsLatest()); -// MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex()); -// assertFalse(node4Row.getIsLatest()); -// MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex()); -// assertFalse(node3Row.getIsLatest()); -// } + @Ignore // TODO: Move ownership tests to OwnershipAndCheckpointTest + @Test(timeout=1000) + public void own2() throws InterruptedException, MDBCServiceException { + List<Range> range12 = new ArrayList<>( Arrays.asList( + new Range("TEST.RANGE1"), + new Range("TEST.RANGE2") + )); + List<Range> range34 = new ArrayList<>( Arrays.asList( + new Range("TEST.RANGE3"), + new Range("TEST.RANGE4") + )); + List<Range> range24 = new ArrayList<>( Arrays.asList( + new Range("TEST.RANGE2"), + new Range("TEST.RANGE4") + )); + List<Range> range123 = new ArrayList<>( Arrays.asList( + new Range("TEST.RANGE1"), + new Range("TEST.RANGE2"), + new Range("TEST.RANGE3") + )); + DatabasePartition db1 = addRow(range12, false); + DatabasePartition db2 = addRow(range34, false); + MILLISECONDS.sleep(10); + DatabasePartition db3 = addRow(range12, true); + DatabasePartition db4 = addRow(range34, true); + MILLISECONDS.sleep(10); + DatabasePartition db5 = addRow(range24, true); + DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey()); + MusicInterface.OwnershipReturn own = null; + try { + own = mixin.getStateManager().getOwnAndCheck().own(mixin,range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey()); + } catch (MDBCServiceException e) { + fail("failure when running own function"); + } + Dag dag = own.getDag(); + + DagNode node4 = dag.getNode(db4.getMRIIndex()); + assertFalse(node4.hasNotIncomingEdges()); + List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges()); + assertEquals(1,outgoingEdges.size()); + + DagNode missing = outgoingEdges.get(0); + Set<Range> missingRanges = missing.getRangeSet(); + assertEquals(2,missingRanges.size()); + assertTrue(missingRanges.contains(new Range("TEST.RANGE1"))); + assertTrue(missingRanges.contains(new Range("TEST.RANGE3"))); + List<DagNode> outgoingEdges1 = missing.getOutgoingEdges(); + assertEquals(1,outgoingEdges1.size()); + + DagNode finalNode = outgoingEdges1.get(0); + assertFalse(finalNode.hasNotIncomingEdges()); + Set<Range> finalSet = finalNode.getRangeSet(); + assertEquals(3,finalSet.size()); + assertTrue(finalSet.contains(new Range("TEST.RANGE1"))); + assertTrue(finalSet.contains(new Range("TEST.RANGE2"))); + assertTrue(finalSet.contains(new Range("TEST.RANGE3"))); + + DagNode node5 = dag.getNode(db5.getMRIIndex()); + List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges(); + assertEquals(1,toRemoveOutEdges.size()); + toRemoveOutEdges.remove(finalNode); + assertEquals(0,toRemoveOutEdges.size()); + + MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId()); + assertTrue(row.getIsLatest()); + DatabasePartition dbPartition = row.getDBPartition(); + List<Range> snapshot = dbPartition.getSnapshot(); + assertEquals(3,snapshot.size()); + MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId()); + assertFalse(node5row.getIsLatest()); + MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex()); + assertFalse(node4Row.getIsLatest()); + MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex()); + assertFalse(node3Row.getIsLatest()); + } @Test public void relinquish() { diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java new file mode 100644 index 0000000..2134a79 --- /dev/null +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java @@ -0,0 +1,220 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.mixins; + +import static org.junit.Assert.*; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.mdbc.MdbcTestUtils; +import org.onap.music.mdbc.MdbcTestUtils.DBType; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.TableInfo; +import org.onap.music.mdbc.tables.StagingTable; + +public class PostgresMixinTest { + final private static String keyspace="metricmusictest"; + final private static String mdbcServerName = "name"; + + static PostgresMixin mixin; + + private static MusicMixin mi = null; + private static Connection conn; + + @BeforeClass + public static void init() throws MDBCServiceException { + MdbcTestUtils.initCassandra(); + mi=MdbcTestUtils.getMusicMixin(); + try { + conn = MdbcTestUtils.getConnection(DBType.POSTGRES); + Properties info = new Properties(); + mixin = new PostgresMixin(mi, null, conn, info); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + + @AfterClass + public static void close(){ + //TODO: shutdown cassandra + mixin=null; + MdbcTestUtils.cleanDatabase(DBType.POSTGRES); + MdbcTestUtils.stopCassandra(); + } + + @Test + public void getMixinName() { + final String mixinName = mixin.getMixinName(); + assertEquals(mixinName.toLowerCase(),"postgres"); + } + + @Test + public void getSQLTableSet() { + createTestTable(); + final Set<String> sqlTableSet = mixin.getSQLTableSet(); + assertEquals(1,sqlTableSet.size()); + assertTrue(sqlTableSet.contains("testtable")); + } + + @Test + public void getTableInfo() { + createTestTable(); + final TableInfo tableInfo = mixin.getTableInfo("testtable"); + assertNotNull(tableInfo); + assertEquals(3,tableInfo.columns.size()); + int index=0; + for(String col: tableInfo.columns) { + switch(col.toLowerCase()){ + case "ix": + assertTrue(tableInfo.iskey.get(index)); + assertEquals(Types.INTEGER, tableInfo.coltype.get(index).intValue()); + break; + case "test1": + assertFalse(tableInfo.iskey.get(index)); + assertEquals(Types.CHAR, tableInfo.coltype.get(index).intValue()); + break; + case "test2": + assertFalse(tableInfo.iskey.get(index)); + assertEquals(Types.VARCHAR, tableInfo.coltype.get(index).intValue()); + break; + default: + fail(); + } + index++; + } + } + + private void createTestTable() { + try { + final Statement statement = conn.createStatement(); + statement.execute("CREATE TABLE IF NOT EXISTS testtable (IX SERIAL, test1 CHAR(1), test2 VARCHAR(255), PRIMARY KEY (IX));"); + statement.close(); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + + private void cleanTestTable() { + try { + final Statement statement = conn.createStatement(); + statement.execute("DELETE FROM testtable;"); + statement.close(); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void postStatementHook() { + createTestTable(); + mixin.createSQLTriggers("testtable"); + final String sqlOperation = "INSERT INTO testtable (test1,test2) VALUES ('u','test');"; + Statement stm=null; + try { + stm = conn.createStatement(); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + mixin.preStatementHook(sqlOperation); + try { + stm.execute(sqlOperation); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + StagingTable st=new StagingTable(); + mixin.postStatementHook(sqlOperation,st); + mixin.preCommitHook(); + assertFalse(st.isEmpty()); + } + + void checkEmptyTestTable(){ + ResultSet resultSet = mixin.executeSQLRead("SELECT * FROM testtable;"); + try { + assertFalse(resultSet.next()); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + + + void checkOneRowWithContents(String test1Val, String test2Val){ + ResultSet resultSet = mixin.executeSQLRead("SELECT * FROM testtable;"); + try { + assertTrue(resultSet.next()); + assertEquals(test1Val, resultSet.getString("test1")); + assertEquals(test2Val, resultSet.getString("test2")); + assertFalse(resultSet.next()); + } + catch(SQLException e){ + e.printStackTrace(); + fail(); + } + } + + @Test + public void applyTxDigest() { + createTestTable(); + mixin.createSQLTriggers("testtable"); + final String sqlOperation = "INSERT INTO testtable (test1,test2) VALUES ('u','test');"; + Statement stm=null; + try { + stm = conn.createStatement(); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + mixin.preStatementHook(sqlOperation); + try { + stm.execute(sqlOperation); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + StagingTable st=new StagingTable(); + mixin.postStatementHook(sqlOperation,st); + mixin.preCommitHook(); + assertFalse(st.isEmpty()); + cleanTestTable(); + checkEmptyTestTable(); + List<Range> ranges = new ArrayList<>(); + ranges.add(new Range("public.testtable")); + try { + mixin.applyTxDigest(st,ranges); + } catch (SQLException|MDBCServiceException e) { + e.printStackTrace(); + fail(); + } + checkOneRowWithContents("u","test"); + } +}
\ No newline at end of file diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java index da64595..9e6161a 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java @@ -59,7 +59,7 @@ public class DagTest { public void getDag() throws InterruptedException, MDBCServiceException { List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> ranges = new ArrayList<>( Arrays.asList( - new Range("range1") + new Range("schema.range1") )); rows.add(createNewRow(new ArrayList<>(ranges),"",false)); MILLISECONDS.sleep(10); @@ -87,14 +87,14 @@ public class DagTest { public void getDag2() throws InterruptedException, MDBCServiceException { List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> range1 = new ArrayList<>( Arrays.asList( - new Range("range1") + new Range("schema.range1") )); List<Range> range2 = new ArrayList<>( Arrays.asList( - new Range("range2") + new Range("schema.range2") )); List<Range> ranges = new ArrayList<>( Arrays.asList( - new Range("range2"), - new Range("range1") + new Range("schema.range2"), + new Range("schema.range1") )); rows.add(createNewRow(new ArrayList<>(range1),"",false)); MILLISECONDS.sleep(10); @@ -123,7 +123,7 @@ public class DagTest { public void nextToOwn() throws InterruptedException, MDBCServiceException { List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> ranges = new ArrayList<>( Arrays.asList( - new Range("range1") + new Range("schema.range1") )); rows.add(createNewRow(new ArrayList<>(ranges),"",false)); MILLISECONDS.sleep(10); @@ -149,20 +149,20 @@ public class DagTest { public void nextToApply() throws InterruptedException { List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> ranges = new ArrayList<>( Arrays.asList( - new Range("range1") + new Range("schema.range1") )); List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList( - new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0) + new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0) )); rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1)); MILLISECONDS.sleep(10); List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList( - new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0) + new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0) )); rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo2)); MILLISECONDS.sleep(10); List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList( - new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0) + new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0) )); rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3)); Dag dag = Dag.getDag(rows, ranges); @@ -180,7 +180,7 @@ public class DagTest { assertEquals(0,pair.getKey().index); List<Range> value = pair.getValue(); assertEquals(1,value.size()); - assertEquals(new Range("range1"),value.get(0)); + assertEquals(new Range("schema.range1"),value.get(0)); pair = node.nextNotAppliedTransaction(rangesSet); transactionCounter++; } @@ -195,23 +195,23 @@ public class DagTest { Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>(); List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> ranges = new ArrayList<>( Arrays.asList( - new Range("range1") + new Range("schema.range1") )); List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList( - new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0) + new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0) )); rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1)); MILLISECONDS.sleep(10); List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList( - new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0), - new MusicTxDigestId(MDBCUtils.generateUniqueKey(),1) + new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0), + new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1) )); MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2); - alreadyApplied.put(new Range("range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0)); + alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0)); rows.add(newRow); MILLISECONDS.sleep(10); List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList( - new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0) + new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0) )); rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3)); Dag dag = Dag.getDag(rows, ranges); @@ -230,7 +230,7 @@ public class DagTest { assertEquals(2-nodeCounter,pair.getKey().index); List<Range> value = pair.getValue(); assertEquals(1,value.size()); - assertEquals(new Range("range1"),value.get(0)); + assertEquals(new Range("schema.range1"),value.get(0)); pair = node.nextNotAppliedTransaction(rangesSet); transactionCounter++; } @@ -244,14 +244,14 @@ public class DagTest { public void isDifferent() throws InterruptedException { List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> range1 = new ArrayList<>( Arrays.asList( - new Range("range1") + new Range("schema.range1") )); List<Range> range2 = new ArrayList<>( Arrays.asList( - new Range("range2") + new Range("schema.range2") )); List<Range> ranges = new ArrayList<>( Arrays.asList( - new Range("range2"), - new Range("range1") + new Range("schema.range2"), + new Range("schema.range1") )); rows.add(createNewRow(new ArrayList<>(range1),"",false)); MILLISECONDS.sleep(10); @@ -277,14 +277,14 @@ public class DagTest { public void getOldestDoubles() throws InterruptedException, MDBCServiceException { List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> range1 = new ArrayList<>( Arrays.asList( - new Range("range1") + new Range("schema.range1") )); List<Range> range2 = new ArrayList<>( Arrays.asList( - new Range("range2") + new Range("schema.range2") )); List<Range> ranges = new ArrayList<>( Arrays.asList( - new Range("range2"), - new Range("range1") + new Range("schema.range2"), + new Range("schema.range1") )); rows.add(createNewRow(new ArrayList<>(range1),"",false)); MILLISECONDS.sleep(10); @@ -306,15 +306,15 @@ public class DagTest { public void getIncompleteRangesAndDependents() throws InterruptedException, MDBCServiceException { List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> range1 = new ArrayList<>( Arrays.asList( - new Range("range1") + new Range("schema.range1") )); List<Range> range2 = new ArrayList<>( Arrays.asList( - new Range("range2"), - new Range("range3") + new Range("schema.range2"), + new Range("schema.range3") )); List<Range> ranges = new ArrayList<>( Arrays.asList( - new Range("range2"), - new Range("range1") + new Range("schema.range2"), + new Range("schema.range1") )); rows.add(createNewRow(new ArrayList<>(range1),"",false)); MILLISECONDS.sleep(10); @@ -330,7 +330,7 @@ public class DagTest { List<Range> incomplete = incompleteRangesAndDependents.getKey(); Set<DagNode> dependents = incompleteRangesAndDependents.getValue(); assertEquals(1,incomplete.size()); - assertTrue(incomplete.contains(new Range("range3"))); + assertTrue(incomplete.contains(new Range("schema.range3"))); assertEquals(1,dependents.size()); assertTrue(dependents.contains(dag.getNode(rows.get(3).getPartitionIndex()))); } @@ -339,16 +339,16 @@ public class DagTest { public void getIncompleteRangesAndDependents2() throws InterruptedException, MDBCServiceException { List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> range1 = new ArrayList<>( Arrays.asList( - new Range("range1"), - new Range("range4") + new Range("schema.range1"), + new Range("schema.range4") )); List<Range> range2 = new ArrayList<>( Arrays.asList( - new Range("range2"), - new Range("range3") + new Range("schema.range2"), + new Range("schema.range3") )); List<Range> ranges = new ArrayList<>( Arrays.asList( - new Range("range2"), - new Range("range1") + new Range("schema.range2"), + new Range("schema.range1") )); rows.add(createNewRow(new ArrayList<>(range1),"",false)); MILLISECONDS.sleep(10); @@ -364,8 +364,8 @@ public class DagTest { List<Range> incomplete = incompleteRangesAndDependents.getKey(); Set<DagNode> dependents = incompleteRangesAndDependents.getValue(); assertEquals(2,incomplete.size()); - assertTrue(incomplete.contains(new Range("range3"))); - assertTrue(incomplete.contains(new Range("range4"))); + assertTrue(incomplete.contains(new Range("schema.range3"))); + assertTrue(incomplete.contains(new Range("schema.range4"))); assertEquals(2,dependents.size()); assertTrue(dependents.contains(dag.getNode(rows.get(3).getPartitionIndex()))); assertTrue(dependents.contains(dag.getNode(rows.get(2).getPartitionIndex()))); @@ -375,20 +375,20 @@ public class DagTest { public void addNewNodeWithSearch() throws InterruptedException, MDBCServiceException { List<MusicRangeInformationRow> rows = new ArrayList<>(); List<Range> range1 = new ArrayList<>( Arrays.asList( - new Range("range1") + new Range("schema.range1") )); List<Range> range2 = new ArrayList<>( Arrays.asList( - new Range("range2"), - new Range("range3") + new Range("schema.range2"), + new Range("schema.range3") )); List<Range> ranges = new ArrayList<>( Arrays.asList( - new Range("range2"), - new Range("range1") + new Range("schema.range2"), + new Range("schema.range1") )); List<Range> allRanges = new ArrayList<>( Arrays.asList( - new Range("range2"), - new Range("range3"), - new Range("range1") + new Range("schema.range2"), + new Range("schema.range3"), + new Range("schema.range1") )); rows.add(createNewRow(new ArrayList<>(range1),"",false)); MILLISECONDS.sleep(10); diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java index eb01bcd..f2fbd1f 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java @@ -34,7 +34,6 @@ import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.mockito.Mock; import org.mockito.Mockito; @@ -46,8 +45,10 @@ import org.onap.music.exceptions.MusicServiceException; import org.onap.music.lockingservice.cassandra.CassaLockStore; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.MDBCUtils; +import org.onap.music.mdbc.MdbcTestUtils.DBType; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.StateManager; +import org.onap.music.mdbc.MdbcTestUtils; import org.onap.music.mdbc.TestUtils; import org.onap.music.mdbc.mixins.LockResult; import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; @@ -58,13 +59,8 @@ import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; public class OwnershipAndCheckpointTest { - final private static int sqlPort = 13350; - final private static String keyspace="metricmusictest"; - final private static String mriTableName = "musicrangeinformation"; - final private static String mtdTableName = "musictxdigest"; - final private static String mdbcServerName = "name"; - public static final String DATABASE = "mdbcTest"; - public static final String TABLE= "PERSONS"; + public static final String DATABASE = MdbcTestUtils.mariaDBDatabaseName; + public static final String TABLE= MdbcTestUtils.mariaDBDatabaseName+".PERSONS"; public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" + " PersonID int,\n" + " LastName varchar(255),\n" + @@ -75,11 +71,7 @@ public class OwnershipAndCheckpointTest { ");"; public static final String DROP_TABLE = "DROP TABLE IF EXISTS " + TABLE + ";"; //Properties used to connect to music - private static Cluster cluster; - private static Session session; - private static String cassaHost = "localhost"; private static MusicMixin musicMixin = null; - private static DB db; Connection conn; MySQLMixin mysqlMixin; OwnershipAndCheckpoint ownAndCheck; @@ -90,37 +82,18 @@ public class OwnershipAndCheckpointTest { @BeforeClass public static void init() throws MusicServiceException, ClassNotFoundException, ManagedProcessException { - try { - EmbeddedCassandraServerHelper.startEmbeddedCassandra(); - } catch (Exception e) { - fail(e.getMessage()); - } - cluster=EmbeddedCassandraServerHelper.getCluster(); - //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build(); - cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000); - assertNotNull("Invalid configuration for cassandra", cluster); - session = EmbeddedCassandraServerHelper.getSession(); - assertNotNull("Invalid configuration for cassandra", session); + MdbcTestUtils.initCassandra(); Class.forName("org.mariadb.jdbc.Driver"); - MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session); - CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle); - assertNotNull("Invalid configuration for music", store); //start embedded mariadb - db = DB.newEmbeddedDB(sqlPort); - db.start(); - db.createDB(DATABASE); + MdbcTestUtils.startMariaDb(); } @AfterClass public static void close() throws MusicServiceException, MusicQueryException, ManagedProcessException { //TODO: shutdown cassandra musicMixin=null; - db.stop(); - try { - EmbeddedCassandraServerHelper.cleanEmbeddedCassandra(); - } - catch(NullPointerException e){ - } + MdbcTestUtils.cleanDatabase(DBType.MySQL); + MdbcTestUtils.stopCassandra(); } private void dropTable() throws SQLException { @@ -144,25 +117,34 @@ public class OwnershipAndCheckpointTest { @Before public void initTest() throws SQLException { - session.execute("DROP KEYSPACE IF EXISTS "+keyspace); + MdbcTestUtils.getSession().execute("DROP KEYSPACE IF EXISTS "+MdbcTestUtils.getKeyspace()); try { Properties properties = new Properties(); +/* properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName); properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace); properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1"); //StateManager stateManager = new StateManager("dbUrl", properties, "serverName", "dbName"); ownAndCheck = new OwnershipAndCheckpoint(); musicMixin =new MusicMixin(stateManager, mdbcServerName,properties); +*/ + properties.setProperty(MusicMixin.KEY_MY_ID,MdbcTestUtils.getServerName()); + properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,MdbcTestUtils.getKeyspace()); + properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1"); + properties.setProperty(MusicMixin.KEY_MUSIC_ADDRESS,MdbcTestUtils.getCassandraUrl()); + ownAndCheck = new OwnershipAndCheckpoint(); + musicMixin =new MusicMixin(stateManager, MdbcTestUtils.getServerName(), properties); } catch (MDBCServiceException e) { fail("error creating music musicMixin " + e.getMessage()); } - this.conn = DriverManager.getConnection("jdbc:mariadb://localhost:"+sqlPort+"/"+DATABASE, "root", ""); - this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+sqlPort+"/"+DATABASE, conn, null); + this.conn = MdbcTestUtils.getConnection(DBType.MySQL); + Properties info = new Properties(); + this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+MdbcTestUtils.getMariaDbPort()+"/"+DATABASE, conn, info); dropAndCreateTable(); } private void initDatabase(Range range) throws MDBCServiceException, SQLException { - final DatabasePartition partition = TestUtils.createBasicRow(range, musicMixin, mdbcServerName); + final DatabasePartition partition = TestUtils.createBasicRow(range, musicMixin, MdbcTestUtils.getServerName()); String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+ "(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');"; StagingTable stagingTable = new StagingTable(); @@ -171,13 +153,15 @@ public class OwnershipAndCheckpointTest { executeStatement.execute(sqlOperation); this.conn.commit(); mysqlMixin.postStatementHook(sqlOperation,stagingTable); + mysqlMixin.preCommitHook(); executeStatement.close(); String id = MDBCUtils.generateUniqueKey().toString(); TxCommitProgress progressKeeper = new TxCommitProgress(); progressKeeper.createNewTransactionTracker(id ,this.conn); musicMixin.commitLog(partition, null, stagingTable, id, progressKeeper); try { - TestUtils.unlockRow(keyspace, mriTableName, partition); +// TestUtils.unlockRow(keyspace, mriTableName, partition); + TestUtils.unlockRow(MdbcTestUtils.getKeyspace(), MdbcTestUtils.getMriTableName(), partition); } catch(Exception e){ fail(e.getMessage()); |