diff options
Diffstat (limited to 'mdbc-server/src/main')
26 files changed, 1806 insertions, 195 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java index ced5745..91b13f3 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java @@ -28,6 +28,8 @@ public class Configuration { public static final String KEY_DB_MIXIN_NAME = "MDBC_DB_MIXIN"; /** The property name to use to select the MUSIC 'mixin'. */ public static final String KEY_MUSIC_MIXIN_NAME = "MDBC_MUSIC_MIXIN"; + /** The property name to select if async staging table update is used */ + public static final String KEY_ASYNC_STAGING_TABLE_UPDATE = "ASYNC_STAGING_TABLE_UPDATE"; /** The name of the default mixin to use for the DBInterface. */ public static final String DB_MIXIN_DEFAULT = "mysql";//"h2"; /** The name of the default mixin to use for the MusicInterface. */ @@ -44,4 +46,6 @@ public class Configuration { public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours /** The property name to provide comma separated list of ranges to warmup */ public static final String KEY_WARMUPRANGES = "warmupranges"; + /** Default async staging table update o ption*/ + public static final String ASYNC_STAGING_TABLE_UPDATE = "false"; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java index 32d456e..ae2b869 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java @@ -98,10 +98,25 @@ public class MDBCUtils { } public static List<Range> getTables(Map<String,List<SQLOperation>> queryParsed){ + return getTables(null, queryParsed); + } + + public static List<Range> getTables(String defaultDatabaseName, Map<String,List<SQLOperation>> queryParsed){ List<Range> ranges = new ArrayList<>(); for(String table: queryParsed.keySet()){ - ranges.add(new Range(table)); - } + String[] parts = table.split("\\."); + if(parts.length==2){ + ranges.add(new Range(table)); + } + else if(parts.length==1 && defaultDatabaseName!=null){ + ranges.add(new Range(defaultDatabaseName+"."+table)); + } + else{ + throw new IllegalArgumentException("Table should either have only one '.' or none at all, the table " + + "received is "+table); + } + + } return ranges; } }
\ No newline at end of file diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 3db6c3f..f1ac851 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -160,18 +160,7 @@ public class MdbcConnection implements Connection { if(progressKeeper!=null) progressKeeper.commitRequested(id); logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b); if (b) { - // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction - if(id == null || id.isEmpty()) { - logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - throw new SQLException("tx id is null"); - } - try { - mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); - } catch (MDBCServiceException e) { - // TODO Auto-generated catch block - logger.error("Cannot commit log to music" + e.getStackTrace()); - throw new SQLException(e.getMessage(), e); - } + musicCommit(); } if(progressKeeper!=null) { progressKeeper.setMusicDone(id); @@ -191,13 +180,7 @@ public class MdbcConnection implements Connection { return jdbcConn.getAutoCommit(); } - /** - * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, - * they are performed now and copied into MUSIC. - * @throws SQLException - */ - @Override - public void commit() throws SQLException { + private void musicCommit() throws SQLException { if(progressKeeper.isComplete(id)) { return; } @@ -205,6 +188,7 @@ public class MdbcConnection implements Connection { progressKeeper.commitRequested(id); } + dbi.preCommitHook(); try { logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); // transaction was committed -- add all the updates into the REDO-Log in MUSIC @@ -214,6 +198,16 @@ public class MdbcConnection implements Connection { logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); throw new SQLException("Failure commiting to MUSIC", e); } + } + + /** + * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, + * they are performed now and copied into MUSIC. + * @throws SQLException + */ + @Override + public void commit() throws SQLException { + musicCommit(); if(progressKeeper != null) { progressKeeper.setMusicDone(id); @@ -273,9 +267,10 @@ public class MdbcConnection implements Connection { } catch (MDBCServiceException e) { throw new SQLException("Failure during relinquish of partition",e); } - // Warning! Make sure this call remains AFTER the call to jdbcConn.close(), - // otherwise you're going to get stuck in an infinite loop. - statemanager.closeConnection(id); + + // Warning! Make sure this call remains AFTER the call to jdbcConn.close(), + // otherwise you're going to get stuck in an infinite loop. + statemanager.closeConnection(id); } @Override @@ -507,7 +502,8 @@ public class MdbcConnection implements Connection { //Parse tables from the sql query Map<String, List<SQLOperation>> tableToInstruction = QueryProcessor.parseSqlQuery(sql); //Check ownership of keys - List<Range> queryTables = MDBCUtils.getTables(tableToInstruction); + String defaultSchema = dbi.getSchema(); + List<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToInstruction); if (this.partition!=null) { List<Range> snapshot = this.partition.getSnapshot(); if(snapshot!=null){ @@ -550,18 +546,15 @@ public class MdbcConnection implements Connection { logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); for (String tableName : set1) { // This map will be filled in if this table was previously discovered - tableName = tableName.toUpperCase(); - if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) { + if (!table_set.contains(tableName.toUpperCase()) && !dbi.getReservedTblNames().contains(tableName.toUpperCase())) { logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName); try { TableInfo ti = dbi.getTableInfo(tableName); - mi.initializeMusicForTable(ti,tableName); - //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted - ti = dbi.getTableInfo(tableName); - mi.createDirtyRowTable(ti,tableName); + //mi.initializeMusicForTable(ti,tableName); + //mi.createDirtyRowTable(ti,tableName); dbi.createSQLTriggers(tableName); - table_set.add(tableName); - dbi.synchronizeData(tableName); + table_set.add(tableName.toUpperCase()); + //dbi.synchronizeData(tableName); logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" + table_set.size() + "/" + set1.size() + "tables uploaded"); } catch (Exception e) { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java index 42b9710..500ed81 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java @@ -80,6 +80,15 @@ public class MdbcServer { Properties connectionProps = new Properties(); connectionProps.put("user", user); connectionProps.put("password", password); + String defaultMusicMixin = Utils.getDefaultMusicMixin(); + if(defaultMusicMixin!=null){ + connectionProps.put(Configuration.KEY_MUSIC_MIXIN_NAME,defaultMusicMixin); + } + String defaultDBMixin = Utils.getDefaultDBMixin(); + if(defaultMusicMixin!=null){ + connectionProps.put(Configuration.KEY_DB_MIXIN_NAME,defaultDBMixin); + } + Utils.registerDefaultDrivers(); meta = new MdbcServerLogic(url,connectionProps,config); LocalService service = new LocalService(meta); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java index 16e7170..41aed26 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java @@ -23,6 +23,9 @@ import java.io.Serializable; import java.util.List; import java.util.Objects; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.mixins.MusicMixin; + /** * This class represent a range of the whole database @@ -32,13 +35,21 @@ import java.util.Objects; */ public class Range implements Cloneable{ + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Range.class); + private String table; public Range(String table) { - this.table = table.toUpperCase(); + final String[] split = table.split("\\."); + if(split.length!=2){ + logger.debug("Table should contain schema, received in constructor: " + table); +// throw new IllegalArgumentException("Table should always contain the schema, table received in " +// + "constructor is "+table); + } + this.table = table; } - public String toString(){return table.toUpperCase();} + public String toString(){return table;} /** * Compares to Range types @@ -55,7 +66,7 @@ public class Range implements Cloneable{ @Override public int hashCode(){ - return table.hashCode(); + return table.toUpperCase().hashCode(); } @Override @@ -74,11 +85,11 @@ public class Range implements Cloneable{ public static boolean overlaps(List<Range> ranges, String table){ //\TODO check if parallel stream makes sense here - return ranges.stream().map((Range r) -> r.table.equals(table)).anyMatch((Boolean b) -> b); + return ranges.stream().map((Range r) -> r.table.toUpperCase().equals(table.toUpperCase())).anyMatch((Boolean b) -> b); } public boolean overlaps(Range other) { - return table.equals(other.table); + return table.toUpperCase().equals(other.table.toUpperCase()); } public String getTable() { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index e4c4a24..18bc0db 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -21,7 +21,6 @@ package org.onap.music.mdbc; import java.util.ArrayList; import java.util.List; -import java.util.Set; import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; @@ -47,6 +46,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -102,10 +102,10 @@ public class StateManager { public StateManager() { } - public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException { + public StateManager(String sqlDBUrl, Properties newInfo, String mdbcServerName, String sqlDBName) throws MDBCServiceException { this.sqlDBName = sqlDBName; - this.sqlDBUrl = sqlDBUrl; - this.info = info; + this.sqlDBUrl = cleanSqlUrl(sqlDBUrl); + this.info = new Properties(); this.mdbcServerName = mdbcServerName; this.connectionRanges = new ConcurrentHashMap<>(); @@ -117,6 +117,7 @@ public class StateManager { } catch (IOException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); } + info.putAll(newInfo); cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT); musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT); @@ -129,6 +130,16 @@ public class StateManager { ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout); } + protected String cleanSqlUrl(String url){ + if(url!=null) { + url = url.trim(); + if (url.length() > 0 && url.charAt(url.length() - 1) == '/') { + url= url.substring(0, url.length() - 1); + } + } + return url; + } + protected void initTxDaemonThread(){ txDaemon = new Thread( new MusicTxDigestDaemon(Integer.parseInt( @@ -149,27 +160,21 @@ public class StateManager { } protected void initSqlDatabase() throws MDBCServiceException { - try { - //\TODO: pass the driver as a variable - Class.forName("org.mariadb.jdbc.Driver"); - } - catch (ClassNotFoundException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, - ErrorTypes.GENERALSERVICEERROR); - return; - } - try { - Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info); - StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ") + if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) { + try { + Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info); + StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ") .append(sqlDBName) .append(";"); - Statement stmt = sqlConnection.createStatement(); - stmt.execute(sql.toString()); - sqlConnection.close(); - } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, + Statement stmt = sqlConnection.createStatement(); + stmt.execute(sql.toString()); + sqlConnection.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR, + ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR); - throw new MDBCServiceException(e.getMessage(), e); + throw new MDBCServiceException(e.getMessage(), e); + } } } @@ -306,17 +311,9 @@ public class StateManager { public Connection openConnection(String id) { Connection sqlConnection; MdbcConnection newConnection; - try { - //TODO: pass the driver as a variable - Class.forName("org.mariadb.jdbc.Driver"); - } - catch (ClassNotFoundException e) { - // TODO Auto-generated catch block - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, - ErrorTypes.QUERYERROR); - } - + Utils.registerDefaultDrivers(); //Create connection to local SQL DB + try { sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info); } catch (SQLException e) { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java new file mode 100755 index 0000000..e5a3252 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java @@ -0,0 +1,213 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc; + +import com.datastax.driver.core.*; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicLockingException; +import org.onap.music.lockingservice.cassandra.MusicLockState; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.main.MusicCore; +import org.onap.music.main.MusicUtil; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.*; +import org.onap.music.mdbc.mixins.MusicInterface; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; + +public class TestUtils { + + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TestUtils.class); + + public static DatabasePartition createBasicRow(Range range, MusicInterface mixin, String mdbcServerName) + throws MDBCServiceException { + final UUID uuid = MDBCUtils.generateTimebasedUniqueKey(); + List<Range> ranges = new ArrayList<>(); + ranges.add(range); + DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); + MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "", + mdbcServerName, true); + DatabasePartition partition=null; + partition = mixin.createMusicRangeInformation(newRow); + return partition; + } + + public static void unlockRow(String keyspace, String mriTableName, DatabasePartition partition) + throws MusicLockingException { + String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString(); + MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); + } + + public static void createKeyspace(String keyspace, Session session) { + String queryOp = "CREATE KEYSPACE " + + keyspace + + " WITH REPLICATION " + + "= {'class':'SimpleStrategy', 'replication_factor':1}; "; + ResultSet res=null; + res = session.execute(queryOp); + } + + public static void deleteKeyspace(String keyspace, Session session){ + String queryBuilder = "DROP KEYSPACE " + + keyspace + + ";"; + ResultSet res = session.execute(queryBuilder); + } + + public static HashSet<String> getMriColNames(){ + return new HashSet<>( + Arrays.asList("rangeid","keys","txredolog","ownerid","metricprocessid") + ); + } + + public static HashSet<String> getMtdColNames(){ + return new HashSet<>( + Arrays.asList("txid","transactiondigest") + ); + } + + public static HashMap<String, DataType> getMriColTypes(Cluster cluster) throws Exception { + HashMap<String, DataType> expectedTypes = new HashMap<>(); + expectedTypes.put("rangeid",DataType.uuid()); + expectedTypes.put("keys",DataType.set(DataType.text())); + ProtocolVersion currentVer = cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); + if(currentVer != null) { + throw new Exception("Protocol version for cluster is invalid"); + } + CodecRegistry registry = cluster.getConfiguration().getCodecRegistry(); + if(registry!= null) { + throw new Exception("Codec registry for cluster is invalid"); + } + expectedTypes.put("txredolog",DataType.list(TupleType.of(currentVer,registry,DataType.text(),DataType.uuid()))); + expectedTypes.put("ownerid",DataType.text()); + expectedTypes.put("metricprocessid",DataType.text()); + return expectedTypes; + } + + public static HashMap<String, DataType> getMtdColTypes(){ + HashMap<String,DataType> expectedTypes = new HashMap<>(); + expectedTypes.put("txid",DataType.uuid()); + expectedTypes.put("transactiondigest",DataType.text()); + return expectedTypes; + } + + public static void checkDataTypeForTable(List<ColumnMetadata> columnsMeta, HashSet<String> expectedColumns, + HashMap<String,DataType> expectedTypes) throws Exception { + for(ColumnMetadata cMeta : columnsMeta){ + String columnName = cMeta.getName(); + DataType type = cMeta.getType(); + if(!expectedColumns.contains(columnName)){ + throw new Exception("Invalid column name: "); + } + if(!expectedTypes.containsKey(columnName)){ + throw new Exception("Fix the contents of expectedtypes for column: "+columnName); + } + if(expectedTypes.get(columnName)!=type) { + throw new Exception("Invalid type for column: "+columnName); + } + } + } + + public static void readPropertiesFile(Properties prop) { + try { + String fileLocation = MusicUtil.getMusicPropertiesFilePath(); + InputStream fstream = new FileInputStream(fileLocation); + prop.load(fstream); + fstream.close(); + } catch (FileNotFoundException e) { + logger.error("Configuration file not found"); + + } catch (IOException e) { + // TODO Auto-generated catch block + logger.error("Exception when reading file: "+e.toString()); + } + } + + + public static void populateMusicUtilsWithProperties(Properties prop){ + //TODO: Learn how to do this properly within music + String[] propKeys = MusicUtil.getPropkeys(); + for (int k = 0; k < propKeys.length; k++) { + String key = propKeys[k]; + if (prop.containsKey(key) && prop.get(key) != null) { + switch (key) { + case "cassandra.host": + MusicUtil.setMyCassaHost(prop.getProperty(key)); + break; + case "music.ip": + MusicUtil.setDefaultMusicIp(prop.getProperty(key)); + break; + case "debug": + MusicUtil.setDebug(Boolean + .getBoolean(prop.getProperty(key).toLowerCase())); + break; + case "version": + MusicUtil.setVersion(prop.getProperty(key)); + break; + case "music.rest.ip": + MusicUtil.setMusicRestIp(prop.getProperty(key)); + break; + case "music.properties": + MusicUtil.setMusicPropertiesFilePath(prop.getProperty(key)); + break; + case "lock.lease.period": + MusicUtil.setDefaultLockLeasePeriod( + Long.parseLong(prop.getProperty(key))); + break; + case "my.id": + MusicUtil.setMyId(Integer.parseInt(prop.getProperty(key))); + break; + case "all.ids": + String[] ids = prop.getProperty(key).split(":"); + MusicUtil.setAllIds(new ArrayList<String>(Arrays.asList(ids))); + break; + case "public.ip": + MusicUtil.setPublicIp(prop.getProperty(key)); + break; + case "all.public.ips": + String[] ips = prop.getProperty(key).split(":"); + if (ips.length== 1) { + // Future use + } else if (ips.length > 1) { + MusicUtil.setAllPublicIps( + new ArrayList<String>(Arrays.asList(ips))); + } + break; + case "cassandra.user": + MusicUtil.setCassName(prop.getProperty(key)); + break; + case "cassandra.password": + MusicUtil.setCassPwd(prop.getProperty(key)); + break; + case "aaf.endpoint.url": + MusicUtil.setAafEndpointUrl(prop.getProperty(key)); + break; + default: + System.out.println("No case found for " + key); + } + } + } + + + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java new file mode 100644 index 0000000..7a09dca --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java @@ -0,0 +1,80 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.onap.music.logging.EELFLoggerDelegate; + +public class Utils { + + + public static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Utils.class); + + static Properties retrieveMdbcProperties() { + Properties pr = null; + try { + pr = new Properties(); + pr.load(Utils.class.getResourceAsStream("/mdbc.properties")); + } catch (IOException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Could not load property file: " + e.getMessage()); + } + return pr; + } + + public static String getDefaultMusicMixin() { + Properties pr = retrieveMdbcProperties(); + if (pr == null) + return null; + String defaultMusicMixin = pr.getProperty("DEFAULT_MUSIC_MIXIN"); + return defaultMusicMixin; + } + + public static String getDefaultDBMixin() { + Properties pr = retrieveMdbcProperties(); + if (pr == null) + return null; + String defaultMusicMixin = pr.getProperty("DEFAULT_DB_MIXIN"); + return defaultMusicMixin; + } + + public static void registerDefaultDrivers() { + Properties pr = null; + try { + pr = new Properties(); + pr.load(Utils.class.getResourceAsStream("/mdbc.properties")); + } catch (IOException e) { + logger.error("Could not load property file > " + e.getMessage()); + } + + String drivers = pr.getProperty("DEFAULT_DRIVERS"); + for (String driver : drivers.split("[ ,]")) { + logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'"); + try { + Class.forName(driver.trim()); + } catch (ClassNotFoundException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Driver class " + driver + " not found."); + } + } + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java index 38309d5..391ee1a 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java @@ -41,7 +41,7 @@ public class NodeConfiguration { public String nodeName; public String sqlDatabaseName; - public NodeConfiguration(String tables, String eventualTables, UUID mriIndex, String sqlDatabaseName, String node){ + public NodeConfiguration(String tables, List<String> eventualTables, UUID mriIndex, String sqlDatabaseName, String node){ // public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) { partition = new DatabasePartition(toRanges(tables), mriIndex, null) ; eventual = new Eventual(toRanges(eventualTables)); @@ -49,16 +49,20 @@ public class NodeConfiguration { this.sqlDatabaseName = sqlDatabaseName; } + protected List<Range> toRanges(List<String> tables){ + List<Range> newRange = new ArrayList<>(); + for(String table: tables) { + newRange.add(new Range(table)); + } + return newRange; + } + protected List<Range> toRanges(String tables){ if(tables.isEmpty()){ return new ArrayList<>(); } - List<Range> newRange = new ArrayList<>(); String[] tablesArray=tables.split(","); - for(String table: tablesArray) { - newRange.add(new Range(table)); - } - return newRange; + return toRanges(new ArrayList<>(Arrays.asList(tablesArray))); } public String toJson() { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java index 343a8b8..0598271 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java @@ -26,6 +26,7 @@ import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.mixins.MusicMixin; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestDaemon; import com.google.gson.Gson; import org.onap.music.datastore.PreparedQueryObject; @@ -46,6 +47,7 @@ public class TablesConfiguration { private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class); private List<PartitionInformation> partitions; + private List<String> eventual; String tableToPartitionName; private String musicNamespace; private String partitionInformationTableName; @@ -83,16 +85,17 @@ public class TablesConfiguration { logger.info("Creating empty row with id "+partitionId); MusicMixin.createEmptyMriRow(musicNamespace,partitionInfo.mriTableName,UUID.fromString(partitionId), - partitionInfo.owner,null,partitionInfo.getTables(),true); + partitionInfo.owner,null, + partitionInfo.getTables(),true); //3) Create config for this node StringBuilder newStr = new StringBuilder(); for(Range r: partitionInfo.tables){ - newStr.append(r.toString().toUpperCase()).append(","); + newStr.append(r.toString()).append(","); } logger.info("Appending row to configuration "+partitionId); - nodeConfigs.add(new NodeConfiguration(newStr.toString(),"",UUID.fromString(partitionId), + nodeConfigs.add(new NodeConfiguration(newStr.toString(),eventual,UUID.fromString(partitionId), sqlDatabaseName, partitionInfo.owner)); } return nodeConfigs; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json new file mode 100644 index 0000000..430525f --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json @@ -0,0 +1,11 @@ +{ + "internalNamespace": "music_internal", + "internalReplicationFactor": 1, + "musicNamespace": "namespace", + "musicReplicationFactor": 1, + "mriTableName": "musicrangeinformation", + "mtxdTableName": "musictxdigest", + "eventualMtxdTableName":"musicevetxdigest", + "nodeInfoTableName":"nodeinfo", + "rangeDependencyTableName":"musicrangedependency" +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json index 8cbbfec..559d597 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json @@ -3,21 +3,19 @@ { "tables": [ { - "table": "PERSONS" + "table": "public.PERSONS" } ], "owner": "", "mriTableName": "musicrangeinformation", - "mtxdTableName": "musictxdigest", "partitionId": "" } ], - "internalNamespace": "music_internal", - "internalReplicationFactor": 1, + "eventual": ["public.eventualPERSONS"], "musicNamespace": "namespace", - "musicReplicationFactor": 1, "tableToPartitionName": "tabletopartition", "partitionInformationTableName": "partitioninfo", "redoHistoryTableName": "redohistory", "sqlDatabaseName": "test" } + diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java new file mode 100644 index 0000000..543cbd0 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java @@ -0,0 +1,100 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc.mixins; + +import java.util.concurrent.atomic.AtomicBoolean; +import javax.websocket.RemoteEndpoint.Async; +import org.onap.music.logging.EELFLoggerDelegate; + +public class AsyncUpdateHandler { + + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(AsyncUpdateHandler.class); + + Object finishMonitor; + Object taskMonitor; + AtomicBoolean pendingUpdate; + Runnable handler; + + AsyncUpdateHandler(Runnable handlerToRun){ + handler=handlerToRun; + finishMonitor=new Object(); + taskMonitor = new Object(); + pendingUpdate=new AtomicBoolean(false); + createUpdateExecutor(); + } + + void createUpdateExecutor(){ + Runnable newRunnable = ()-> UpdateExecutor(); + Thread newThread = new Thread(newRunnable); + newThread.setDaemon(true); + newThread.start(); + } + + public void processNewUpdate(){ + pendingUpdate.set(true); + synchronized (taskMonitor) { + taskMonitor.notifyAll(); + } + } + + void UpdateExecutor(){ + while(true) { + synchronized (taskMonitor) { + try { + if(!pendingUpdate.get()){ + taskMonitor.wait(); + } + } catch (InterruptedException e) { + logger.error("Update Executor received an interrup exception"); + } + } + startUpdate(); + } + } + + void startUpdate(){ + synchronized (finishMonitor) { + //Keep running until there are no more requests + while (pendingUpdate.getAndSet(false)) { + if(handler!=null){ + handler.run(); + } + } + finishMonitor.notifyAll(); + } + + } + + public void waitForAllPendingUpdates(){ + //Wait until there are no more requests and the thread is not longer running + //We could use a join, but given that the thread could be temporally null, then it is easier to + //separate the wait from the thread that is running + synchronized(finishMonitor){ + while(pendingUpdate.get()) { + try { + finishMonitor.wait(); + } catch (InterruptedException e) { + logger.error("waitForAllPendingUpdate received exception"); + } + } + } + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index a594918..dd71d97 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; + import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; @@ -111,6 +112,13 @@ public interface DBInterface { * @return a ResultSet containing the rows returned from the query */ ResultSet executeSQLRead(String sql); + + /** + * This method is used to verify that all data structures needed for commit are ready for commit + * Either for the MusicMixin (e.g. staging table) or internally for the DBMixin + * Important this is not for actual commit operation, it should be treated as a signal. + */ + void preCommitHook(); void synchronizeData(String tableName); @@ -133,4 +141,6 @@ public interface DBInterface { void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException; Connection getSQLConnection(); + + String getSchema(); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java index d822615..324b0f6 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java @@ -61,12 +61,13 @@ public class MixinFactory { con = cl.getConstructor(MusicInterface.class, String.class, Connection.class, Properties.class); if (con != null) { logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname); - return (DBInterface) con.newInstance(mi, url, conn, info); + DBInterface newdbi = (DBInterface) con.newInstance(mi, url, conn, info); + return newdbi; } } } } catch (Exception e) { - logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface: "+e); + logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface error for "+cl.getName(),e); } } return null; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 71f1b8b..fc8897f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -303,8 +303,8 @@ public interface MusicInterface { void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException; List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException; - + void deleteMriRow(MusicRangeInformationRow row) throws MDBCServiceException; void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index cb37a55..5a322f3 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -61,7 +61,6 @@ import org.onap.music.mdbc.StateManager; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.ownership.Dag; import org.onap.music.mdbc.ownership.DagNode; -import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; @@ -99,6 +98,8 @@ public class MusicMixin implements MusicInterface { public static final String KEY_MUSIC_RFACTOR = "music_rfactor"; /** The property name to use to provide the replication factor for Cassandra. */ public static final String KEY_MUSIC_NAMESPACE = "music_namespace"; + /** The property name to use to provide a timeout to mdbc (ownership) */ + public static final String KEY_TIMEOUT = "mdbc_timeout"; /** The property name to use to provide a flag indicating if compression is required */ public static final String KEY_COMPRESSION = "mdbc_compression"; /** Namespace for the tables in MUSIC (Cassandra) */ @@ -106,13 +107,15 @@ public class MusicMixin implements MusicInterface { /** The default property value to use for the Cassandra IP address. */ public static final String DEFAULT_MUSIC_ADDRESS = "localhost"; /** The default property value to use for the Cassandra replication factor. */ - public static final int DEFAULT_MUSIC_RFACTOR = 1; + public static final int DEFAULT_MUSIC_RFACTOR = 3; + /** The default property value to use for the MDBC timeout */ + public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours /** The default primary string column, if none is provided. */ public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid"; /** Type of the primary key, if none is defined by the user */ public static final String MDBC_PRIMARYKEY_TYPE = "uuid"; public static final boolean DEFAULT_COMPRESSION = true; - + //TODO: Control network topology strategy with a configuration file entry public static final boolean ENABLE_NETWORK_TOPOLOGY_STRATEGY = false; //\TODO Add logic to change the names when required and create the tables when necessary @@ -254,6 +257,7 @@ public class MusicMixin implements MusicInterface { public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException { Map<String,Object> replicationInfo = new HashMap<>(); replicationInfo.put("'class'", "'NetworkTopologyStrategy'"); + if (ENABLE_NETWORK_TOPOLOGY_STRATEGY && replicationFactor==3) { replicationInfo.put("'dc1'", 1); replicationInfo.put("'dc2'", 1); @@ -673,7 +677,7 @@ public class MusicMixin implements MusicInterface { } if(rs!=null) { for (Row row : rs) { - set.add(row.getString("TABLE_NAME").toUpperCase()); + set.add(row.getString("TABLE_NAME")); } } return set; @@ -1295,15 +1299,13 @@ public class MusicMixin implements MusicInterface { logger.warn("Trying tcommit log with null partition"); return; } + List<Range> snapshot = partition.getSnapshot(); if(snapshot==null || snapshot.isEmpty()){ logger.warn("Trying to commit log with empty ranges"); return; } - // first deal with commit for eventually consistent tables - filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); - UUID mriIndex = partition.getMRIIndex(); String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex; //0. See if reference to lock was already created @@ -1475,7 +1477,7 @@ public class MusicMixin implements MusicInterface { for(TupleValue t: log){ //final String tableName = t.getString(0); final UUID id = t.getUUID(1); - digestIds.add(new MusicTxDigestId(id,index++)); + digestIds.add(new MusicTxDigestId(partitionIndex,id,index++)); } List<Range> partitions = new ArrayList<>(); Set<String> tables = newRow.getSet("keys",String.class); @@ -1526,11 +1528,14 @@ public class MusicMixin implements MusicInterface { pQueryObject.appendQueryString(cql); pQueryObject.addValue(baseRange.getTable()); Row newRow; + //TODO Change this when music fix the "." problem in the primary key + final String table = baseRange.getTable(); + final String tableWithoutDot = table.replaceAll("\\.",""); try { - newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.getTable(),null); + newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,tableWithoutDot,null); } catch (MDBCServiceException e) { - logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e); + logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName+" trying for table "+tableWithoutDot); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information for table "+tableWithoutDot, e); } return getRangeDependenciesFromCassandraRow(newRow); } @@ -1838,7 +1843,6 @@ public class MusicMixin implements MusicInterface { PreparedQueryObject query = new PreparedQueryObject(); int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR); - String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns, this.musicEventualTxDigestTableName); query.appendQueryString(cql); @@ -1856,6 +1860,7 @@ public class MusicMixin implements MusicInterface { } } + @Override public StagingTable getTxDigest(MusicTxDigestId id) throws MDBCServiceException { String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName); @@ -1883,8 +1888,7 @@ public class MusicMixin implements MusicInterface { } return changes; } - - + @Override public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException { int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR); @@ -1901,7 +1905,7 @@ public class MusicMixin implements MusicInterface { LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>(); UUID musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName); PreparedQueryObject pQueryObject = new PreparedQueryObject(); - + if (musicevetxdigestNodeinfoTimeID != null) { // this will fetch only few records based on the time-stamp condition. cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString()); @@ -1912,7 +1916,7 @@ public class MusicMixin implements MusicInterface { cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString()); pQueryObject.appendQueryString(cql); } - + // I need to get a ResultSet of all the records and give each row to the below HashMap. ResultSet rs = executeMusicRead(pQueryObject); while (!rs.isExhausted()) { @@ -1920,8 +1924,8 @@ public class MusicMixin implements MusicInterface { ByteBuffer digest = row.getBytes("transactiondigest"); Boolean compressed = row.getBool("compressed"); //String txTimeId = row.getString("txtimeid"); //??? - UUID txTimeId = row.getUUID("txtimeid"); - + UUID txTimeId = row.getUUID("txtimeid"); + try { if(compressed){ digest=StagingTable.Decompress(digest); @@ -1932,7 +1936,7 @@ public class MusicMixin implements MusicInterface { throw e; } ecDigestInformation.put(txTimeId, changes); - } + } return ecDigestInformation; } @@ -2218,7 +2222,7 @@ public class MusicMixin implements MusicInterface { * @param cql the CQL to be sent to Cassandra */ private static void executeMusicWriteQuery(String keyspace, String table, String cql) - throws MDBCServiceException { + throws MDBCServiceException { PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); ResultType rt = null; @@ -2254,6 +2258,9 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException("Error executing atomic get", e); } } + if(result==null){ + throw new MDBCServiceException("Error executing atomic get for primary key: "+primaryKey); + } if(result.isExhausted()){ return null; } @@ -2418,11 +2425,17 @@ public class MusicMixin implements MusicInterface { pQueryObject.addValue(row.getPartitionIndex()); ReturnType rt ; try { - rt = MusicCore.atomicPut(music_ns, musicRangeDependencyTableName, row.getPartitionIndex().toString(), + rt = MusicCore.atomicPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(), pQueryObject, null); } catch (MusicLockingException|MusicQueryException|MusicServiceException e) { logger.error("Failure when deleting mri row"); new MDBCServiceException("Error deleting mri row",e); } } + + public StateManager getStateManager() { + return stateManager; + } + + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 6d6e691..15caf3f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -40,6 +40,7 @@ import org.json.JSONTokener; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.Configuration; import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; @@ -75,7 +76,7 @@ public class MySQLMixin implements DBInterface { public static final String TRANS_TBL = "MDBC_TRANSLOG"; private static final String CREATE_TBL_SQL = "CREATE TABLE IF NOT EXISTS "+TRANS_TBL+ - " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " + + " (IX INT AUTO_INCREMENT, OP CHAR(1), SCHEMANAME VARCHAR(255), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " + "CONNECTION_ID INT, PRIMARY KEY (IX));"; private final MusicInterface mi; @@ -85,6 +86,10 @@ public class MySQLMixin implements DBInterface { private final Map<String, TableInfo> tables; private PreparedStatement deleteStagingStatement; private boolean server_tbl_created = false; + private boolean useAsyncStagingUpdate = false; + private Object stagingHandlerLock = new Object(); + private AsyncUpdateHandler stagingHandler = null; + private StagingTable currentStaging=null; public MySQLMixin() { this.mi = null; @@ -100,12 +105,35 @@ public class MySQLMixin implements DBInterface { this.dbName = getDBName(conn); this.jdbcConn = conn; this.tables = new HashMap<String, TableInfo>(); + useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE, + Configuration.ASYNC_STAGING_TABLE_UPDATE)); + } + + class StagingTableUpdateRunnable implements Runnable{ + + private MySQLMixin mixin; + private StagingTable staging; + + StagingTableUpdateRunnable(MySQLMixin mixin, StagingTable staging){ + this.mixin=mixin; + this.staging=staging; + } + + @Override + public void run() { + try { + this.mixin.updateStagingTable(staging); + } catch (NoSuchFieldException|MDBCServiceException e) { + this.mixin.logger.error("Error when updating the staging table"); + } + } } private PreparedStatement getStagingDeletePreparedStatement() throws SQLException { return jdbcConn.prepareStatement("DELETE FROM "+TRANS_TBL+" WHERE (IX BETWEEN ? AND ? ) AND " + "CONNECTION_ID = ?;"); } + // This is used to generate a unique connId for this connection to the DB. private int generateConnID(Connection conn) { int rv = (int) System.currentTimeMillis(); // random-ish @@ -156,12 +184,20 @@ public class MySQLMixin implements DBInterface { } return dbname; } - + + @Override public String getDatabaseName() { return this.dbName; } @Override + public String getSchema() {return this.dbName;} + + /** + * Get a set of the table names in the database. + * @return the set + */ + @Override public Set<String> getSQLTableSet() { Set<String> set = new TreeSet<String>(); String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; @@ -183,7 +219,7 @@ public class MySQLMixin implements DBInterface { @Override public Set<Range> getSQLRangeSet() { Set<String> set = new TreeSet<String>(); - String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; + String sql = "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'"; try { Statement stmt = jdbcConn.createStatement(); ResultSet rs = stmt.executeQuery(sql); @@ -244,9 +280,19 @@ mysql> describe tables; TableInfo ti = tables.get(tableName); if (ti == null) { try { - String tbl = tableName;//.toUpperCase(); - String sql = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME='"+tbl+"'"; - ResultSet rs = executeSQLRead(sql); + final String[] split = tableName.split("\\."); + String tbl = (split.length==2)?split[1]:tableName; + String localSchema = (split.length==2)?split[0]:getSchema(); + StringBuilder sql=new StringBuilder(); + sql.append("SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA="); + if(localSchema==null) { + sql.append("DATABASE() AND TABLE_NAME='"); + } + else { + sql.append("'").append(localSchema).append("' AND TABLE_NAME='"); + } + sql.append(tbl).append("';"); + ResultSet rs = executeSQLRead(sql.toString()); if (rs != null) { ti = new TableInfo(); while (rs.next()) { @@ -296,9 +342,13 @@ mysql> describe tables; } } @Override - public void createSQLTriggers(String tableName) { - // Don't create triggers for the table the triggers write into!!! + public void createSQLTriggers(String table) { + final String[] split = table.split("\\."); + String schemaName = (split.length==2)?split[0]:getSchema(); + String tableName = (split.length==2)?split[1]:table; + if (tableName.equals(TRANS_TBL)) + // Don't create triggers for the table the triggers write into!!! return; try { if (!server_tbl_created) { @@ -321,12 +371,12 @@ mysql> describe tables; //msm.register(name); } // No SELECT trigger - executeSQLWrite(generateTrigger(tableName, "INSERT")); - executeSQLWrite(generateTrigger(tableName, "UPDATE")); + executeSQLWrite(generateTrigger(schemaName,tableName, "INSERT")); + executeSQLWrite(generateTrigger(schemaName,tableName, "UPDATE")); //\TODO: save key row instead of the whole row for delete - executeSQLWrite(generateTrigger(tableName, "DELETE")); + executeSQLWrite(generateTrigger(schemaName,tableName, "DELETE")); } catch (SQLException e) { - if (e.getMessage().equals("Trigger already exists")) { + if (e.getMessage().equals("Trigger already exists") || e.getMessage().endsWith("already exists")){ //only warn if trigger already exists logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e); } else { @@ -343,7 +393,7 @@ END; OLD.field refers to the old value NEW.field refers to the new value */ - private String generateTrigger(String tableName, String op) { + private String generateTrigger(String schema, String tableName, String op) { boolean isdelete = op.equals("DELETE"); boolean isinsert = op.equals("INSERT"); boolean isupdate = op.equals("UPDATE"); @@ -371,25 +421,27 @@ NEW.field refers to the new value //\TODO check if using mysql driver, so instead check the exception //\TODO add conditional for update, if primary key is still the same, use null in the KEYDATA col StringBuilder sb = new StringBuilder() - .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL! - .append(String.format("%s_%s", op.substring(0, 1), tableName)) - .append(" AFTER ") - .append(op) - .append(" ON ") - .append(tableName.toUpperCase()) - .append(" FOR EACH ROW INSERT INTO ") - .append(TRANS_TBL) - .append(" (TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('") - .append(tableName.toUpperCase()) - .append("', ") - .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'")) - .append(", ") - .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL") - .append(", ") - .append(newJson.toString()) - .append(", ") - .append("CONNECTION_ID()") - .append(")"); + .append("CREATE TRIGGER ") // IF NOT EXISTS not supported by MySQL! + .append(String.format("%s_%s", op.substring(0, 1), tableName)) + .append(" AFTER ") + .append(op) + .append(" ON ") + .append(tableName) + .append(" FOR EACH ROW INSERT INTO ") + .append(TRANS_TBL) + .append(" (SCHEMANAME, TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('") + .append( (schema==null)?this.getSchema():schema ) + .append("', '") + .append(tableName) + .append("', ") + .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'")) + .append(", ") + .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL") + .append(", ") + .append(newJson.toString()) + .append(", ") + .append("CONNECTION_ID()") + .append(")"); return sb.toString(); } private String[] getTriggerNames(String tableName) { @@ -521,6 +573,16 @@ NEW.field refers to the new value return rs; } + @Override + public void preCommitHook() { + synchronized (stagingHandlerLock){ + //\TODO check if this can potentially block forever in certain scenarios + if(stagingHandler!=null){ + stagingHandler.waitForAllPendingUpdates(); + } + } + } + /** * This method executes a write query in the sql database. * @param sql the SQL to be sent to MySQL @@ -569,11 +631,24 @@ NEW.field refers to the new value String[] parts = sql.trim().split(" "); String cmd = parts[0].toLowerCase(); if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) { - try { - this.updateStagingTable(transactionDigest); - } catch (NoSuchFieldException|MDBCServiceException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + if (useAsyncStagingUpdate) { + synchronized (stagingHandlerLock){ + if(stagingHandler==null||currentStaging!=transactionDigest){ + Runnable newRunnable = new StagingTableUpdateRunnable(this, transactionDigest); + currentStaging=transactionDigest; + stagingHandler=new AsyncUpdateHandler(newRunnable); + } + //else we can keep using the current staging Handler + } + stagingHandler.processNewUpdate(); + } else { + + try { + this.updateStagingTable(transactionDigest); + } catch (NoSuchFieldException | MDBCServiceException e) { + // TODO Auto-generated catch block + this.logger.error("Error updating the staging table"); + } } } } @@ -604,7 +679,7 @@ NEW.field refers to the new value throws NoSuchFieldException, MDBCServiceException { // copy from DB.MDBC_TRANSLOG where connid == myconnid // then delete from MDBC_TRANSLOG - String sql2 = "SELECT IX, TABLENAME, OP, ROWDATA,KEYDATA FROM "+TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId; + String sql2 = "SELECT IX, SCHEMANAME, TABLENAME, OP, ROWDATA, KEYDATA FROM " + TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId; Integer biggestIx = Integer.MIN_VALUE; Integer smallestIx = Integer.MAX_VALUE; try { @@ -616,10 +691,11 @@ NEW.field refers to the new value smallestIx = Integer.min(smallestIx,ix); String op = rs.getString("OP"); SQLOperation opType = toOpEnum(op); + String schema= rs.getString("SCHEMANAME"); String tbl = rs.getString("TABLENAME"); String newRowStr = rs.getString("ROWDATA"); String rowStr = rs.getString("KEYDATA"); - Range range = new Range(tbl); + Range range = new Range(schema+"."+tbl); transactionDigests.addOperation(range,opType,newRowStr,rowStr); rows.add(ix); } @@ -1056,7 +1132,7 @@ NEW.field refers to the new value private void clearReplayedOperations(Statement jdbcStmt) throws SQLException { logger.info("Clearing replayed operations"); String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId; - jdbcStmt.executeQuery(sql); + jdbcStmt.executeUpdate(sql); } @Override diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java new file mode 100755 index 0000000..0f66731 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java @@ -0,0 +1,1066 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.mixins; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.*; +import java.util.Map.Entry; +import net.sf.jsqlparser.JSQLParserException; +import net.sf.jsqlparser.parser.CCJSqlParserUtil; +import net.sf.jsqlparser.statement.delete.Delete; +import net.sf.jsqlparser.statement.insert.Insert; +import net.sf.jsqlparser.statement.update.Update; +import org.json.JSONObject; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.Configuration; +import org.onap.music.mdbc.MDBCUtils; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.TableInfo; +import org.onap.music.mdbc.mixins.MySQLMixin.StagingTableUpdateRunnable; +import org.onap.music.mdbc.tables.Operation; +import org.onap.music.mdbc.query.SQLOperation; +import org.onap.music.mdbc.tables.StagingTable; +import org.postgresql.util.PGInterval; +import org.postgresql.util.PGobject; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +/** + * This class provides the methods that MDBC needs in order to mirror data to/from a + * <a href="https://dev.mysql.com/">MySQL</a> or <a href="http://mariadb.org/">MariaDB</a> database instance. This class + * uses the <code>JSON_OBJECT()</code> database function, which means it requires the following minimum versions of + * either database: + * <table summary=""> + * <tr> + * <th>DATABASE</th> + * <th>VERSION</th> + * </tr> + * <tr> + * <td>MySQL</td> + * <td>5.7.8</td> + * </tr> + * <tr> + * <td>MariaDB</td> + * <td>10.2.3 (Note: 10.2.3 is currently (July 2017) a <i>beta</i> release)</td> + * </tr> + * </table> + * + * @author Robert P. Eby + */ +public class PostgresMixin implements DBInterface { + + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(PostgresMixin.class); + + public static final String MIXIN_NAME = "postgres"; + public static final String TRANS_TBL_SCHEMA = "audit"; + public static final String TRANS_TBL = "mdbc_translog"; + + private final MusicInterface mi; + private final String connId; + private final String dbName; + private final String schema; + private final Connection jdbcConn; + private final Map<String, TableInfo> tables; + private PreparedStatement deleteStagingStatement; + private boolean useAsyncStagingUpdate = false; + private Object stagingHandlerLock = new Object(); + private AsyncUpdateHandler stagingHandler = null; + private StagingTable currentStaging = null; + + public PostgresMixin() { + this.mi = null; + this.connId = ""; + this.dbName = null; + this.schema = null; + this.jdbcConn = null; + this.tables = null; + this.deleteStagingStatement = null; + } + + private void initializeDeleteStatement() throws SQLException { + deleteStagingStatement = jdbcConn.prepareStatement("DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + + " WHERE (ix BETWEEN ? AND ? ) AND " + "connection_id = ?;"); + } + + public PostgresMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException { + this.mi = mi; + this.connId = generateConnID(conn); + this.dbName = getDBName(conn); + this.schema = getSchema(conn); + this.jdbcConn = conn; + this.tables = new HashMap<>(); + useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE, + Configuration.ASYNC_STAGING_TABLE_UPDATE)); + initializePostgresTriggersStructures(); + initializeDeleteStatement(); + } + + class StagingTableUpdateRunnable implements Runnable { + + private PostgresMixin mixin; + private StagingTable staging; + + StagingTableUpdateRunnable(PostgresMixin mixin, StagingTable staging) { + this.mixin = mixin; + this.staging = staging; + } + + @Override + public void run() { + try { + this.mixin.updateStagingTable(staging); + } catch (NoSuchFieldException | MDBCServiceException e) { + this.mixin.logger.error("Error when updating the staging table"); + } + } + } + + + private void createTriggerTable() throws SQLException { + final String createSchemaSQL = "CREATE SCHEMA IF NOT EXISTS " + TRANS_TBL_SCHEMA + ";"; + final String revokeCreatePrivilegesSQL = "REVOKE CREATE ON schema " + TRANS_TBL_SCHEMA + " FROM public;"; + final String createTableSQL = "CREATE TABLE IF NOT EXISTS " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " (" + + "ix serial," + "op TEXT NOT NULL CHECK (op IN ('I','D','U'))," + "schema_name text NOT NULL," + + "table_name text NOT NULL," + "original_data json," + "new_data json," + "connection_id text," + + "PRIMARY KEY (connection_id,ix)" + ") WITH (fillfactor=100);"; + final String revokeSQL = "REVOKE INSERT,UPDATE,DELETE,TRUNCATE,REFERENCES,TRIGGER ON " + TRANS_TBL_SCHEMA + "." + + TRANS_TBL + " FROM public;"; + final String grantSelectSQL = "GRANT SELECT ON " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " TO public;"; + final String createIndexSQL = "CREATE INDEX IF NOT EXISTS logged_actions_connection_id_idx" + " ON " + + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " (connection_id);"; + Map<String, String> sqlStatements = new LinkedHashMap<String, String>() { + { + put("create_schema", createSchemaSQL); + put("revoke_privileges", revokeCreatePrivilegesSQL); + put("create_table", createTableSQL); + put("revoke_sql", revokeSQL); + put("grant_select", grantSelectSQL); + put("create_index", createIndexSQL); + } + }; + for (Entry<String, String> query : sqlStatements.entrySet()) { + int retryCount = 0; + boolean ready = false; + while (retryCount < 3 && !ready) { + try { + Statement statement = jdbcConn.createStatement(); + statement.executeUpdate(query.getValue()); + if (!jdbcConn.getAutoCommit()) { + jdbcConn.commit(); + } + statement.close(); + ready = true; + } catch (SQLException e) { + if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated") || e.getMessage() + .toLowerCase().startsWith("error: duplicate key value violates unique constraint")) { + logger.warn("Error creating schema, retrying. for " + query.getKey(), e); + try { + Thread.sleep(100); + } catch (InterruptedException e1) { + } + } else { + logger.error("Error executing " + query.getKey(), e); + throw e; + } + } + retryCount++; + } + } + } + + private String updateTriggerSection() { + return "IF (TG_OP = 'UPDATE') THEN\n" + "v_old_data := row_to_json(OLD);\n" + + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + + " (op,schema_name,table_name,original_data,new_data,connection_id)\n" + + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,v_new_data,pg_backend_pid());\n" + + "RETURN NEW; "; + } + + private String insertTriggerSection() { + // \TODO add additinoal conditional on change "IF NEW IS DISTINCT FROM OLD THEN" + return "IF (TG_OP = 'INSERT') THEN\n" + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + + "." + TRANS_TBL + " (op,schema_name,table_name,new_data,connection_id)\n" + + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_new_data,pg_backend_pid());\n" + + "RETURN NEW; "; + } + + private String deleteTriggerSection() { + return "IF (TG_OP = 'DELETE') THEN\n" + "v_old_data := row_to_json(OLD);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + + "." + TRANS_TBL + " (op,schema_name,table_name,original_data,connection_id)\n" + + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,pg_backend_pid());\n" + + "RETURN OLD; "; + } + + private String functionName(SQLOperation type) { + final String functionName = (type.equals(SQLOperation.UPDATE)) ? "if_updated_func" + : (type.equals(SQLOperation.INSERT)) ? "if_inserted_func" : "if_deleted_func"; + return "audit." + functionName + "()"; + } + + private void createTriggerFunctions(SQLOperation type) throws SQLException { + StringBuilder functionSQL = + new StringBuilder("CREATE OR REPLACE FUNCTION " + functionName(type) + " RETURNS TRIGGER AS $body$\n" + + "DECLARE\n" + "v_old_data json;\n" + "v_new_data json;\n" + "BEGIN\n"); + switch (type) { + case UPDATE: + functionSQL.append(updateTriggerSection()); + break; + case INSERT: + functionSQL.append(insertTriggerSection()); + break; + case DELETE: + functionSQL.append(deleteTriggerSection()); + break; + default: + throw new IllegalArgumentException("Invalid operation type for creation of trigger functions"); + } + functionSQL.append("ELSE\n" + + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - Other action occurred: %, at %',TG_OP,now();\n" + + "RETURN NULL;\n" + "END IF;\n" + "EXCEPTION\n" + "WHEN data_exception THEN\n" + + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n" + + "RETURN NULL;\n" + "WHEN unique_violation THEN\n" + + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n" + + "RETURN NULL;\n" + "WHEN OTHERS THEN\n" + + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n" + + "RETURN NULL;\n" + "END;\n" + "$body$\n" + "LANGUAGE plpgsql\n" + "SECURITY DEFINER\n" + + "SET search_path = pg_catalog, audit;"); + int retryCount = 0; + boolean ready = false; + while (retryCount < 3 && !ready) { + try { + executeSQLWrite(functionSQL.toString()); + ready = true; + } catch (SQLException e) { + if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated")) { + logger.warn("Error creating schema, retrying. ", e); + try { + Thread.sleep(200); + } catch (InterruptedException e1) { + } + } else { + logger.error("Error executing creation of trigger function", e); + throw e; + } + } + retryCount++; + } + } + + private void initializePostgresTriggersStructures() throws SQLException { + try { + createTriggerTable(); + } catch (SQLException e) { + logger.error("Error creating the trigger tables in postgres", e); + throw e; + } + try { + createTriggerFunctions(SQLOperation.INSERT); + createTriggerFunctions(SQLOperation.UPDATE); + createTriggerFunctions(SQLOperation.DELETE); + } catch (SQLException e) { + logger.error("Error creating the trigger functions in postgres", e); + throw e; + } + } + + // This is used to generate a unique connId for this connection to the DB. + private String generateConnID(Connection conn) { + String rv = Integer.toString((int) System.currentTimeMillis()); // random-ish + try { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT pg_backend_pid() AS IX"); + if (rs.next()) { + rv = rs.getString("IX"); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "generateConnID: problem generating a connection ID!"); + } + return rv; + } + + /** + * Get the name of this DBnterface mixin object. + * + * @return the name + */ + @Override + public String getMixinName() { + return MIXIN_NAME; + } + + @Override + public void close() { + // nothing yet + } + + /** + * Determines the db name associated with the connection This is the private/internal method that actually + * determines the name + * + * @param conn + * @return + */ + private String getDBName(Connection conn) { + String dbname = "mdbc"; // default name + try { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT current_database();"); + if (rs.next()) { + dbname = rs.getString("current_database"); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql"); + } + return dbname; + } + + /** + * Determines the db name associated with the connection This is the private/internal method that actually + * determines the name + * + * @param conn + * @return + */ + private String getSchema(Connection conn) { + String schema = "public"; // default name + try { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT current_schema();"); + if (rs.next()) { + schema = rs.getString("current_schema"); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql"); + } + return schema; + } + + @Override + public String getSchema() { + return schema; + } + + @Override + public String getDatabaseName() { + return this.dbName; + } + + /** + * Get a set of the table names in the database. + * + * @return the set + */ + @Override + public Set<Range> getSQLRangeSet() { + Set<String> set = new TreeSet<String>(); + String sql = + "SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE' AND table_schema=current_schema();"; + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery(sql); + while (rs.next()) { + String s = rs.getString("table_name"); + set.add(s); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e); + } + logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set); + Set<Range> rangeSet = new HashSet<>(); + for (String table : set) { + rangeSet.add(new Range(table)); + } + return rangeSet; + } + + /** + * Return a TableInfo object for the specified table. This method first looks in a cache of previously constructed + * TableInfo objects for the table. If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the + * column names, types, and indexes of the table. It creates a new TableInfo object with the results. + * + * @param tableName the table to look up + * @return a TableInfo object containing the info we need, or null if the table does not exist + */ + @Override + public TableInfo getTableInfo(String tableName) { + // \TODO: invalidate "tables" when a table schema is modified (uncommon), but needs to be handled + TableInfo ti = tables.get(tableName); + if (ti == null) { + try { + String tbl, localSchema; + final String[] split = tableName.split("\\."); + if (split.length == 2) { + localSchema = split[0]; + tbl = split[1]; + } else { + tbl = tableName; + localSchema = this.schema; + } + String sql; + if (schema == null) { + sql = "select column_name, data_type from information_schema.columns where table_schema=current_schema() and table_name='" + + tbl + "';"; + } else { + sql = "select column_name, data_type from information_schema.columns where table_schema='" + + localSchema + "' and table_name='" + tbl + "';"; + } + ResultSet rs = executeSQLRead(sql); + if (rs != null) { + ti = new TableInfo(); + while (rs.next()) { + String name = rs.getString("column_name"); + String type = rs.getString("data_type"); + ti.columns.add(name); + ti.coltype.add(mapDatatypeNameToType(type)); + } + rs.getStatement().close(); + } else { + logger.error(EELFLoggerDelegate.errorLogger, + "Cannot retrieve table info for table " + tableName + " from POSTGRES."); + return null; + } + final String keysql = + "SELECT a.attname as column_name FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid" + + " AND a.attnum = ANY(i.indkey) WHERE i.indrelid = '" + tbl + "'::regclass " + + " AND i.indisprimary;"; + ResultSet rs2 = executeSQLRead(keysql); + Set<String> keycols = new HashSet<>(); + if (rs2 != null) { + while (rs2.next()) { + String name = rs2.getString("column_name"); + keycols.add(name); + } + rs2.getStatement().close(); + } else { + logger.error(EELFLoggerDelegate.errorLogger, + "Cannot retrieve table info for table " + tableName + " from MySQL."); + } + for (String col : ti.columns) { + if (keycols.contains(col)) { + ti.iskey.add(true); + } else { + ti.iskey.add(false); + } + } + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, + "Cannot retrieve table info for table " + tableName + " from MySQL: " + e); + return null; + } + tables.put(tableName, ti); + } + return ti; + } + + // Map Postgres data type names to the java.sql.Types equivalent + private int mapDatatypeNameToType(String nm) { + switch (nm) { + case "character": + return Types.CHAR; + case "national character": + return Types.NCHAR; + case "character varying": + return Types.VARCHAR; + case "national character varying": + return Types.NVARCHAR; + case "text": + return Types.VARCHAR; + case "bytea": + return Types.BINARY; + case "smallint": + return Types.SMALLINT; + case "integer": + return Types.INTEGER; + case "bigint": + return Types.BIGINT; + case "smallserial": + return Types.SMALLINT; + case "serial": + return Types.INTEGER; + case "bigserial": + return Types.BIGINT; + case "real": + return Types.REAL; + case "double precision": + return Types.DOUBLE; + case "numeric": + return Types.NUMERIC; + case "decimal": + return Types.DECIMAL; + case "date": + return Types.DATE; + case "time with time zone": + return Types.TIME_WITH_TIMEZONE; + case "time without time zone": + return Types.TIME; + case "timestamp without time zone": + return Types.TIMESTAMP; + case "timestamp with time zone": + return Types.TIMESTAMP_WITH_TIMEZONE; + case "boolean": + return Types.BIT; + case "bit": + return Types.BIT; + case "oid": + return Types.BIGINT; + case "xml": + return Types.SQLXML; + case "array": + return Types.ARRAY; + case "tinyint": + return Types.TINYINT; + case "uuid": + case "money": + case "interval": + case "bit varying": + case "box": + case "point": + case "lseg": + case "path": + case "polygon": + case "circle": + case "json": + case "inet": + case "cidr": + case "macaddr": + case "tsvector": + case "tsquery": + return Types.OTHER; + default: + logger.error(EELFLoggerDelegate.errorLogger, "unrecognized and/or unsupported data type " + nm); + return Types.VARCHAR; + } + } + + @Override + public void createSQLTriggers(String tableName) { + // Don't create triggers for the table the triggers write into!!! + if (tableName.equals(TRANS_TBL) || tableName.equals(TRANS_TBL_SCHEMA + "." + TRANS_TBL)) + return; + try { + // No SELECT trigger + executeSQLWrite(generateTrigger(tableName, SQLOperation.INSERT)); + executeSQLWrite(generateTrigger(tableName, SQLOperation.DELETE)); + executeSQLWrite(generateTrigger(tableName, SQLOperation.UPDATE)); + } catch (SQLException e) { + if (e.getMessage().trim().endsWith("already exists")) { + // only warn if trigger already exists + logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e); + } else { + logger.error(EELFLoggerDelegate.errorLogger, "createSQLTriggers: " + e); + } + } + } + + private String generateTrigger(String tableName, SQLOperation op) { + StringBuilder triggerSql = new StringBuilder("CREATE TRIGGER ").append(getTriggerName(tableName, op)) + .append(" AFTER " + op + " ON ").append(tableName).append(" FOR EACH ROW EXECUTE PROCEDURE ") + .append(functionName(op)).append(';'); + return triggerSql.toString(); + } + + private String getTriggerName(String tableName, SQLOperation op) { + switch (op) { + case DELETE: + return "D_" + tableName; + case UPDATE: + return "U_" + tableName; + case INSERT: + return "I_" + tableName; + default: + throw new IllegalArgumentException("Invalid option in trigger operation type"); + } + } + + private String[] getTriggerNames(String tableName) { + return new String[] {getTriggerName(tableName, SQLOperation.INSERT), + getTriggerName(tableName, SQLOperation.DELETE), getTriggerName(tableName, SQLOperation.UPDATE),}; + } + + @Override + public void dropSQLTriggers(String tableName) { + try { + for (String name : getTriggerNames(tableName)) { + logger.debug(EELFLoggerDelegate.applicationLogger, "REMOVE trigger " + name + " from postgres"); + executeSQLWrite("DROP TRIGGER IF EXISTS " + name + ";"); + } + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "dropSQLTriggers: " + e); + } + } + + @Override + public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) { + throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres"); + } + + @Override + public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) { + throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres"); + } + + /** + * This method executes a read query in the SQL database. Methods that call this method should be sure to call + * resultset.getStatement().close() when done in order to free up resources. + * + * @param sql the query to run + * @return a ResultSet containing the rows returned from the query + */ + @Override + public ResultSet executeSQLRead(String sql) { + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing sql read in postgres"); + logger.debug("Executing SQL read:" + sql); + ResultSet rs; + try { + Statement stmt = jdbcConn.createStatement(); + rs = stmt.executeQuery(sql); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "executeSQLRead" + e); + return null; + } + return rs; + } + + /** + * This method executes a write query in the sql database. + * + * @param sql the SQL to be sent to MySQL + * @throws SQLException if an underlying JDBC method throws an exception + */ + protected void executeSQLWrite(String sql) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:" + sql); + Statement stmt = jdbcConn.createStatement(); + stmt.execute(sql); + stmt.close(); + } + + @Override + public void preCommitHook() { + synchronized (stagingHandlerLock) { + // \TODO check if this can potentially block forever in certain scenarios + if (stagingHandler != null) { + stagingHandler.waitForAllPendingUpdates(); + } + } + } + + /** + * Code to be run within the DB driver before a SQL statement is executed. This is where tables can be synchronized + * before a SELECT, for those databases that do not support SELECT triggers. + * + * @param sql the SQL statement that is about to be executed + * @return list of keys that will be updated, if they can't be determined afterwards (i.e. sql table doesn't have + * primary key) + */ + @Override + public void preStatementHook(final String sql) { + if (sql == null) { + return; + } + // \TODO: check if anything needs to be executed here for postgres + } + + /** + * Code to be run within the DB driver after a SQL statement has been executed. This is where remote statement + * actions can be copied back to Cassandra/MUSIC. + * + * @param sql the SQL statement that was executed + */ + @Override + public void postStatementHook(final String sql, StagingTable transactionDigest) { + if (sql != null) { + String[] parts = sql.trim().split(" "); + String cmd = parts[0].toLowerCase(); + if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) { + if (useAsyncStagingUpdate) { + synchronized (stagingHandlerLock) { + if (stagingHandler == null || currentStaging != transactionDigest) { + Runnable newRunnable = + new PostgresMixin.StagingTableUpdateRunnable(this, transactionDigest); + currentStaging = transactionDigest; + stagingHandler = new AsyncUpdateHandler(newRunnable); + } + // else we can keep using the current staging Handler + } + stagingHandler.processNewUpdate(); + } else { + + try { + this.updateStagingTable(transactionDigest); + } catch (NoSuchFieldException | MDBCServiceException e) { + // TODO Auto-generated catch block + this.logger.error("Error updating the staging table"); + } + } + } + } + } + + private SQLOperation toOpEnum(String operation) throws NoSuchFieldException { + switch (operation.toLowerCase()) { + case "i": + return SQLOperation.INSERT; + case "d": + return SQLOperation.DELETE; + case "u": + return SQLOperation.UPDATE; + case "s": + return SQLOperation.SELECT; + default: + logger.error(EELFLoggerDelegate.errorLogger, "Invalid operation selected: [" + operation + "]"); + throw new NoSuchFieldException("Invalid operation enum"); + } + + } + + /** + * Copy data that is in transaction table into music interface + * + * @param transactionDigests + * @throws NoSuchFieldException + */ + private void updateStagingTable(StagingTable transactionDigests) throws NoSuchFieldException, MDBCServiceException { + String selectSql = "select ix, op, schema_name, table_name, original_data,new_data FROM " + TRANS_TBL_SCHEMA + + "." + TRANS_TBL + " where connection_id = '" + this.connId + "';"; + Integer biggestIx = Integer.MIN_VALUE; + Integer smallestIx = Integer.MAX_VALUE; + try { + ResultSet rs = executeSQLRead(selectSql); + Set<Integer> rows = new TreeSet<Integer>(); + while (rs.next()) { + int ix = rs.getInt("ix"); + biggestIx = Integer.max(biggestIx, ix); + smallestIx = Integer.min(smallestIx, ix); + String op = rs.getString("op"); + SQLOperation opType = toOpEnum(op); + String schema = rs.getString("schema_name"); + String tbl = rs.getString("table_name"); + String original = rs.getString("original_data"); + String newData = rs.getString("new_data"); + Range range = new Range(schema + "." + tbl); + transactionDigests.addOperation(range, opType, newData, original); + rows.add(ix); + } + rs.getStatement().close(); + if (rows.size() > 0) { + logger.debug("Staging delete: Executing with vals [" + smallestIx + "," + biggestIx + "," + this.connId + + "]"); + this.deleteStagingStatement.setInt(1, smallestIx); + this.deleteStagingStatement.setInt(2, biggestIx); + this.deleteStagingStatement.setString(3, this.connId); + this.deleteStagingStatement.execute(); + } + } catch (SQLException e) { + logger.warn("Exception in postStatementHook: " + e); + e.printStackTrace(); + } + } + + + + /** + * Update music with data from MySQL table + * + * @param tableName - name of table to update in music + */ + @Override + public void synchronizeData(String tableName) {} + + /** + * Return a list of "reserved" names, that should not be used by MySQL client/MUSIC These are reserved for mdbc + */ + @Override + public List<String> getReservedTblNames() { + ArrayList<String> rsvdTables = new ArrayList<String>(); + rsvdTables.add(TRANS_TBL_SCHEMA + "." + TRANS_TBL); + rsvdTables.add(TRANS_TBL); + // Add others here as necessary + return rsvdTables; + } + + @Override + public String getPrimaryKey(String sql, String tableName) { + return null; + } + + /** + * Parse the transaction digest into individual events + * + * @param transaction - base 64 encoded, serialized digest + */ + public void replayTransaction(StagingTable transaction, List<Range> ranges) + throws SQLException, MDBCServiceException { + boolean autocommit = jdbcConn.getAutoCommit(); + jdbcConn.setAutoCommit(false); + Statement jdbcStmt = jdbcConn.createStatement(); + final ArrayList<Operation> opList = transaction.getOperationList(); + + for (Operation op : opList) { + if (Range.overlaps(ranges, op.getTable())) { + try { + replayOperationIntoDB(jdbcStmt, op); + } catch (SQLException | MDBCServiceException e) { + // rollback transaction + logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "." + + "Rolling back the entire digest replay."); + jdbcConn.rollback(); + throw e; + } + } + } + + clearReplayedOperations(jdbcStmt); + jdbcConn.commit(); + jdbcStmt.close(); + jdbcConn.setAutoCommit(autocommit); + } + + @Override + public void disableForeignKeyChecks() throws SQLException { + Statement disable = jdbcConn.createStatement(); + disable.execute("SET session_replication_role = 'replica';"); + disable.closeOnCompletion(); + } + + @Override + public void enableForeignKeyChecks() throws SQLException { + Statement enable = jdbcConn.createStatement(); + enable.execute("SET session_replication_role = 'origin';"); + enable.closeOnCompletion(); + } + + @Override + public void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException { + replayTransaction(txDigest, ranges); + } + + /** + * Replays operation into database, usually from txDigest + * + * @param jdbcStmt: Connection used to perform the replay + * @param op: operation to be replayed + * @throws SQLException + * @throws MDBCServiceException + */ + private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException { + logger.debug("Replaying Operation: " + op.getOperationType() + "->" + op.getVal()); + JSONObject newVal = op.getVal(); + JSONObject oldVal = null; + try { + oldVal = op.getKey(); + } catch (MDBCServiceException e) { + // Ignore exception, in postgres the structure of the operation is different + } + + + TableInfo ti = getTableInfo(op.getTable()); + final List<String> keyColumns = ti.getKeyColumns(); + + // build and replay the queries + String sql = constructSQL(op, keyColumns, newVal, oldVal); + if (sql == null) + return; + + try { + logger.debug("Replaying operation: " + sql); + int updated = jdbcStmt.executeUpdate(sql); + + if (updated == 0) { + // This applies only for replaying transactions involving Eventually Consistent tables + logger.warn( + "Error Replaying operation: " + sql + "; Replacing insert/replace/viceversa and replaying "); + + buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal); + } + } catch (SQLException sqlE) { + // This applies for replaying transactions involving Eventually Consistent tables + // or transactions that replay on top of existing keys + logger.warn( + "Error Replaying operation: " + sql + ";" + "Replacing insert/replace/viceversa and replaying "); + + buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal); + + } + } + + protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, List<String> keyColumns, + JSONObject newVals, JSONObject oldVals) throws SQLException, MDBCServiceException { + String sqlInverse = constructSQLInverse(op, keyColumns, newVals, oldVals); + if (sqlInverse == null) + return; + logger.debug("Replaying operation: " + sqlInverse); + jdbcStmt.executeUpdate(sqlInverse); + } + + protected String constructSQLInverse(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals) + throws MDBCServiceException { + String sqlInverse = null; + switch (op.getOperationType()) { + case INSERT: + sqlInverse = constructUpdate(op.getTable(), keyColumns, newVals, oldVals); + break; + case UPDATE: + sqlInverse = constructInsert(op.getTable(), newVals); + break; + default: + break; + } + return sqlInverse; + } + + protected String constructSQL(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals) + throws MDBCServiceException { + String sql = null; + switch (op.getOperationType()) { + case INSERT: + sql = constructInsert(op.getTable(), newVals); + break; + case UPDATE: + sql = constructUpdate(op.getTable(), keyColumns, newVals, oldVals); + break; + case DELETE: + sql = constructDelete(op.getTable(), keyColumns, oldVals); + break; + case SELECT: + // no update happened, do nothing + break; + default: + logger.error(op.getOperationType() + "not implemented for replay"); + } + return sql; + } + + private String constructDelete(String tableName, List<String> keyColumns, JSONObject oldVals) + throws MDBCServiceException { + if (oldVals == null) { + throw new MDBCServiceException("Trying to update row with an empty old val exception"); + } + StringBuilder sql = new StringBuilder(); + sql.append("DELETE FROM "); + sql.append(tableName + " WHERE "); + sql.append(getPrimaryKeyConditional(keyColumns, oldVals)); + sql.append(";"); + return sql.toString(); + } + + private String constructInsert(String tableName, JSONObject newVals) { + StringBuilder keys = new StringBuilder(); + StringBuilder vals = new StringBuilder(); + String sep = ""; + for (String col : newVals.keySet()) { + keys.append(sep + col); + vals.append(sep + "'" + newVals.get(col) + "'"); + sep = ", "; + } + StringBuilder sql = new StringBuilder(); + sql.append("INSERT INTO ").append(tableName + " (").append(keys).append(") VALUES (").append(vals).append(");"); + return sql.toString(); + } + + private String constructUpdate(String tableName, List<String> keyColumns, JSONObject newVals, JSONObject oldVals) + throws MDBCServiceException { + if (oldVals == null) { + throw new MDBCServiceException("Trying to update row with an empty old val exception"); + } + StringBuilder sql = new StringBuilder(); + sql.append("UPDATE ").append(tableName).append(" SET "); + String sep = ""; + for (String key : newVals.keySet()) { + sql.append(sep).append(key).append("=\"").append(newVals.get(key)).append("\""); + sep = ", "; + } + sql.append(" WHERE "); + sql.append(getPrimaryKeyConditional(keyColumns, oldVals)); + sql.append(";"); + return sql.toString(); + } + + /** + * Create an SQL string for AND'ing all of the primary keys + * + * @param keyColumns list with the name of the columns that are key + * @param vals json with the contents of the old row + * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3 + */ + private String getPrimaryKeyConditional(List<String> keyColumns, JSONObject vals) { + StringBuilder keyCondStmt = new StringBuilder(); + String and = ""; + for (String key : keyColumns) { + // We cannot use the default primary key for the sql table and operations + if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) { + Object val = vals.get(key); + keyCondStmt.append(and + key + "=\"" + val + "\""); + and = " AND "; + } + } + return keyCondStmt.toString(); + } + + /** + * Cleans out the transaction table, removing the replayed operations + * + * @param jdbcStmt + * @throws SQLException + */ + private void clearReplayedOperations(Statement jdbcStmt) throws SQLException { + logger.info("Clearing replayed operations"); + String sql = + "DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " WHERE CONNECTION_ID = '" + this.connId + "';"; + jdbcStmt.executeUpdate(sql); + } + + @Override + public Connection getSQLConnection() { + return jdbcConn; + } + + @Override + public Set<String> getSQLTableSet() { + Set<String> set = new TreeSet<String>(); + String sql = + "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=CURRENT_SCHEMA() AND TABLE_TYPE='BASE TABLE'"; + try { + Statement stmt = jdbcConn.createStatement(); + ResultSet rs = stmt.executeQuery(sql); + while (rs.next()) { + String s = rs.getString("TABLE_NAME"); + set.add(s); + } + stmt.close(); + } catch (SQLException e) { + logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e); + System.out.println("getSQLTableSet: " + e); + e.printStackTrace(); + } + logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set); + System.out.println("getSQLTableSet returning: " + set); + return set; + } + +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java index 86088f9..171ad79 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java @@ -190,28 +190,6 @@ public class Utils { } return list; } - - public static void registerDefaultDrivers() { - Properties pr = null; - try { - pr = new Properties(); - pr.load(Utils.class.getResourceAsStream("/mdbc.properties")); - } - catch (IOException e) { - logger.error("Could not load property file > " + e.getMessage()); - } - - @SuppressWarnings("unused") - List<Class<?>> list = new ArrayList<Class<?>>(); - String drivers = pr.getProperty("DEFAULT_DRIVERS"); - for (String driver: drivers.split("[ ,]")) { - logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'"); - try { - @SuppressWarnings("unused") - Class<?> cl = Class.forName(driver.trim()); - } catch (ClassNotFoundException e) { - logger.error(EELFLoggerDelegate.errorLogger,"Driver class "+driver+" not found."); - } - } - } + + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java index 4f3a3bf..5c6fae4 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java @@ -89,9 +89,10 @@ public class MusicTxDigestDaemon implements Runnable { logger.error("Music interface or DB interface is null in background daemon"); return; } + MdbcConnection conn = null; while (true) { try { - MdbcConnection conn = (MdbcConnection) stateManager.getConnection("daemon"); + conn = (MdbcConnection) stateManager.getConnection("daemon"); if (conn == null) { logger.error("Connection created is null in background daemon"); return; @@ -111,18 +112,20 @@ public class MusicTxDigestDaemon implements Runnable { } //2) for each partition I don't own final Set<Range> warmupRanges = stateManager.getRangesToWarmup(); - final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); - List<Range> missingRanges = new ArrayList<>(); - if (currentPartitions.size() != 0) { - for (DatabasePartition part : currentPartitions) { - List<Range> partitionRanges = part.getSnapshot(); - warmupRanges.removeAll(partitionRanges); - } - try { - stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); - } catch (MDBCServiceException e) { - logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); - continue; + if (warmupRanges!=null) { + final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); + List<Range> missingRanges = new ArrayList<>(); + if (currentPartitions.size() != 0) { + for (DatabasePartition part : currentPartitions) { + List<Range> partitionRanges = part.getSnapshot(); + warmupRanges.removeAll(partitionRanges); + } + try { + stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); + } catch (MDBCServiceException e) { + logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); + continue; + } } } @@ -138,6 +141,12 @@ public class MusicTxDigestDaemon implements Runnable { } catch (InterruptedException | SQLException e) { logger.error("MusicTxDigest background daemon stopped " + e.getMessage(), e); Thread.currentThread().interrupt(); + } finally { + try { + if (conn!=null && !conn.isClosed()) conn.close(); + } catch (SQLException e) { + logger.error("MusicTxDigest background daemon error closing" + e.getMessage(), e); + } } } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java index 8fa49a9..db9e455 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java @@ -22,10 +22,18 @@ package org.onap.music.mdbc.tables; import java.util.UUID; public final class MusicTxDigestId { + public final UUID mriId; public final UUID transactionId; public final int index; + public MusicTxDigestId(UUID mriRowId, UUID digestId, int index) { + this.mriId=mriRowId; + this.transactionId= digestId; + this.index=index; + } + public MusicTxDigestId(UUID digestId, int index) { + this.mriId = null; this.transactionId= digestId; this.index=index; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java index d406ad3..3258d0f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java @@ -66,7 +66,7 @@ public final class Operation implements Serializable{ } public JSONObject getKey() throws MDBCServiceException { - if(KEY==null){ + if(KEY==null||KEY.isEmpty()){ throw new MDBCServiceException("This operation ["+TYPE.toString()+"] doesn't contain a key"); } JSONObject keys = new JSONObject(new JSONTokener(KEY)); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java index 4fdd7ac..4ef9d30 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java @@ -186,7 +186,10 @@ public class StagingTable { } OpType newType = (type==SQLOperation.INSERT)?OpType.INSERT:(type==SQLOperation.DELETE)? OpType.DELETE:OpType.UPDATE; - Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType).setVal(newVal); + Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType); + if(newVal!=null) { + rowBuilder.setVal(newVal); + } if(keys!=null){ rowBuilder.setKey(keys); } @@ -252,10 +255,10 @@ public class StagingTable { } synchronized public boolean isEventualEmpty() { - return (eventuallyBuilder.getRowsCount()==0); + return (eventuallyBuilder!=null) && (eventuallyBuilder.getRowsCount()==0); } - - synchronized public void clear() throws MDBCServiceException { + + synchronized public void clear() throws MDBCServiceException { if(!builderInitialized){ throw new MDBCServiceException("This type of staging table is unmutable, please use the constructor" + "with no parameters"); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java index 7624201..f6239d1 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java @@ -24,6 +24,8 @@ import org.onap.music.mdbc.configurations.NodeConfiguration; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import java.util.ArrayList; +import java.util.Arrays; import java.util.UUID; public class CreatePartition { @@ -51,7 +53,9 @@ public class CreatePartition { } public void convert(){ - config = new NodeConfiguration(tables, eventual,UUID.fromString(mriIndex),"test",""); + String[] tablesArray=eventual.split(","); + ArrayList<String> eventualTables = (new ArrayList<>(Arrays.asList(tablesArray))); + config = new NodeConfiguration(tables, eventualTables,UUID.fromString(mriIndex),"test",""); } public void saveToFile(){ diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties index 73e8f77..49fdfd2 100755 --- a/mdbc-server/src/main/resources/mdbc.properties +++ b/mdbc-server/src/main/resources/mdbc.properties @@ -4,10 +4,15 @@ MIXINS= \ org.onap.music.mdbc.mixins.MySQLMixin \ org.onap.music.mdbc.mixins.MusicMixin \ - org.onap.music.mdbc.mixins.Music2Mixin + org.onap.music.mdbc.mixins.Music2Mixin \ + org.onap.music.mdbc.mixins.PostgresMixin + +DEFAULT_DB_MIXIN= mysql + +DEFAULT_MUSIC_MIXIN= cassandra2 DEFAULT_DRIVERS=\ - org.h2.Driver \ - com.mysql.jdbc.Driver + org.mariadb.jdbc.Driver \ + org.postgresql.Driver -txdaemonsleeps=15
\ No newline at end of file +txdaemonsleeps=15 |