/* * ============LICENSE_START==================================================== * org.onap.music.mdbc * ============================================================================= * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. * ============================================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END====================================================== */ package org.onap.music.mdbc; import java.sql.Array; import java.sql.Blob; import java.sql.CallableStatement; import java.sql.Clob; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.NClob; import java.sql.PreparedStatement; import java.sql.SQLClientInfoException; import java.sql.SQLException; import java.sql.SQLWarning; import java.sql.SQLXML; import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.Executor; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicDeadlockException; import org.onap.music.exceptions.QueryException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; import org.onap.music.logging.format.ErrorSeverity; import org.onap.music.logging.format.ErrorTypes; import org.onap.music.mdbc.mixins.DBInterface; import org.onap.music.mdbc.mixins.LockResult; import org.onap.music.mdbc.mixins.MixinFactory; import org.onap.music.mdbc.mixins.MusicInterface; import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; import org.onap.music.mdbc.ownership.Dag; import org.onap.music.mdbc.ownership.DagNode; import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.query.QueryProcessor; import org.onap.music.mdbc.query.SQLOperation; import org.onap.music.mdbc.query.SQLOperationType; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; /** * ProxyConnection is a proxy to a JDBC driver Connection. It uses the MusicSqlManager to copy * data to and from Cassandra and the underlying JDBC database as needed. It will notify the underlying * MusicSqlManager of any calls to commit(), rollback() or setAutoCommit(). * Otherwise it just forwards all requests to the underlying Connection of the 'real' database. * * @author Robert Eby */ public class MdbcConnection implements Connection { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcConnection.class); private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused private final Connection jdbcConn; // the JDBC Connection to the actual underlying database private final MusicInterface mi; private final TxCommitProgress progressKeeper; private final DBInterface dbi; private final StagingTable transactionDigest; /** Set of tables in db */ private final Set table_set; private final StateManager statemanager; /** partition owned for this transaction */ private DatabasePartition partition; /** ranges needed for this transaction */ private Set rangesUsed; private String ownerId = UUID.randomUUID().toString(); public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException { this.id = id; this.table_set = Collections.synchronizedSet(new HashSet()); this.transactionDigest = new StagingTable(new HashSet<>(statemanager.getEventualRanges())); if (c == null) { throw new MDBCServiceException("Connection is null"); } this.jdbcConn = c; info.putAll(MDBCUtils.getMdbcProperties()); String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT); this.dbi = MixinFactory.createDBInterface(mixinDb, mi, url, jdbcConn, info); this.mi = mi; try { this.setAutoCommit(c.getAutoCommit()); } catch (SQLException e) { logger.error("Failure in autocommit"); logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); } this.progressKeeper = progressKeeper; this.partition = partition; this.statemanager = statemanager; logger.debug("Mdbc connection created with id: "+id); } public DBInterface getDatabaseInterface(){ return this.dbi; } @Override public T unwrap(Class iface) throws SQLException { logger.error(EELFLoggerDelegate.errorLogger, "proxyconn unwrap: " + iface.getName()); return jdbcConn.unwrap(iface); } @Override public boolean isWrapperFor(Class iface) throws SQLException { logger.error(EELFLoggerDelegate.errorLogger, "proxystatement iswrapperfor: " + iface.getName()); return jdbcConn.isWrapperFor(iface); } @Override public Statement createStatement() throws SQLException { return new MdbcCallableStatement(jdbcConn.createStatement(), this); } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { //TODO: grab the sql call from here and all the other preparestatement calls return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql), sql, this); } @Override public CallableStatement prepareCall(String sql) throws SQLException { return new MdbcCallableStatement(jdbcConn.prepareCall(sql), this); } @Override public String nativeSQL(String sql) throws SQLException { return jdbcConn.nativeSQL(sql); } @Override public void setAutoCommit(boolean autoCommit) throws SQLException { boolean b = jdbcConn.getAutoCommit(); if (b != autoCommit) { if(progressKeeper!=null) progressKeeper.commitRequested(id); logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b); if (b) { musicCommit(); } if(progressKeeper!=null) { progressKeeper.setMusicDone(id); } jdbcConn.setAutoCommit(autoCommit); if(progressKeeper!=null) { progressKeeper.setSQLDone(id); } if(progressKeeper!=null&&progressKeeper.isComplete(id)){ progressKeeper.reinitializeTxProgress(id); } } } @Override public boolean getAutoCommit() throws SQLException { return jdbcConn.getAutoCommit(); } private void musicCommit() throws SQLException { if(progressKeeper.isComplete(id)) { return; } if(progressKeeper != null) { progressKeeper.commitRequested(id); } dbi.preCommitHook(); try { partition = mi.splitPartitionIfNecessary(partition, rangesUsed, ownerId); } catch (MDBCServiceException e) { logger.warn(EELFLoggerDelegate.errorLogger, "Failure to split partition '" + partition.getMRIIndex() + "' trying to continue", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); } try { logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); // transaction was committed -- add all the updates into the REDO-Log in MUSIC MusicTxDigestId digestCreated = mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper); statemanager.getOwnAndCheck().updateAlreadyApplied(mi, dbi, partition.getSnapshot(), partition.getMRIIndex(), digestCreated); } catch (MDBCServiceException e) { //If the commit fail, then a new commitId should be used logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); throw new SQLException("Failure commiting to MUSIC", e); } } /** * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, * they are performed now and copied into MUSIC. * @throws SQLException */ @Override public void commit() throws SQLException { musicCommit(); if(progressKeeper != null) { progressKeeper.setMusicDone(id); } jdbcConn.commit(); if(progressKeeper != null) { progressKeeper.setSQLDone(id); } //MusicMixin.releaseZKLocks(MusicMixin.currentLockMap.get(getConnID())); if(progressKeeper.isComplete(id)){ progressKeeper.reinitializeTxProgress(id); } //\TODO try to execute outside of the critical path of commit try { if(partition != null) { mi.relinquish(partition); } } catch (MDBCServiceException e) { logger.warn("Error trying to relinquish: "+partition.toString()); } } /** * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed, * they are discarded. */ @Override public void rollback() throws SQLException { logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");; try { transactionDigest.clear(); } catch (MDBCServiceException e) { throw new SQLException("Failure to clear the transaction digest",e); } jdbcConn.rollback(); progressKeeper.reinitializeTxProgress(id); //\TODO try to execute outside of the critical path of commit try { if(partition != null) { mi.relinquish(partition); } } catch (MDBCServiceException e) { logger.warn("Error trying to relinquish: "+partition.toString()); } } /** * Close this MdbcConnection. */ @Override public void close() throws SQLException { logger.debug("Closing mdbc connection with id:"+id); if (dbi != null) { dbi.close(); } if (jdbcConn != null && !jdbcConn.isClosed()) { logger.debug("Closing jdbc from mdbc with id:"+id); jdbcConn.close(); logger.debug("Connection was closed for id:" + id); } try { mi.relinquish(partition); } catch (MDBCServiceException e) { throw new SQLException("Failure during relinquish of partition",e); } // Warning! Make sure this call remains AFTER the call to jdbcConn.close(), // otherwise you're going to get stuck in an infinite loop. statemanager.closeConnection(id); } @Override public boolean isClosed() throws SQLException { return jdbcConn.isClosed(); } @Override public DatabaseMetaData getMetaData() throws SQLException { return jdbcConn.getMetaData(); } @Override public void setReadOnly(boolean readOnly) throws SQLException { jdbcConn.setReadOnly(readOnly); } @Override public boolean isReadOnly() throws SQLException { return jdbcConn.isReadOnly(); } @Override public void setCatalog(String catalog) throws SQLException { jdbcConn.setCatalog(catalog); } @Override public String getCatalog() throws SQLException { return jdbcConn.getCatalog(); } @Override public void setTransactionIsolation(int level) throws SQLException { jdbcConn.setTransactionIsolation(level); } @Override public int getTransactionIsolation() throws SQLException { return jdbcConn.getTransactionIsolation(); } @Override public SQLWarning getWarnings() throws SQLException { return jdbcConn.getWarnings(); } @Override public void clearWarnings() throws SQLException { jdbcConn.clearWarnings(); } @Override public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency), this); } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, this); } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency), this); } @Override public Map> getTypeMap() throws SQLException { return jdbcConn.getTypeMap(); } @Override public void setTypeMap(Map> map) throws SQLException { jdbcConn.setTypeMap(map); } @Override public void setHoldability(int holdability) throws SQLException { jdbcConn.setHoldability(holdability); } @Override public int getHoldability() throws SQLException { return jdbcConn.getHoldability(); } @Override public Savepoint setSavepoint() throws SQLException { return jdbcConn.setSavepoint(); } @Override public Savepoint setSavepoint(String name) throws SQLException { return jdbcConn.setSavepoint(name); } @Override public void rollback(Savepoint savepoint) throws SQLException { jdbcConn.rollback(savepoint); } @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException { jdbcConn.releaseSavepoint(savepoint); } @Override public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), this); } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, this); } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), this); } @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, autoGeneratedKeys), sql, this); } @Override public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnIndexes), sql, this); } @Override public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnNames), sql, this); } @Override public Clob createClob() throws SQLException { return jdbcConn.createClob(); } @Override public Blob createBlob() throws SQLException { return jdbcConn.createBlob(); } @Override public NClob createNClob() throws SQLException { return jdbcConn.createNClob(); } @Override public SQLXML createSQLXML() throws SQLException { return jdbcConn.createSQLXML(); } @Override public boolean isValid(int timeout) throws SQLException { return jdbcConn.isValid(timeout); } @Override public void setClientInfo(String name, String value) throws SQLClientInfoException { jdbcConn.setClientInfo(name, value); } @Override public void setClientInfo(Properties properties) throws SQLClientInfoException { jdbcConn.setClientInfo(properties); } @Override public String getClientInfo(String name) throws SQLException { return jdbcConn.getClientInfo(name); } @Override public Properties getClientInfo() throws SQLException { return jdbcConn.getClientInfo(); } @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException { return jdbcConn.createArrayOf(typeName, elements); } @Override public Struct createStruct(String typeName, Object[] attributes) throws SQLException { return jdbcConn.createStruct(typeName, attributes); } @Override public void setSchema(String schema) throws SQLException { jdbcConn.setSchema(schema); } @Override public String getSchema() throws SQLException { return jdbcConn.getSchema(); } @Override public void abort(Executor executor) throws SQLException { jdbcConn.abort(executor); } @Override public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { jdbcConn.setNetworkTimeout(executor, milliseconds); } @Override public int getNetworkTimeout() throws SQLException { return jdbcConn.getNetworkTimeout(); } /** * Code to be run within the DB driver before a SQL statement is executed. This is where tables * can be synchronized before a SELECT, for those databases that do not support SELECT triggers. * @param sql the SQL statement that is about to be executed */ public void preStatementHook(final String sql) throws MDBCServiceException, SQLException { // some debug specific logic if(sql.startsWith("DEBUG")) { // if the SQL follows this convention: "DEBUG:TABLE_A,TABLE_B", // DAG information pertaining to the tables will get printed throw new SQLException("\nThis call was made for debugging purposes only\n" + statemanager.getOwnAndCheck().getDebugInfo(mi,sql.split(":")[1])); } //TODO: verify ownership of keys here //Parse tables from the sql query Map> tableToQueryType = QueryProcessor.parseSqlQuery(sql, table_set); //Check ownership of keys String defaultSchema = dbi.getSchema(); Set queryTables = MDBCUtils.getTables(defaultSchema, tableToQueryType); if (this.rangesUsed==null) { rangesUsed = queryTables; } else { rangesUsed.addAll(queryTables); } // filter out ranges that fall under Eventually consistent // category as these tables do not need ownership Set scRanges = filterEveTables(rangesUsed); DatabasePartition tempPartition = own(scRanges, MDBCUtils.getOperationType(tableToQueryType)); if(tempPartition!=null && tempPartition != partition) { this.partition.updateDatabasePartition(tempPartition); } dbi.preStatementHook(sql); } private Set filterEveTables(Set queryTables) { queryTables.removeAll(statemanager.getEventualRanges()); return queryTables; } /** * Code to be run within the DB driver after a SQL statement has been executed. This is where remote * statement actions can be copied back to Cassandra/MUSIC. * @param sql the SQL statement that was executed */ public void postStatementHook(String sql) { dbi.postStatementHook(sql, transactionDigest); } public void initDatabase() throws QueryException { dbi.initTables(); createTriggers(); } /** * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized * in order to prevent multiple threads from running this code in parallel. */ public void createTriggers() throws QueryException { Set set1 = dbi.getSQLTableSet(); // set of tables in the database logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); for (String tableName : set1) { // This map will be filled in if this table was previously discovered if (!table_set.contains(tableName.toUpperCase()) && !dbi.getReservedTblNames().contains(tableName.toUpperCase())) { logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName); try { dbi.createSQLTriggers(tableName); mi.createPartitionIfNeeded(new Range(tableName)); table_set.add(tableName.toUpperCase()); } catch (Exception e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e); throw new QueryException(e); } } } } public DBInterface getDBInterface() { return this.dbi; } /** * Take ownership of ranges given, and replay the transactions * @param ranges * @return * @throws MDBCServiceException */ private DatabasePartition own(Set ranges, SQLOperationType lockType) throws MDBCServiceException { if(ranges==null||ranges.isEmpty()){ return null; } DatabasePartition newPartition = null; OwnershipAndCheckpoint ownAndCheck = statemanager.getOwnAndCheck(); UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey(); try { final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId, lockType, ownerId); if(ownershipReturn==null){ return null; } Dag dag = ownershipReturn.getDag(); if(dag!=null) { ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, ownershipReturn.getOwnershipId()); //TODO: need to update pointer in alreadyapplied if a merge happened instead of in prestatement hook newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(), ownershipReturn.getLockId()); } } catch (MDBCServiceException e) { MusicDeadlockException de = Utils.getDeadlockException(e); if (de!=null) { //release all partitions mi.releaseAllLocksForOwner(de.getOwner(), de.getKeyspace(), de.getTable()); //rollback transaction try { rollback(); } catch (SQLException e1) { throw new MDBCServiceException("Failed to rollback transaction after detecting deadlock while taking ownership of table, which, wow", e1); } } throw e; } finally { ownAndCheck.stopOwnershipTimeoutClock(ownOpId); } return newPartition; } public void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException { mi.relinquishIfRequired(partition); } public Connection getConnection(){ return jdbcConn; } public DatabasePartition getPartition() { return partition; } public StagingTable getTransactionDigest(){ return transactionDigest; } }