diff options
author | 2018-10-29 10:43:15 -0400 | |
---|---|---|
committer | 2018-11-29 18:26:26 -0500 | |
commit | 9cc93ae782739f4fd637fa8d30a986ce2e14ae3e (patch) | |
tree | edacbf99b11797f90370c7957f0f7a86e2be1820 | |
parent | 76d8bc46fdf9b36548dff46b9d1c91bf7c56f6ac (diff) |
ownership and relinquish
Change-Id: I625bd61adfac11febdb25b179efbc6134a276f12
Issue-ID: MUSIC-219
Signed-off-by: Enrique Saurez <enrique.saurez@gmail.com>
21 files changed, 3076 insertions, 2207 deletions
@@ -1,7 +1,8 @@ .idea target -mdbc.iml +*.iml .project .settings .classpath .checkstyle +catalina* diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java index 2ca621a..ea76598 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,7 +27,6 @@ import java.util.*; import org.onap.music.logging.EELFLoggerDelegate; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import org.onap.music.mdbc.tables.MriReference; /** * A database range contain information about what ranges should be hosted in the current MDBC instance @@ -35,115 +34,164 @@ import org.onap.music.mdbc.tables.MriReference; * @author Enrique Saurez */ public class DatabasePartition { - private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class); - - private UUID musicRangeInformationIndex;//Index that can be obtained either from - private String lockId; - protected List<Range> ranges; - - /** - * Each range represents a partition of the database, a database partition is a union of this partitions. - * The only requirement is that the ranges are not overlapping. - */ - - public DatabasePartition(UUID mriIndex) { - this(new ArrayList<Range>(), mriIndex,""); - } - - public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String lockId) { - ranges = knownRanges; - - if(mriIndex != null) { - this.setMusicRangeInformationIndex(mriIndex); - } - else { - this.setMusicRangeInformationIndex(null); - } - this.setLockId(lockId); - - } - - public UUID getMusicRangeInformationIndex() { - return musicRangeInformationIndex; - } - - public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) { - this.musicRangeInformationIndex = musicRangeInformationIndex; - } - - /** - * Add a new range to the ones own by the local MDBC - * @param newRange range that is being added - * @throws IllegalArgumentException - */ - public synchronized void addNewRange(Range newRange) { - //Check overlap - for(Range r : ranges) { - if(r.overlaps(newRange)) { - throw new IllegalArgumentException("Range is already contain by a previous range"); - } - } - ranges.add(newRange); - } - - /** - * Delete a range that is being modified - * @param rangeToDel limits of the range - */ - public synchronized void deleteRange(Range rangeToDel) { - if(!ranges.contains(rangeToDel)) { - logger.error(EELFLoggerDelegate.errorLogger,"Range doesn't exist"); - throw new IllegalArgumentException("Invalid table"); - } - ranges.remove(rangeToDel); - } - - /** - * Get all the ranges that are currently owned - * @return ranges - */ - public synchronized Range[] getSnapshot() { - return (Range[]) ranges.toArray(); - } - - /** - * Serialize the ranges - * @return serialized ranges - */ + private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class); + + private UUID musicRangeInformationIndex;//Index that can be obtained either from + private String lockId; + protected List<Range> ranges; + + private boolean ready; + + /** + * Each range represents a partition of the database, a database partition is a union of this partitions. + * The only requirement is that the ranges are not overlapping. + */ + + public DatabasePartition() { + this(new ArrayList<Range>(),null,""); + } + + public DatabasePartition(UUID mriIndex) { + this(new ArrayList<Range>(), mriIndex,""); + } + + public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String lockId) { + if(mriIndex==null){ + ready = false; + } + else{ + ready = true; + } + ranges = knownRanges; + + this.setMusicRangeInformationIndex(mriIndex); + this.setLockId(lockId); + } + + /** + * This function is used to change the contents of this, with the contents of a different object + * @param otherPartition partition that is used to substitute the local contents + */ + public void updateDatabasePartition(DatabasePartition otherPartition){ + musicRangeInformationIndex = otherPartition.musicRangeInformationIndex;//Index that can be obtained either from + lockId = otherPartition.lockId; + ranges = otherPartition.ranges; + ready = otherPartition.ready; + } + + public String toString(){ + StringBuilder builder = new StringBuilder().append("Row: ["+musicRangeInformationIndex.toString()+"], lockId: ["+lockId +"], ranges: ["); + for(Range r: ranges){ + builder.append(r.toString()).append(","); + } + builder.append("]"); + return builder.toString(); + } + + + public boolean isLocked(){return lockId != null && !lockId.isEmpty(); } + + public boolean isReady() { + return ready; + } + + public void setReady(boolean ready) { + this.ready = ready; + } + + public UUID getMusicRangeInformationIndex() { + return musicRangeInformationIndex; + } + + public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) { + this.musicRangeInformationIndex = musicRangeInformationIndex; + } + + /** + * Add a new range to the ones own by the local MDBC + * @param newRange range that is being added + * @throws IllegalArgumentException + */ + public synchronized void addNewRange(Range newRange) { + //Check overlap + for(Range r : ranges) { + if(r.overlaps(newRange)) { + throw new IllegalArgumentException("Range is already contain by a previous range"); + } + } + ranges.add(newRange); + } + + /** + * Delete a range that is being modified + * @param rangeToDel limits of the range + */ + public synchronized void deleteRange(Range rangeToDel) { + if(!ranges.contains(rangeToDel)) { + logger.error(EELFLoggerDelegate.errorLogger,"Range doesn't exist"); + throw new IllegalArgumentException("Invalid table"); + } + ranges.remove(rangeToDel); + } + + /** + * Get all the ranges that are currently owned + * @return ranges + */ + public synchronized List<Range> getSnapshot() { + List<Range> newRange = new ArrayList<>(); + for(Range r : ranges){ + newRange.add(r.clone()); + } + return newRange; + } + + /** + * Serialize the ranges + * @return serialized ranges + */ public String toJson() { - GsonBuilder builder = new GsonBuilder(); - builder.setPrettyPrinting().serializeNulls();; + GsonBuilder builder = new GsonBuilder(); + builder.setPrettyPrinting().serializeNulls();; Gson gson = builder.create(); - return gson.toJson(this); + return gson.toJson(this); } - + /** * Function to obtain the configuration * @param filepath path to the database range * @return a new object of type DatabaseRange * @throws FileNotFoundException */ - + public static DatabasePartition readJsonFromFile( String filepath) throws FileNotFoundException { - BufferedReader br; - try { - br = new BufferedReader( - new FileReader(filepath)); - } catch (FileNotFoundException e) { - logger.error(EELFLoggerDelegate.errorLogger,"File was not found when reading json"+e); - throw e; - } - Gson gson = new Gson(); - DatabasePartition range = gson.fromJson(br, DatabasePartition.class); - return range; - } - - public String getLockId() { - return lockId; - } - - public void setLockId(String lockId) { - this.lockId = lockId; - } + BufferedReader br; + try { + br = new BufferedReader( + new FileReader(filepath)); + } catch (FileNotFoundException e) { + logger.error(EELFLoggerDelegate.errorLogger,"File was not found when reading json"+e); + throw e; + } + Gson gson = new Gson(); + DatabasePartition range = gson.fromJson(br, DatabasePartition.class); + return range; + } + + public String getLockId() { + return lockId; + } + public void setLockId(String lockId) { + this.lockId = lockId; + } + + public boolean isContained(Range range){ + for(Range r: ranges){ + if(r.overlaps(range)){ + return true; + } + } + return false; + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java index 2723490..3f45d98 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java @@ -25,6 +25,7 @@ import java.util.Base64; import java.util.Deque; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -122,4 +123,12 @@ public class MDBCUtils { } return prop; } + + public static List<Range> getTables(Map<String,List<String>> queryParsed){ + List<Range> ranges = new ArrayList<>(); + for(String table: queryParsed.keySet()){ + ranges.add(new Range(table)); + } + return ranges; + } }
\ No newline at end of file diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index cac4139..7574841 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -37,9 +37,11 @@ import java.sql.Struct; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Executor; import org.onap.music.exceptions.MDBCServiceException; @@ -51,6 +53,9 @@ import org.onap.music.logging.format.ErrorTypes; import org.onap.music.mdbc.mixins.DBInterface; import org.onap.music.mdbc.mixins.MixinFactory; import org.onap.music.mdbc.mixins.MusicInterface; +import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; +import org.onap.music.mdbc.query.QueryProcessor; +import org.onap.music.mdbc.tables.MusicTxDigest; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -64,461 +69,487 @@ import org.onap.music.mdbc.tables.TxCommitProgress; * @author Robert Eby */ public class MdbcConnection implements Connection { - private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcConnection.class); - - private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused - private final Connection jdbcConn; // the JDBC Connection to the actual underlying database - private final MusicInterface mi; - private final TxCommitProgress progressKeeper; - private final DatabasePartition partition; - private final DBInterface dbi; - private final HashMap<Range,StagingTable> transactionDigest; - private final Set<String> table_set; - - public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException { - this.id = id; - this.table_set = Collections.synchronizedSet(new HashSet<String>()); - this.transactionDigest = new HashMap<Range,StagingTable>(); - - if (c == null) { - throw new MDBCServiceException("Connection is null"); - } - this.jdbcConn = c; - info.putAll(MDBCUtils.getMdbcProperties()); - String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT); - this.dbi = MixinFactory.createDBInterface(mixinDb, mi, url, jdbcConn, info); - this.mi = mi; - try { - this.setAutoCommit(c.getAutoCommit()); - } catch (SQLException e) { + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcConnection.class); + + private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused + private final Connection jdbcConn; // the JDBC Connection to the actual underlying database + private final MusicInterface mi; + private final TxCommitProgress progressKeeper; + private final DatabasePartition partition; + private final DBInterface dbi; + private final HashMap<Range,StagingTable> transactionDigest; + private final Set<String> table_set; + + public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException { + this.id = id; + this.table_set = Collections.synchronizedSet(new HashSet<String>()); + this.transactionDigest = new HashMap<Range,StagingTable>(); + + if (c == null) { + throw new MDBCServiceException("Connection is null"); + } + this.jdbcConn = c; + info.putAll(MDBCUtils.getMdbcProperties()); + String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT); + this.dbi = MixinFactory.createDBInterface(mixinDb, mi, url, jdbcConn, info); + this.mi = mi; + try { + this.setAutoCommit(c.getAutoCommit()); + } catch (SQLException e) { logger.error("Failure in autocommit"); - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); - } - - // Verify the tables in MUSIC match the tables in the database - // and create triggers on any tables that need them - try { - this.synchronizeTables(); - } catch (QueryException e) { - logger.error("Error syncrhonizing tables"); - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); - } - this.progressKeeper = progressKeeper; - this.partition = partition; - + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); + } + + // Verify the tables in MUSIC match the tables in the database + // and create triggers on any tables that need them + try { + this.synchronizeTables(); + } catch (QueryException e) { + logger.error("Error syncrhonizing tables"); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL); + } + this.progressKeeper = progressKeeper; + this.partition = partition; + logger.debug("Mdbc connection created with id: "+id); - } - - @Override - public <T> T unwrap(Class<T> iface) throws SQLException { - logger.error(EELFLoggerDelegate.errorLogger, "proxyconn unwrap: " + iface.getName()); - return jdbcConn.unwrap(iface); - } - - @Override - public boolean isWrapperFor(Class<?> iface) throws SQLException { - logger.error(EELFLoggerDelegate.errorLogger, "proxystatement iswrapperfor: " + iface.getName()); - return jdbcConn.isWrapperFor(iface); - } - - @Override - public Statement createStatement() throws SQLException { - return new MdbcCallableStatement(jdbcConn.createStatement(), this); - } - - @Override - public PreparedStatement prepareStatement(String sql) throws SQLException { - //TODO: grab the sql call from here and all the other preparestatement calls - return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql), sql, this); - } - - @Override - public CallableStatement prepareCall(String sql) throws SQLException { - return new MdbcCallableStatement(jdbcConn.prepareCall(sql), this); - } - - @Override - public String nativeSQL(String sql) throws SQLException { - return jdbcConn.nativeSQL(sql); - } - - @Override - public void setAutoCommit(boolean autoCommit) throws SQLException { - boolean b = jdbcConn.getAutoCommit(); - if (b != autoCommit) { - if(progressKeeper!=null) progressKeeper.commitRequested(id); - logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b); - if (b) { - // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction - if(id == null || id.isEmpty()) { - logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - throw new SQLException("tx id is null"); - } - try { - mi.commitLog(partition, transactionDigest, id, progressKeeper); - } catch (MDBCServiceException e) { - // TODO Auto-generated catch block - logger.error("Cannot commit log to music" + e.getStackTrace()); - throw new SQLException(e.getMessage()); - } - } - if(progressKeeper!=null) { + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + logger.error(EELFLoggerDelegate.errorLogger, "proxyconn unwrap: " + iface.getName()); + return jdbcConn.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + logger.error(EELFLoggerDelegate.errorLogger, "proxystatement iswrapperfor: " + iface.getName()); + return jdbcConn.isWrapperFor(iface); + } + + @Override + public Statement createStatement() throws SQLException { + return new MdbcCallableStatement(jdbcConn.createStatement(), this); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + //TODO: grab the sql call from here and all the other preparestatement calls + return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql), sql, this); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return new MdbcCallableStatement(jdbcConn.prepareCall(sql), this); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return jdbcConn.nativeSQL(sql); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + boolean b = jdbcConn.getAutoCommit(); + if (b != autoCommit) { + if(progressKeeper!=null) progressKeeper.commitRequested(id); + logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b); + if (b) { + // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction + if(id == null || id.isEmpty()) { + logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + throw new SQLException("tx id is null"); + } + try { + mi.commitLog(partition, transactionDigest, id, progressKeeper); + } catch (MDBCServiceException e) { + // TODO Auto-generated catch block + logger.error("Cannot commit log to music" + e.getStackTrace()); + throw new SQLException(e.getMessage()); + } + } + if(progressKeeper!=null) { progressKeeper.setMusicDone(id); - } - jdbcConn.setAutoCommit(autoCommit); + } + jdbcConn.setAutoCommit(autoCommit); if(progressKeeper!=null) { progressKeeper.setSQLDone(id); } if(progressKeeper!=null&&progressKeeper.isComplete(id)){ progressKeeper.reinitializeTxProgress(id); } - } - } - - @Override - public boolean getAutoCommit() throws SQLException { - return jdbcConn.getAutoCommit(); - } - - /** - * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, - * they are performed now and copied into MUSIC. - * @throws SQLException - */ - @Override - public void commit() throws SQLException { - if(progressKeeper.isComplete(id)) { - return; - } - if(progressKeeper != null) { - progressKeeper.commitRequested(id); - } - - try { - logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); - // transaction was committed -- add all the updates into the REDO-Log in MUSIC - mi.commitLog(partition, transactionDigest, id, progressKeeper); - } catch (MDBCServiceException e) { - //If the commit fail, then a new commitId should be used - logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); - throw new SQLException("Failure commiting to MUSIC"); - } - - if(progressKeeper != null) { - progressKeeper.setMusicDone(id); - } - - jdbcConn.commit(); - - if(progressKeeper != null) { - progressKeeper.setSQLDone(id); - } - //MusicMixin.releaseZKLocks(MusicMixin.currentLockMap.get(getConnID())); + } + } + + @Override + public boolean getAutoCommit() throws SQLException { + return jdbcConn.getAutoCommit(); + } + + /** + * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed, + * they are performed now and copied into MUSIC. + * @throws SQLException + */ + @Override + public void commit() throws SQLException { + if(progressKeeper.isComplete(id)) { + return; + } + if(progressKeeper != null) { + progressKeeper.commitRequested(id); + } + + try { + logger.debug(EELFLoggerDelegate.applicationLogger, " commit "); + // transaction was committed -- add all the updates into the REDO-Log in MUSIC + mi.commitLog(partition, transactionDigest, id, progressKeeper); + } catch (MDBCServiceException e) { + //If the commit fail, then a new commitId should be used + logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL); + throw new SQLException("Failure commiting to MUSIC"); + } + + if(progressKeeper != null) { + progressKeeper.setMusicDone(id); + } + + jdbcConn.commit(); + + if(progressKeeper != null) { + progressKeeper.setSQLDone(id); + } + //MusicMixin.releaseZKLocks(MusicMixin.currentLockMap.get(getConnID())); if(progressKeeper.isComplete(id)){ - progressKeeper.reinitializeTxProgress(id); + progressKeeper.reinitializeTxProgress(id); + } + + //\TODO try to execute outside of the critical path of commit + try { + relinquishIfRequired(partition); + } catch (MDBCServiceException e) { + logger.warn("Error trying to relinquish: "+partition.toString()); } - } - - /** - * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed, - * they are discarded. - */ - @Override - public void rollback() throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");; - transactionDigest.clear(); - jdbcConn.rollback(); - progressKeeper.reinitializeTxProgress(id); - } - - /** - * Close this MdbcConnection. - */ - @Override - public void close() throws SQLException { - logger.debug("Closing mdbc connection with id:"+id); - if (dbi != null) { - dbi.close(); - } - if (jdbcConn != null && !jdbcConn.isClosed()) { + } + + /** + * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed, + * they are discarded. + */ + @Override + public void rollback() throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");; + transactionDigest.clear(); + jdbcConn.rollback(); + progressKeeper.reinitializeTxProgress(id); + } + + /** + * Close this MdbcConnection. + */ + @Override + public void close() throws SQLException { + logger.debug("Closing mdbc connection with id:"+id); + if (dbi != null) { + dbi.close(); + } + if (jdbcConn != null && !jdbcConn.isClosed()) { logger.debug("Closing jdbc from mdbc with id:"+id); - jdbcConn.close(); - logger.debug("Connection was closed for id:" + id); - } - } - - @Override - public boolean isClosed() throws SQLException { - return jdbcConn.isClosed(); - } - - @Override - public DatabaseMetaData getMetaData() throws SQLException { - return jdbcConn.getMetaData(); - } - - @Override - public void setReadOnly(boolean readOnly) throws SQLException { - jdbcConn.setReadOnly(readOnly); - } - - @Override - public boolean isReadOnly() throws SQLException { - return jdbcConn.isReadOnly(); - } - - @Override - public void setCatalog(String catalog) throws SQLException { - jdbcConn.setCatalog(catalog); - } - - @Override - public String getCatalog() throws SQLException { - return jdbcConn.getCatalog(); - } - - @Override - public void setTransactionIsolation(int level) throws SQLException { - jdbcConn.setTransactionIsolation(level); - } - - @Override - public int getTransactionIsolation() throws SQLException { - return jdbcConn.getTransactionIsolation(); - } - - @Override - public SQLWarning getWarnings() throws SQLException { - return jdbcConn.getWarnings(); - } - - @Override - public void clearWarnings() throws SQLException { - jdbcConn.clearWarnings(); - } - - @Override - public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { - return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency), this); - } - - @Override - public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) - throws SQLException { - return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, this); - } - - @Override - public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { - return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency), this); - } - - @Override - public Map<String, Class<?>> getTypeMap() throws SQLException { - return jdbcConn.getTypeMap(); - } - - @Override - public void setTypeMap(Map<String, Class<?>> map) throws SQLException { - jdbcConn.setTypeMap(map); - } - - @Override - public void setHoldability(int holdability) throws SQLException { - jdbcConn.setHoldability(holdability); - } - - @Override - public int getHoldability() throws SQLException { - return jdbcConn.getHoldability(); - } - - @Override - public Savepoint setSavepoint() throws SQLException { - return jdbcConn.setSavepoint(); - } - - @Override - public Savepoint setSavepoint(String name) throws SQLException { - return jdbcConn.setSavepoint(name); - } - - @Override - public void rollback(Savepoint savepoint) throws SQLException { - jdbcConn.rollback(savepoint); - } - - @Override - public void releaseSavepoint(Savepoint savepoint) throws SQLException { - jdbcConn.releaseSavepoint(savepoint); - } - - @Override - public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) - throws SQLException { - return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), this); - } - - @Override - public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, - int resultSetHoldability) throws SQLException { - return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, this); - } - - @Override - public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, - int resultSetHoldability) throws SQLException { - return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), this); - } - - @Override - public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { - return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, autoGeneratedKeys), sql, this); - } - - @Override - public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { - return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnIndexes), sql, this); - } - - @Override - public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { - return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnNames), sql, this); - } - - @Override - public Clob createClob() throws SQLException { - return jdbcConn.createClob(); - } - - @Override - public Blob createBlob() throws SQLException { - return jdbcConn.createBlob(); - } - - @Override - public NClob createNClob() throws SQLException { - return jdbcConn.createNClob(); - } - - @Override - public SQLXML createSQLXML() throws SQLException { - return jdbcConn.createSQLXML(); - } - - @Override - public boolean isValid(int timeout) throws SQLException { - return jdbcConn.isValid(timeout); - } - - @Override - public void setClientInfo(String name, String value) throws SQLClientInfoException { - jdbcConn.setClientInfo(name, value); - } - - @Override - public void setClientInfo(Properties properties) throws SQLClientInfoException { - jdbcConn.setClientInfo(properties); - } - - @Override - public String getClientInfo(String name) throws SQLException { - return jdbcConn.getClientInfo(name); - } - - @Override - public Properties getClientInfo() throws SQLException { - return jdbcConn.getClientInfo(); - } - - @Override - public Array createArrayOf(String typeName, Object[] elements) throws SQLException { - return jdbcConn.createArrayOf(typeName, elements); - } - - @Override - public Struct createStruct(String typeName, Object[] attributes) throws SQLException { - return jdbcConn.createStruct(typeName, attributes); - } - - @Override - public void setSchema(String schema) throws SQLException { - jdbcConn.setSchema(schema); - } - - @Override - public String getSchema() throws SQLException { - return jdbcConn.getSchema(); - } - - @Override - public void abort(Executor executor) throws SQLException { - jdbcConn.abort(executor); - } - - @Override - public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { - jdbcConn.setNetworkTimeout(executor, milliseconds); - } - - @Override - public int getNetworkTimeout() throws SQLException { - return jdbcConn.getNetworkTimeout(); - } - - - /** - * Code to be run within the DB driver before a SQL statement is executed. This is where tables - * can be synchronized before a SELECT, for those databases that do not support SELECT triggers. - * @param sql the SQL statement that is about to be executed - */ - public void preStatementHook(String sql) { - dbi.preStatementHook(sql); - } - - /** - * Code to be run within the DB driver after a SQL statement has been executed. This is where remote - * statement actions can be copied back to Cassandra/MUSIC. - * @param sql the SQL statement that was executed - */ - public void postStatementHook(String sql) { - dbi.postStatementHook(sql, transactionDigest); - } - - /** - * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the - * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized - * in order to prevent multiple threads from running this code in parallel. - */ - public void synchronizeTables() throws QueryException { - Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database - logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); - for (String tableName : set1) { - // This map will be filled in if this table was previously discovered - tableName = tableName.toUpperCase(); - if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) { - logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName); - try { - TableInfo ti = dbi.getTableInfo(tableName); - mi.initializeMusicForTable(ti,tableName); - //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted - ti = dbi.getTableInfo(tableName); - mi.createDirtyRowTable(ti,tableName); - dbi.createSQLTriggers(tableName); - table_set.add(tableName); - dbi.synchronizeData(tableName); - logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" + - table_set.size() + "/" + set1.size() + "tables uploaded"); - } catch (Exception e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e); - throw new QueryException(); - } - } - } - } - - public DBInterface getDBInterface() { - return this.dbi; - } + jdbcConn.close(); + logger.debug("Connection was closed for id:" + id); + } + } + + @Override + public boolean isClosed() throws SQLException { + return jdbcConn.isClosed(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return jdbcConn.getMetaData(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + jdbcConn.setReadOnly(readOnly); + } + + @Override + public boolean isReadOnly() throws SQLException { + return jdbcConn.isReadOnly(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + jdbcConn.setCatalog(catalog); + } + + @Override + public String getCatalog() throws SQLException { + return jdbcConn.getCatalog(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + jdbcConn.setTransactionIsolation(level); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return jdbcConn.getTransactionIsolation(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return jdbcConn.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + jdbcConn.clearWarnings(); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency), this); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) + throws SQLException { + return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, this); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency), this); + } + + @Override + public Map<String, Class<?>> getTypeMap() throws SQLException { + return jdbcConn.getTypeMap(); + } + + @Override + public void setTypeMap(Map<String, Class<?>> map) throws SQLException { + jdbcConn.setTypeMap(map); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + jdbcConn.setHoldability(holdability); + } + + @Override + public int getHoldability() throws SQLException { + return jdbcConn.getHoldability(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return jdbcConn.setSavepoint(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return jdbcConn.setSavepoint(name); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + jdbcConn.rollback(savepoint); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + jdbcConn.releaseSavepoint(savepoint); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), this); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, this); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), this); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, autoGeneratedKeys), sql, this); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnIndexes), sql, this); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnNames), sql, this); + } + + @Override + public Clob createClob() throws SQLException { + return jdbcConn.createClob(); + } + + @Override + public Blob createBlob() throws SQLException { + return jdbcConn.createBlob(); + } + + @Override + public NClob createNClob() throws SQLException { + return jdbcConn.createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return jdbcConn.createSQLXML(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return jdbcConn.isValid(timeout); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + jdbcConn.setClientInfo(name, value); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + jdbcConn.setClientInfo(properties); + } + + @Override + public String getClientInfo(String name) throws SQLException { + return jdbcConn.getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return jdbcConn.getClientInfo(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return jdbcConn.createArrayOf(typeName, elements); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return jdbcConn.createStruct(typeName, attributes); + } + + @Override + public void setSchema(String schema) throws SQLException { + jdbcConn.setSchema(schema); + } + + @Override + public String getSchema() throws SQLException { + return jdbcConn.getSchema(); + } + + @Override + public void abort(Executor executor) throws SQLException { + jdbcConn.abort(executor); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + jdbcConn.setNetworkTimeout(executor, milliseconds); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return jdbcConn.getNetworkTimeout(); + } + + /** + * Code to be run within the DB driver before a SQL statement is executed. This is where tables + * can be synchronized before a SELECT, for those databases that do not support SELECT triggers. + * @param sql the SQL statement that is about to be executed + */ + public void preStatementHook(final String sql) throws MDBCServiceException { + //TODO: verify ownership of keys here + //Parse tables from the sql query + Map<String, List<String>> tableToInstruction = QueryProcessor.extractTableFromQuery(sql); + //Check ownership of keys + own(MDBCUtils.getTables(tableToInstruction)); + dbi.preStatementHook(sql); + } + + + /** + * Code to be run within the DB driver after a SQL statement has been executed. This is where remote + * statement actions can be copied back to Cassandra/MUSIC. + * @param sql the SQL statement that was executed + */ + public void postStatementHook(String sql) { + dbi.postStatementHook(sql, transactionDigest); + } + + /** + * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the + * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized + * in order to prevent multiple threads from running this code in parallel. + */ + public void synchronizeTables() throws QueryException { + Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database + logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1); + for (String tableName : set1) { + // This map will be filled in if this table was previously discovered + tableName = tableName.toUpperCase(); + if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) { + logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName); + try { + TableInfo ti = dbi.getTableInfo(tableName); + mi.initializeMusicForTable(ti,tableName); + //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted + ti = dbi.getTableInfo(tableName); + mi.createDirtyRowTable(ti,tableName); + dbi.createSQLTriggers(tableName); + table_set.add(tableName); + dbi.synchronizeData(tableName); + logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" + + table_set.size() + "/" + set1.size() + "tables uploaded"); + } catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e); + throw new QueryException(); + } + } + } + } + + public DBInterface getDBInterface() { + return this.dbi; + } + + public void own(List<Range> ranges) throws MDBCServiceException { + final OwnershipReturn ownershipReturn = mi.own(ranges, partition); + final List<UUID> oldRangeIds = ownershipReturn.getOldIRangeds(); + //\TODO: do in parallel for all range ids + for(UUID oldRange : oldRangeIds) { + MusicTxDigest.replayDigestForPartition(mi, oldRange,dbi); + } + } + + public void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException { + mi.relinquishIfRequired(partition); + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java index 092aa94..68a40a8 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java @@ -46,6 +46,7 @@ import java.util.Calendar; import org.apache.commons.lang3.StringUtils; +import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; /** @@ -93,11 +94,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat r = stmt.executeQuery(sql); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger, "executeQuery: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return r; } @@ -112,11 +115,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat n = stmt.executeUpdate(sql); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger, "executeUpdate: exception "+nm+" "+e); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return n; } @@ -193,7 +198,7 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat b = stmt.execute(sql); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger, "execute: exception "+nm+" "+e); // Note: this seems to be the only call Camunda uses, so it is the only one I am fixing for now. @@ -205,6 +210,8 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.error(EELFLoggerDelegate.errorLogger, " Exception "+nm+" "+e); throw e; } + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return b; } @@ -305,11 +312,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat n = stmt.executeUpdate(sql, autoGeneratedKeys); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return n; } @@ -323,12 +332,14 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat n = stmt.executeUpdate(sql, columnIndexes); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; - } + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); + } return n; } @@ -341,11 +352,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat n = stmt.executeUpdate(sql, columnNames); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return n; } @@ -359,11 +372,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat b = stmt.execute(sql, autoGeneratedKeys); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return b; } @@ -377,11 +392,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat b = stmt.execute(sql, columnIndexes); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return b; } @@ -395,11 +412,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat b = stmt.execute(sql, columnNames); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return b; } @@ -443,13 +462,17 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat r = ((PreparedStatement)stmt).executeQuery();; mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { e.printStackTrace(); String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger,"executeQuery: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } + + return r; } @@ -462,12 +485,14 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat n = ((PreparedStatement)stmt).executeUpdate(); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { e.printStackTrace(); String nm = e.getClass().getName(); logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return n; } @@ -583,7 +608,7 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat b = ((PreparedStatement)stmt).execute(); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { e.printStackTrace(); String nm = e.getClass().getName(); // Note: this seems to be the only call Camunda uses, so it is the only one I am fixing for now. @@ -595,6 +620,8 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm); throw e; } + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return b; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java index 989b6e4..a70c359 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java @@ -57,7 +57,7 @@ public class MdbcServerLogic extends JdbcMeta{ public MdbcServerLogic(String Url, Properties info, NodeConfiguration config) throws SQLException, MDBCServiceException { super(Url,info); - this.manager = new StateManager(Url,info,config.partition,config.nodeName, "test"); //FIXME: db name should not be passed in ahead of time + this.manager = new StateManager(Url,info,config.nodeName, "test"); //FIXME: db name should not be passed in ahead of time this.info = info; int concurrencyLevel = Integer.parseInt( info.getProperty(ConnectionCacheSettings.CONCURRENCY_LEVEL.key(), diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java index 97f4be4..26b9acd 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,6 +25,7 @@ import java.sql.SQLException; import java.sql.SQLWarning; import java.sql.Statement; +import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.QueryException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; @@ -38,398 +39,416 @@ import org.onap.music.logging.format.ErrorTypes; * @author Robert Eby */ public class MdbcStatement implements Statement { - private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcStatement.class); - private static final String DATASTAX_PREFIX = "com.datastax.driver"; - - final Statement stmt; // the Statement that we are proxying - final MdbcConnection mConn; - //\TODO We may need to all pass the connection object to support autocommit - - public MdbcStatement(Statement s, MdbcConnection mConn) { - this.stmt = s; - this.mConn = mConn; - } - - public MdbcStatement(Statement stmt, String sql, MdbcConnection mConn) { - //\TODO why there is a constructor with a sql parameter in a not PreparedStatement - this.stmt = stmt; - this.mConn = mConn; - } - - @Override - public <T> T unwrap(Class<T> iface) throws SQLException { - logger.error(EELFLoggerDelegate.errorLogger, "proxystatement unwrap: " + iface.getName()); - return stmt.unwrap(iface); - } - - @Override - public boolean isWrapperFor(Class<?> iface) throws SQLException { - logger.error(EELFLoggerDelegate.errorLogger, "proxystatement isWrapperFor: " + iface.getName()); - return stmt.isWrapperFor(iface); - } - - @Override - public ResultSet executeQuery(String sql) throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql); - ResultSet r = null; - try { - mConn.preStatementHook(sql); - r = stmt.executeQuery(sql); - mConn.postStatementHook(sql); - synchronizeTables(sql); - } catch (Exception e) { - String nm = e.getClass().getName(); - logger.error(EELFLoggerDelegate.errorLogger, "executeQuery: exception "+nm); - if (!nm.startsWith(DATASTAX_PREFIX)) - throw e; - } - return r; - } - - @Override - public int executeUpdate(String sql) throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); - - int n = 0; - try { - mConn.preStatementHook(sql); - n = stmt.executeUpdate(sql); - mConn.postStatementHook(sql); - synchronizeTables(sql); - } catch (Exception e) { - String nm = e.getClass().getName(); - logger.error(EELFLoggerDelegate.errorLogger, "executeUpdate: exception "+nm+" "+e); - if (!nm.startsWith(DATASTAX_PREFIX)) - throw e; - } - return n; - } - - @Override - public void close() throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"Statement close: "); - stmt.close(); - } - - @Override - public int getMaxFieldSize() throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"getMaxFieldSize"); - return stmt.getMaxFieldSize(); - } - - @Override - public void setMaxFieldSize(int max) throws SQLException { - stmt.setMaxFieldSize(max); - } - - @Override - public int getMaxRows() throws SQLException { - return stmt.getMaxRows(); - } - - @Override - public void setMaxRows(int max) throws SQLException { - stmt.setMaxRows(max); - } - - @Override - public void setEscapeProcessing(boolean enable) throws SQLException { - stmt.setEscapeProcessing(enable); - } - - @Override - public int getQueryTimeout() throws SQLException { - return stmt.getQueryTimeout(); - } - - @Override - public void setQueryTimeout(int seconds) throws SQLException { - //\TODO: we also need to implement a higher level timeout in MDBC - logger.debug(EELFLoggerDelegate.applicationLogger,"setQueryTimeout seconds "+ seconds); - stmt.setQueryTimeout(seconds); - } - - @Override - public void cancel() throws SQLException { - stmt.cancel(); - } - - @Override - public SQLWarning getWarnings() throws SQLException { - return stmt.getWarnings(); - } - - @Override - public void clearWarnings() throws SQLException { - stmt.clearWarnings(); - } - - @Override - public void setCursorName(String name) throws SQLException { - stmt.setCursorName(name); - } - - @Override - public boolean execute(String sql) throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); - boolean b = false; - //\TODO Add the result of the postStatementHook to b - try { - mConn.preStatementHook(sql); - b = stmt.execute(sql); - mConn.postStatementHook(sql); - synchronizeTables(sql); - } catch (Exception e) { - String nm = e.getClass().getName(); - logger.error(EELFLoggerDelegate.errorLogger, "execute: exception "+nm+" "+e); - // Note: this seems to be the only call Camunda uses, so it is the only one I am fixing for now. - boolean ignore = nm.startsWith(DATASTAX_PREFIX); + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcStatement.class); + private static final String DATASTAX_PREFIX = "com.datastax.driver"; + + final Statement stmt; // the Statement that we are proxying + final MdbcConnection mConn; + //\TODO We may need to all pass the connection object to support autocommit + + public MdbcStatement(Statement s, MdbcConnection mConn) { + this.stmt = s; + this.mConn = mConn; + } + + public MdbcStatement(Statement stmt, String sql, MdbcConnection mConn) { + //\TODO why there is a constructor with a sql parameter in a not PreparedStatement + this.stmt = stmt; + this.mConn = mConn; + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + logger.error(EELFLoggerDelegate.errorLogger, "proxystatement unwrap: " + iface.getName()); + return stmt.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + logger.error(EELFLoggerDelegate.errorLogger, "proxystatement isWrapperFor: " + iface.getName()); + return stmt.isWrapperFor(iface); + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql); + ResultSet r = null; + try { + mConn.preStatementHook(sql); + r = stmt.executeQuery(sql); + mConn.postStatementHook(sql); + synchronizeTables(sql); + } catch (SQLException e) { + String nm = e.getClass().getName(); + logger.error(EELFLoggerDelegate.errorLogger, "executeQuery: exception "+nm); + if (!nm.startsWith(DATASTAX_PREFIX)) + throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); + } + return r; + } + + @Override + public int executeUpdate(String sql) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); + + int n = 0; + try { + mConn.preStatementHook(sql); + n = stmt.executeUpdate(sql); + mConn.postStatementHook(sql); + synchronizeTables(sql); + } catch (SQLException e) { + String nm = e.getClass().getName(); + logger.error(EELFLoggerDelegate.errorLogger, "executeUpdate: exception "+nm+" "+e); + if (!nm.startsWith(DATASTAX_PREFIX)) + throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); + } + return n; + } + + @Override + public void close() throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"Statement close: "); + stmt.close(); + } + + @Override + public int getMaxFieldSize() throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"getMaxFieldSize"); + return stmt.getMaxFieldSize(); + } + + @Override + public void setMaxFieldSize(int max) throws SQLException { + stmt.setMaxFieldSize(max); + } + + @Override + public int getMaxRows() throws SQLException { + return stmt.getMaxRows(); + } + + @Override + public void setMaxRows(int max) throws SQLException { + stmt.setMaxRows(max); + } + + @Override + public void setEscapeProcessing(boolean enable) throws SQLException { + stmt.setEscapeProcessing(enable); + } + + @Override + public int getQueryTimeout() throws SQLException { + return stmt.getQueryTimeout(); + } + + @Override + public void setQueryTimeout(int seconds) throws SQLException { + //\TODO: we also need to implement a higher level timeout in MDBC + logger.debug(EELFLoggerDelegate.applicationLogger,"setQueryTimeout seconds "+ seconds); + stmt.setQueryTimeout(seconds); + } + + @Override + public void cancel() throws SQLException { + stmt.cancel(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return stmt.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + stmt.clearWarnings(); + } + + @Override + public void setCursorName(String name) throws SQLException { + stmt.setCursorName(name); + } + + @Override + public boolean execute(String sql) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); + boolean b = false; + //\TODO Add the result of the postStatementHook to b + try { + mConn.preStatementHook(sql); + b = stmt.execute(sql); + mConn.postStatementHook(sql); + synchronizeTables(sql); + } catch (SQLException e) { + String nm = e.getClass().getName(); + logger.error(EELFLoggerDelegate.errorLogger, "execute: exception "+nm+" "+e); + // Note: this seems to be the only call Camunda uses, so it is the only one I am fixing for now. + boolean ignore = nm.startsWith(DATASTAX_PREFIX); // ignore |= (nm.startsWith("org.h2.jdbc.JdbcSQLException") && e.getMessage().contains("already exists")); - if (ignore) { - logger.warn("execute: exception (IGNORED) "+nm); - } else { - logger.error(EELFLoggerDelegate.errorLogger, " Exception "+nm+" "+e); - throw e; - } - } - return b; - } - - @Override - public ResultSet getResultSet() throws SQLException { - return stmt.getResultSet(); - } - - @Override - public int getUpdateCount() throws SQLException { - return stmt.getUpdateCount(); - } - - @Override - public boolean getMoreResults() throws SQLException { - return stmt.getMoreResults(); - } - - @Override - public void setFetchDirection(int direction) throws SQLException { - stmt.setFetchDirection(direction); - } - - @Override - public int getFetchDirection() throws SQLException { - return stmt.getFetchDirection(); - } - - @Override - public void setFetchSize(int rows) throws SQLException { - stmt.setFetchSize(rows); - } - - @Override - public int getFetchSize() throws SQLException { - return stmt.getFetchSize(); - } - - @Override - public int getResultSetConcurrency() throws SQLException { - return stmt.getResultSetConcurrency(); - } - - @Override - public int getResultSetType() throws SQLException { - return stmt.getResultSetType(); - } - - @Override - public void addBatch(String sql) throws SQLException { - stmt.addBatch(sql); - } - - @Override - public void clearBatch() throws SQLException { - stmt.clearBatch(); - } - - @Override - public int[] executeBatch() throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"executeBatch: "); - int[] n = null; - try { - logger.debug(EELFLoggerDelegate.applicationLogger,"executeBatch() is not supported by MDBC; your results may be incorrect as a result."); - n = stmt.executeBatch(); - synchronizeTables(null); - } catch (Exception e) { - String nm = e.getClass().getName(); - logger.error(EELFLoggerDelegate.errorLogger,"executeBatch: exception "+nm); - if (!nm.startsWith(DATASTAX_PREFIX)) - throw e; - } - return n; - } - - @Override - public Connection getConnection() throws SQLException { - return stmt.getConnection(); - } - - @Override - public boolean getMoreResults(int current) throws SQLException { - return stmt.getMoreResults(current); - } - - @Override - public ResultSet getGeneratedKeys() throws SQLException { - return stmt.getGeneratedKeys(); - } - - @Override - public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); - int n = 0; - try { - mConn.preStatementHook(sql); - n = stmt.executeUpdate(sql, autoGeneratedKeys); - mConn.postStatementHook(sql); - synchronizeTables(sql); - } catch (Exception e) { - String nm = e.getClass().getName(); - logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm); - if (!nm.startsWith(DATASTAX_PREFIX)) - throw e; - } - return n; - } - - @Override - public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); - int n = 0; - try { - mConn.preStatementHook(sql); - n = stmt.executeUpdate(sql, columnIndexes); - mConn.postStatementHook(sql); - synchronizeTables(sql); - } catch (Exception e) { - String nm = e.getClass().getName(); - logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm); - if (!nm.startsWith(DATASTAX_PREFIX)) - throw e; - } - return n; - } - - @Override - public int executeUpdate(String sql, String[] columnNames) throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); - int n = 0; - try { - mConn.preStatementHook(sql); - n = stmt.executeUpdate(sql, columnNames); - mConn.postStatementHook(sql); - synchronizeTables(sql); - } catch (Exception e) { - String nm = e.getClass().getName(); - logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm); - if (!nm.startsWith(DATASTAX_PREFIX)) - throw e; - } - return n; - } - - @Override - public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); - boolean b = false; - try { - mConn.preStatementHook(sql); - b = stmt.execute(sql, autoGeneratedKeys); - mConn.postStatementHook(sql); - synchronizeTables(sql); - } catch (Exception e) { - String nm = e.getClass().getName(); - logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm); - if (!nm.startsWith(DATASTAX_PREFIX)) - throw e; - } - return b; - } - - @Override - public boolean execute(String sql, int[] columnIndexes) throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); - boolean b = false; - try { - mConn.preStatementHook(sql); - b = stmt.execute(sql, columnIndexes); - mConn.postStatementHook(sql); - synchronizeTables(sql); - } catch (Exception e) { - String nm = e.getClass().getName(); - logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm); - if (!nm.startsWith(DATASTAX_PREFIX)) - throw e; - } - return b; - } - - @Override - public boolean execute(String sql, String[] columnNames) throws SQLException { - logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); - //\TODO Idem to the other execute without columnNames - boolean b = false; - try { - mConn.preStatementHook(sql); - b = stmt.execute(sql, columnNames); - mConn.postStatementHook(sql); - synchronizeTables(sql); - } catch (Exception e) { - String nm = e.getClass().getName(); - logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm); - if (!nm.startsWith(DATASTAX_PREFIX)) - throw e; - } - return b; - } - - @Override - public int getResultSetHoldability() throws SQLException { - return stmt.getResultSetHoldability(); - } - - @Override - public boolean isClosed() throws SQLException { - return stmt.isClosed(); - } - - @Override - public void setPoolable(boolean poolable) throws SQLException { - stmt.setPoolable(poolable); - } - - @Override - public boolean isPoolable() throws SQLException { - return stmt.isPoolable(); - } - - @Override - public void closeOnCompletion() throws SQLException { - stmt.closeOnCompletion(); - } - - @Override - public boolean isCloseOnCompletion() throws SQLException { - return stmt.isCloseOnCompletion(); - } - - protected void synchronizeTables(String sql) { - if (sql == null || sql.trim().toLowerCase().startsWith("create")) { - if (mConn != null) { - try { - mConn.synchronizeTables(); - } catch (QueryException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - } - } - } - } + if (ignore) { + logger.warn("execute: exception (IGNORED) "+nm); + } else { + logger.error(EELFLoggerDelegate.errorLogger, " Exception "+nm+" "+e); + throw e; + } + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); + } + return b; + } + + @Override + public ResultSet getResultSet() throws SQLException { + return stmt.getResultSet(); + } + + @Override + public int getUpdateCount() throws SQLException { + return stmt.getUpdateCount(); + } + + @Override + public boolean getMoreResults() throws SQLException { + return stmt.getMoreResults(); + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + stmt.setFetchDirection(direction); + } + + @Override + public int getFetchDirection() throws SQLException { + return stmt.getFetchDirection(); + } + + @Override + public void setFetchSize(int rows) throws SQLException { + stmt.setFetchSize(rows); + } + + @Override + public int getFetchSize() throws SQLException { + return stmt.getFetchSize(); + } + + @Override + public int getResultSetConcurrency() throws SQLException { + return stmt.getResultSetConcurrency(); + } + + @Override + public int getResultSetType() throws SQLException { + return stmt.getResultSetType(); + } + + @Override + public void addBatch(String sql) throws SQLException { + stmt.addBatch(sql); + } + + @Override + public void clearBatch() throws SQLException { + stmt.clearBatch(); + } + + @Override + public int[] executeBatch() throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"executeBatch: "); + int[] n = null; + try { + logger.debug(EELFLoggerDelegate.applicationLogger,"executeBatch() is not supported by MDBC; your results may be incorrect as a result."); + n = stmt.executeBatch(); + synchronizeTables(null); + } catch (Exception e) { + String nm = e.getClass().getName(); + logger.error(EELFLoggerDelegate.errorLogger,"executeBatch: exception "+nm); + if (!nm.startsWith(DATASTAX_PREFIX)) + throw e; + } + return n; + } + + @Override + public Connection getConnection() throws SQLException { + return stmt.getConnection(); + } + + @Override + public boolean getMoreResults(int current) throws SQLException { + return stmt.getMoreResults(current); + } + + @Override + public ResultSet getGeneratedKeys() throws SQLException { + return stmt.getGeneratedKeys(); + } + + @Override + public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); + int n = 0; + try { + mConn.preStatementHook(sql); + n = stmt.executeUpdate(sql, autoGeneratedKeys); + mConn.postStatementHook(sql); + synchronizeTables(sql); + } catch (SQLException e) { + String nm = e.getClass().getName(); + logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm); + if (!nm.startsWith(DATASTAX_PREFIX)) + throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); + } + return n; + } + + @Override + public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); + int n = 0; + try { + mConn.preStatementHook(sql); + n = stmt.executeUpdate(sql, columnIndexes); + mConn.postStatementHook(sql); + synchronizeTables(sql); + } catch (SQLException e) { + String nm = e.getClass().getName(); + logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm); + if (!nm.startsWith(DATASTAX_PREFIX)) + throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); + } + return n; + } + + @Override + public int executeUpdate(String sql, String[] columnNames) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql); + int n = 0; + try { + mConn.preStatementHook(sql); + n = stmt.executeUpdate(sql, columnNames); + mConn.postStatementHook(sql); + synchronizeTables(sql); + } catch (SQLException e) { + String nm = e.getClass().getName(); + logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm); + if (!nm.startsWith(DATASTAX_PREFIX)) + throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); + } + return n; + } + + @Override + public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); + boolean b = false; + try { + mConn.preStatementHook(sql); + b = stmt.execute(sql, autoGeneratedKeys); + mConn.postStatementHook(sql); + synchronizeTables(sql); + } catch (SQLException e) { + String nm = e.getClass().getName(); + logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm); + if (!nm.startsWith(DATASTAX_PREFIX)) + throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); + } + return b; + } + + @Override + public boolean execute(String sql, int[] columnIndexes) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); + boolean b = false; + try { + mConn.preStatementHook(sql); + b = stmt.execute(sql, columnIndexes); + mConn.postStatementHook(sql); + synchronizeTables(sql); + } catch (SQLException e) { + String nm = e.getClass().getName(); + logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm); + if (!nm.startsWith(DATASTAX_PREFIX)) + throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); + } + return b; + } + + @Override + public boolean execute(String sql, String[] columnNames) throws SQLException { + logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql); + //\TODO Idem to the other execute without columnNames + boolean b = false; + try { + mConn.preStatementHook(sql); + b = stmt.execute(sql, columnNames); + mConn.postStatementHook(sql); + synchronizeTables(sql); + } catch (SQLException e) { + String nm = e.getClass().getName(); + logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm); + if (!nm.startsWith(DATASTAX_PREFIX)) + throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); + } + return b; + } + + @Override + public int getResultSetHoldability() throws SQLException { + return stmt.getResultSetHoldability(); + } + + @Override + public boolean isClosed() throws SQLException { + return stmt.isClosed(); + } + + @Override + public void setPoolable(boolean poolable) throws SQLException { + stmt.setPoolable(poolable); + } + + @Override + public boolean isPoolable() throws SQLException { + return stmt.isPoolable(); + } + + @Override + public void closeOnCompletion() throws SQLException { + stmt.closeOnCompletion(); + } + + @Override + public boolean isCloseOnCompletion() throws SQLException { + return stmt.isCloseOnCompletion(); + } + + protected void synchronizeTables(String sql) { + if (sql == null || sql.trim().toLowerCase().startsWith("create")) { + if (mConn != null) { + try { + mConn.synchronizeTables(); + } catch (QueryException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + } + } + } + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java index 3175f94..059793d 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java @@ -45,9 +45,9 @@ import java.sql.Timestamp; import java.util.Calendar; import java.util.Map; -import org.apache.log4j.Logger; - +import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.QueryException; +import org.onap.music.logging.EELFLoggerDelegate; /** * ProxyStatement is a proxy Statement that front ends Statements from the underlying JDBC driver. It passes all operations through, @@ -56,7 +56,7 @@ import org.onap.music.exceptions.QueryException; * @author Robert Eby */ public class ProxyStatement implements CallableStatement { - private static final Logger logger = Logger.getLogger(ProxyStatement.class); + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(ProxyStatement.class); private static final String DATASTAX_PREFIX = "com.datastax.driver"; private final Statement stmt; // the Statement that we are proxying @@ -86,12 +86,15 @@ public class ProxyStatement implements CallableStatement { r = stmt.executeQuery(sql); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.warn("executeQuery: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } + return r; } @@ -104,11 +107,13 @@ public class ProxyStatement implements CallableStatement { n = stmt.executeUpdate(sql); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.warn("executeUpdate: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return n; } @@ -182,7 +187,7 @@ public class ProxyStatement implements CallableStatement { b = stmt.execute(sql); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); // Note: this seems to be the only call Camunda uses, so it is the only one I am fixing for now. boolean ignore = nm.startsWith(DATASTAX_PREFIX); @@ -193,6 +198,8 @@ public class ProxyStatement implements CallableStatement { logger.warn("execute: exception "+nm); throw e; } + } catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return b; } @@ -256,16 +263,18 @@ public class ProxyStatement implements CallableStatement { public int[] executeBatch() throws SQLException { logger.debug("executeBatch"); int[] n = null; + //\TODO check if this is still invalid in Metric try { logger.warn("executeBatch() is not supported by MDBC; your results may be incorrect as a result."); n = stmt.executeBatch(); synchronizeTables(null); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.warn("executeBatch: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; - } + } + return n; } @@ -293,11 +302,13 @@ public class ProxyStatement implements CallableStatement { n = stmt.executeUpdate(sql, autoGeneratedKeys); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.warn("executeUpdate: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + }catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return n; } @@ -311,11 +322,13 @@ public class ProxyStatement implements CallableStatement { n = stmt.executeUpdate(sql, columnIndexes); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.warn("executeUpdate: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + }catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return n; } @@ -329,11 +342,13 @@ public class ProxyStatement implements CallableStatement { n = stmt.executeUpdate(sql, columnNames); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.warn("executeUpdate: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + }catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return n; } @@ -347,11 +362,13 @@ public class ProxyStatement implements CallableStatement { b = stmt.execute(sql, autoGeneratedKeys); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.warn("execute: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + }catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return b; } @@ -365,11 +382,13 @@ public class ProxyStatement implements CallableStatement { b = stmt.execute(sql, columnIndexes); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.warn("execute: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + }catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return b; } @@ -383,11 +402,13 @@ public class ProxyStatement implements CallableStatement { b = stmt.execute(sql, columnNames); mConn.postStatementHook(sql); synchronizeTables(sql); - } catch (Exception e) { + } catch (SQLException e) { String nm = e.getClass().getName(); logger.warn("execute: exception "+nm); if (!nm.startsWith(DATASTAX_PREFIX)) throw e; + }catch (MDBCServiceException e) { + throw new SQLException(e.getMessage()); } return b; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java index e182368..4bccbba 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java @@ -29,11 +29,11 @@ import java.util.Objects; * In the future we may decide to partition ranges differently * @author Enrique Saurez */ -public class Range implements Serializable { +public class Range implements Serializable, Cloneable{ private static final long serialVersionUID = 1610744496930800088L; - final public String table; + public String table; public Range(String table) { this.table = table; @@ -58,8 +58,22 @@ public class Range implements Serializable { public int hashCode(){ return Objects.hash(table); } - + + @Override + protected Range clone() { + Range newRange = null; + try{ + newRange = (Range) super.clone(); + newRange.table = this.table; + } + catch(CloneNotSupportedException cns){ + //\TODO add logging + } + return newRange; + + } public boolean overlaps(Range other) { - return table == other.table; + return table.equals(other.table); } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index c13fe16..f281548 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -19,8 +19,9 @@ */ package org.onap.music.mdbc; +import java.util.ArrayList; +import java.util.List; import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.exceptions.MusicServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; import org.onap.music.logging.format.ErrorSeverity; @@ -71,16 +72,16 @@ public class StateManager { /** Identifier for this server instance */ private String mdbcServerName; + private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition - @SuppressWarnings("unused") - private DatabasePartition ranges; - public StateManager(String sqlDBUrl, Properties info, DatabasePartition ranges, String mdbcServerName, String sqlDBName) throws MDBCServiceException { + public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException { this.sqlDBName = sqlDBName; - this.ranges = ranges; this.sqlDBUrl = sqlDBUrl; this.info = info; this.mdbcServerName = mdbcServerName; + + this.connectionRanges = new HashMap<>(); this.transactionInfo = new TxCommitProgress(); //\fixme this might not be used, delete? try { @@ -102,9 +103,7 @@ public class StateManager { } /** - * Initialize the connections to music, set up any necessary tables in music - * @param mixin - * @param cassandraUrl + * Initialize all the interfaces and datastructures * @throws MDBCServiceException */ protected void initMusic() throws MDBCServiceException { @@ -118,7 +117,8 @@ public class StateManager { Class.forName("org.mariadb.jdbc.Driver"); } catch (ClassNotFoundException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, + ErrorTypes.GENERALSERVICEERROR); return; } try { @@ -129,7 +129,8 @@ public class StateManager { Statement stmt = sqlConnection.createStatement(); stmt.execute(sql.toString()); } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, + ErrorTypes.GENERALSERVICEERROR); throw new MDBCServiceException(e.getMessage()); } } @@ -138,15 +139,11 @@ public class StateManager { return this.musicInterface; } - public DatabasePartition getRanges() { - return ranges; + public List<DatabasePartition> getRanges() { + return new ArrayList<>(connectionRanges.values()); } - public void setRanges(DatabasePartition ranges) { - this.ranges = ranges; - } - - + public void closeConnection(String connectionId){ //\TODO check if there is a race condition if(mdbcConnections.containsKey(connectionId)) { @@ -156,10 +153,15 @@ public class StateManager { if(conn!=null) conn.close(); } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, + ErrorTypes.GENERALSERVICEERROR); } mdbcConnections.remove(connectionId); } + if(connectionRanges.containsKey(connectionId)){ + //TODO release lock? + connectionRanges.remove(connectionId); + } } /** @@ -179,20 +181,34 @@ public class StateManager { } catch (ClassNotFoundException e) { // TODO Auto-generated catch block - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, + ErrorTypes.GENERALSERVICEERROR); return; } try { sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info); } catch (SQLException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, + ErrorTypes.QUERYERROR); sqlConnection = null; } + //check if a range was already created for this connection + //TODO: later we could try to match it to some more sticky client id + DatabasePartition ranges; + if(connectionRanges.containsKey(id)){ + ranges=connectionRanges.get(id); + } + else{ + ranges=new DatabasePartition(); + connectionRanges.put(id,ranges); + } //Create MDBC connection try { - newConnection = new MdbcConnection(id, this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface, transactionInfo,ranges); + newConnection = new MdbcConnection(id, this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface, + transactionInfo,ranges); } catch (MDBCServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, + ErrorTypes.QUERYERROR); newConnection = null; return; } @@ -227,7 +243,8 @@ public class StateManager { } catch (ClassNotFoundException e) { // TODO Auto-generated catch block - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, + ErrorTypes.QUERYERROR); } //Create connection to local SQL DB @@ -235,14 +252,27 @@ public class StateManager { sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info); } catch (SQLException e) { logger.error("sql connection was not created correctly"); - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, + ErrorTypes.QUERYERROR); sqlConnection = null; } + //check if a range was already created for this connection + //TODO: later we could try to match it to some more sticky client id + DatabasePartition ranges; + if(connectionRanges.containsKey(id)){ + ranges=connectionRanges.get(id); + } + else{ + ranges=new DatabasePartition(); + connectionRanges.put(id,ranges); + } //Create MDBC connection try { - newConnection = new MdbcConnection(id,this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface, transactionInfo,ranges); + newConnection = new MdbcConnection(id,this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface, + transactionInfo,ranges); } catch (MDBCServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, + ErrorTypes.QUERYERROR); newConnection = null; } logger.info(EELFLoggerDelegate.applicationLogger,"Connection created for connection: "+id); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index 73622b1..383b4b3 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -111,11 +111,13 @@ public interface DBInterface { List<String> getReservedTblNames(); String getPrimaryKey(String sql, String tableName); - + + String applyDigest(Map<Range,StagingTable> digest); + /** * Replay a given TxDigest into the local DB * @param digest * @throws SQLException if replay cannot occur correctly */ - public void replayTransaction(HashMap<Range,StagingTable> digest) throws SQLException; + void replayTransaction(HashMap<Range,StagingTable> digest) throws SQLException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index f4231d8..64e9253 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -28,12 +28,12 @@ import org.json.JSONObject; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.exceptions.MusicLockingException; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; -import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -42,7 +42,26 @@ import org.onap.music.mdbc.tables.TxCommitProgress; * * @author Robert P. Eby */ -public interface MusicInterface { +public interface MusicInterface { + class OwnershipReturn{ + private final String ownerId; + private final UUID rangeId; + private List<UUID> oldIds; + public OwnershipReturn(String ownerId, UUID rangeId, List<UUID> oldIds){ + this.ownerId=ownerId; + this.rangeId=rangeId; + this.oldIds=oldIds; + } + public String getOwnerId(){ + return ownerId; + } + public UUID getRangeId(){ + return rangeId; + } + public List<UUID> getOldIRangeds(){ + return oldIds; + } + } /** * Get the name of this MusicInterface mixin object. * @return the name @@ -57,7 +76,7 @@ public interface MusicInterface { * generates a key or placeholder for what is required for a primary key * @return a primary key */ - String generateUniqueKey(); + UUID generateUniqueKey(); /** * Find the key used with Music for a table that was created without a primary index @@ -68,7 +87,8 @@ public interface MusicInterface { * @param dbRow row obtained from the SQL layer * @return key associated with the row */ - String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow); + String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) + ; /** * Do what is needed to close down the MUSIC connection. */ @@ -157,30 +177,98 @@ public interface MusicInterface { /** * Commits the corresponding REDO-log into MUSIC * - * @param partition - * @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to be a HashMap, because it is required to be serializable + * @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx + * @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to + * be a HashMap, because it is required to be serializable * @param txId id associated with the log being send * @param progressKeeper data structure that is used to handle to detect failures, and know what to do * @throws MDBCServiceException */ void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; + + /** + * This function is used to obtain the information related to a specific row in the MRI table + * @param partitionIndex index of the row that is going to be retrieved + * @return all the information related to the table + * @throws MDBCServiceException + */ MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException; + /** + * This function is used to create a new row in the MRI table + * @param info the information used to create the row + * @return the new partition object that contain the new information used to create the row + * @throws MDBCServiceException + */ DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException; - + + /** + * This function is used to append an index to the redo log in a MRI row + * @param mriRowId mri row index to which we are going to append the index to the redo log + * @param partition information related to ownership of partitions, used to verify ownership + * @param newRecord index of the new record to be appended to the redo log + * @throws MDBCServiceException + */ void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException; - + + /** + * This functions adds the tx digest to + * @param newId id used as index in the MTD table + * @param transactionDigest digest that contains all the changes performed in the transaction + * @throws MDBCServiceException + */ void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException; + /** + * Function used to retrieve a given transaction digest and deserialize it + * @param id of the transaction digest to be retrieved + * @return the deserialize transaction digest that can be applied to the local SQL database + * @throws MDBCServiceException + */ HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException; - - void own(List<Range> ranges); - void appendRange(String rangeId, List<Range> ranges); + /** + * Use this functions to verify ownership, and own new ranges + * @param ranges the ranges that should be own after calling this function + * @param partition current information of the ownership in the system + * @return an object indicating the status of the own function result + * @throws MDBCServiceException + */ + OwnershipReturn own(List<Range> ranges, DatabasePartition partition) throws MDBCServiceException; + + /** + * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation + * @param partition information of the partition that is currently being owned + * @throws MDBCServiceException + */ + void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException; + + /** + * This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all + * those ranges. + * @param rangeId new id to be used in the new row + * @param ranges ranges to be owned by the end of the function called + * @param partition current ownership status + * @return + * @throws MDBCServiceException + */ + OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) throws MDBCServiceException; + + /** + * This functions relinquishes a range + * @param ownerId id of the current ownerh + * @param rangeId id of the range to be relinquished + * @throws MusicLockingException + */ + void relinquish(String ownerId, String rangeId) throws MDBCServiceException; - void relinquish(String ownerId, String rangeId); - public List<UUID> getPartitionIndexes(); + /** + * This function return all the range indexes that are currently hold by any of the connections in the system + * @return list of ids of rows in MRI + */ + List<UUID> getPartitionIndexes() throws MDBCServiceException; + void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 8ffb6af..7228b55 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,19 +27,21 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import org.apache.commons.lang.NotImplementedException; import org.onap.music.mdbc.*; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; -import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; -import org.onap.music.mdbc.tables.MusicTxDigest; import org.onap.music.mdbc.tables.TxCommitProgress; import org.json.JSONObject; @@ -49,7 +51,6 @@ import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.MusicCore; -import org.onap.music.main.MusicCore.Condition; import org.onap.music.main.ResultType; import org.onap.music.main.ReturnType; @@ -84,104 +85,131 @@ import com.datastax.driver.core.TupleValue; * @author Robert P. Eby */ public class MusicMixin implements MusicInterface { - /** The property name to use to identify this replica to MusicSqlManager */ - public static final String KEY_MY_ID = "myid"; - /** The property name to use for the comma-separated list of replica IDs. */ - public static final String KEY_REPLICAS = "replica_ids"; - /** The property name to use to identify the IP address for Cassandra. */ - public static final String KEY_MUSIC_ADDRESS = "cassandra.host"; - /** The property name to use to provide the replication factor for Cassandra. */ - public static final String KEY_MUSIC_RFACTOR = "music_rfactor"; - /** The property name to use to provide the replication factor for Cassandra. */ - public static final String KEY_MUSIC_NAMESPACE = "music_namespace"; - /** Namespace for the tables in MUSIC (Cassandra) */ - public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; - /** The default property value to use for the Cassandra IP address. */ - public static final String DEFAULT_MUSIC_ADDRESS = "localhost"; - /** The default property value to use for the Cassandra replication factor. */ - public static final int DEFAULT_MUSIC_RFACTOR = 1; - /** The default primary string column, if none is provided. */ - public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid"; - /** Type of the primary key, if none is defined by the user */ - public static final String MDBC_PRIMARYKEY_TYPE = "uuid"; - - - //\TODO Add logic to change the names when required and create the tables when necessary + /** The property name to use to identify this replica to MusicSqlManager */ + public static final String KEY_MY_ID = "myid"; + /** The property name to use for the comma-separated list of replica IDs. */ + public static final String KEY_REPLICAS = "replica_ids"; + /** The property name to use to identify the IP address for Cassandra. */ + public static final String KEY_MUSIC_ADDRESS = "cassandra.host"; + /** The property name to use to provide the replication factor for Cassandra. */ + public static final String KEY_MUSIC_RFACTOR = "music_rfactor"; + /** The property name to use to provide the replication factor for Cassandra. */ + public static final String KEY_MUSIC_NAMESPACE = "music_namespace"; + /** Namespace for the tables in MUSIC (Cassandra) */ + public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; + /** The default property value to use for the Cassandra IP address. */ + public static final String DEFAULT_MUSIC_ADDRESS = "localhost"; + /** The default property value to use for the Cassandra replication factor. */ + public static final int DEFAULT_MUSIC_RFACTOR = 1; + /** The default primary string column, if none is provided. */ + public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid"; + /** Type of the primary key, if none is defined by the user */ + public static final String MDBC_PRIMARYKEY_TYPE = "uuid"; + + + //\TODO Add logic to change the names when required and create the tables when necessary private String musicTxDigestTableName = "musictxdigest"; - private String musicRangeInformationTableName = "musicrangeinformation"; - - private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class); - - private static final Map<Integer, String> typemap = new HashMap<>(); - static { - // We only support the following type mappings currently (from DB -> Cassandra). - // Anything else will likely cause a NullPointerException - typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY - typemap.put(Types.BLOB, "VARCHAR"); - typemap.put(Types.BOOLEAN, "BOOLEAN"); - typemap.put(Types.CLOB, "BLOB"); - typemap.put(Types.DATE, "VARCHAR"); - typemap.put(Types.DOUBLE, "DOUBLE"); - typemap.put(Types.DECIMAL, "DECIMAL"); - typemap.put(Types.INTEGER, "INT"); - //typemap.put(Types.TIMESTAMP, "TIMESTAMP"); - typemap.put(Types.SMALLINT, "SMALLINT"); - typemap.put(Types.TIMESTAMP, "VARCHAR"); - typemap.put(Types.VARBINARY, "BLOB"); - typemap.put(Types.VARCHAR, "VARCHAR"); - typemap.put(Types.CHAR, "VARCHAR"); - //The "Hacks", these don't have a direct mapping - //typemap.put(Types.DATE, "VARCHAR"); - //typemap.put(Types.DATE, "TIMESTAMP"); - } - - protected final String music_ns; - protected final String myId; - protected final String[] allReplicaIds; - private final String musicAddress; - private final int music_rfactor; - private MusicConnector mCon = null; - private Session musicSession = null; - private boolean keyspace_created = false; - private Map<String, PreparedStatement> ps_cache = new HashMap<>(); - private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>()); - - public MusicMixin() { - //this.logger = null; - this.musicAddress = null; - this.music_ns = null; - this.music_rfactor = 0; - this.myId = null; - this.allReplicaIds = null; - } - - public MusicMixin(String mdbcServerName, Properties info) throws MDBCServiceException { - // Default values -- should be overridden in the Properties - // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred) - this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS); - logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress); - - String s = info.getProperty(KEY_MUSIC_RFACTOR); - this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s); - - this.myId = info.getProperty(KEY_MY_ID, getMyHostId()); - logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId); + private String musicRangeInformationTableName = "musicrangeinformation"; + + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class); + + private class LockResult{ + private final UUID musicRangeInformationIndex; + private final String ownerId; + private final boolean newLock; + public LockResult(UUID rowId, String ownerId, boolean newLock){ + this.musicRangeInformationIndex = rowId; + this.ownerId=ownerId; + this.newLock=newLock; + } + public String getOwnerId(){ + return ownerId; + } + public boolean getNewLock(){ + return newLock; + } + public UUID getIndex() {return musicRangeInformationIndex;} + } - - this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(","); - logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId)); - this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE); - logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns); - - initializeMetricTables(); + private static final Map<Integer, String> typemap = new HashMap<>(); + static { + // We only support the following type mappings currently (from DB -> Cassandra). + // Anything else will likely cause a NullPointerException + typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY + typemap.put(Types.BLOB, "VARCHAR"); + typemap.put(Types.BOOLEAN, "BOOLEAN"); + typemap.put(Types.CLOB, "BLOB"); + typemap.put(Types.DATE, "VARCHAR"); + typemap.put(Types.DOUBLE, "DOUBLE"); + typemap.put(Types.DECIMAL, "DECIMAL"); + typemap.put(Types.INTEGER, "INT"); + //typemap.put(Types.TIMESTAMP, "TIMESTAMP"); + typemap.put(Types.SMALLINT, "SMALLINT"); + typemap.put(Types.TIMESTAMP, "VARCHAR"); + typemap.put(Types.VARBINARY, "BLOB"); + typemap.put(Types.VARCHAR, "VARCHAR"); + typemap.put(Types.CHAR, "VARCHAR"); + //The "Hacks", these don't have a direct mapping + //typemap.put(Types.DATE, "VARCHAR"); + //typemap.put(Types.DATE, "TIMESTAMP"); + } + + protected final String music_ns; + protected final String myId; + protected final String[] allReplicaIds; + private final String musicAddress; + private final int music_rfactor; + private MusicConnector mCon = null; + private Session musicSession = null; + private boolean keyspace_created = false; + private Map<String, PreparedStatement> ps_cache = new HashMap<>(); + private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>()); + + public MusicMixin() { + //this.logger = null; + this.musicAddress = null; + this.music_ns = null; + this.music_rfactor = 0; + this.myId = null; + this.allReplicaIds = null; + } + + public MusicMixin(String mdbcServerName, Properties info) throws MDBCServiceException { + // Default values -- should be overridden in the Properties + // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred) + this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS); + logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress); + + String s = info.getProperty(KEY_MUSIC_RFACTOR); + this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s); + + this.myId = info.getProperty(KEY_MY_ID, getMyHostId()); + logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId); + + + this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(","); + logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId)); + + this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE); + logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns); + + initializeMetricTables(); + } + + public String getMusicTxDigestTableName(){ + return musicTxDigestTableName; } - /** - * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. - * The keyspace name comes from the initialization properties passed to the JDBC driver. - */ - @Override + public String getMusicRangeInformationTableName(){ + return musicRangeInformationTableName; + } + + /** + * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. + * The keyspace name comes from the initialization properties passed to the JDBC driver. + */ + @Override public void createKeyspace() throws MDBCServiceException { Map<String,Object> replicationInfo = new HashMap<>(); @@ -190,202 +218,213 @@ public class MusicMixin implements MusicInterface { PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString( - "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns + + "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns + " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":")); try { MusicCore.nonKeyRelatedPut(queryObject, "eventual"); - } catch (MusicServiceException e) { + } catch (MusicServiceException e) { if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) { - throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage()); + throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage()); + } + } + } + + private String getMyHostId() { + ResultSet rs = null; + try { + rs = executeMusicRead("SELECT HOST_ID FROM SYSTEM.LOCAL"); + } catch (MDBCServiceException e) { + return "UNKNOWN"; + } + Row row = rs.one(); + return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString(); + } + private String getAllHostIds() { + ResultSet results = null; + try { + results = executeMusicRead("SELECT HOST_ID FROM SYSTEM.PEERS"); + } catch (MDBCServiceException e) { + } + StringBuilder sb = new StringBuilder(myId); + if(results!=null) { + for (Row row : results) { + sb.append(","); + sb.append(row.getUUID("HOST_ID").toString()); } } + return sb.toString(); + } + + /** + * Get the name of this MusicInterface mixin object. + * @return the name + */ + @Override + public String getMixinName() { + return "cassandra"; + } + /** + * Do what is needed to close down the MUSIC connection. + */ + @Override + public void close() { + if (musicSession != null) { + musicSession.close(); + musicSession = null; + } } - private String getMyHostId() { - ResultSet rs = executeMusicRead("SELECT HOST_ID FROM SYSTEM.LOCAL"); - Row row = rs.one(); - return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString(); - } - private String getAllHostIds() { - ResultSet results = executeMusicRead("SELECT HOST_ID FROM SYSTEM.PEERS"); - StringBuilder sb = new StringBuilder(myId); - for (Row row : results) { - sb.append(","); - sb.append(row.getUUID("HOST_ID").toString()); - } - return sb.toString(); - } - - /** - * Get the name of this MusicInterface mixin object. - * @return the name - */ - @Override - public String getMixinName() { - return "cassandra"; - } - /** - * Do what is needed to close down the MUSIC connection. - */ - @Override - public void close() { - if (musicSession != null) { - musicSession.close(); - musicSession = null; - } - } - - /** - * This function is used to created all the required data structures, both local - */ - private void initializeMetricTables() throws MDBCServiceException { - createKeyspace(); - try { + /** + * This function is used to created all the required data structures, both local + */ + private void initializeMetricTables() throws MDBCServiceException { + createKeyspace(); + try { createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number - createMusicRangeInformationTable(); - } - catch(MDBCServiceException e){ + createMusicRangeInformationTable(); + } + catch(MDBCServiceException e){ logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC"); } - } - - /** - * This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>. - * @param tableName the table to initialize MUSIC for - */ - @Override - public void initializeMusicForTable(TableInfo ti, String tableName) { - /** - * This code creates two tables for every table in SQL: - * (i) a table with the exact same name as the SQL table storing the SQL data. - * (ii) a "dirty bits" table that stores the keys in the Cassandra table that are yet to be - * updated in the SQL table (they were written by some other node). - */ - StringBuilder fields = new StringBuilder(); - StringBuilder prikey = new StringBuilder(); - String pfx = "", pfx2 = ""; - for (int i = 0; i < ti.columns.size(); i++) { - fields.append(pfx) - .append(ti.columns.get(i)) - .append(" ") - .append(typemap.get(ti.coltype.get(i))); - if (ti.iskey.get(i)) { - // Primary key column - prikey.append(pfx2).append(ti.columns.get(i)); - pfx2 = ", "; - } - pfx = ", "; - } - if (prikey.length()==0) { - fields.append(pfx).append(MDBC_PRIMARYKEY_NAME) - .append(" ") - .append(MDBC_PRIMARYKEY_TYPE); - prikey.append(MDBC_PRIMARYKEY_NAME); - } - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString()); - executeMusicWriteQuery(cql); - } - - // ************************************************** - // Dirty Tables (in MUSIC) methods - // ************************************************** - - /** - * Create a <i>dirty row</i> table for the real table <i>tableName</i>. The primary keys columns from the real table are recreated in - * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC. - * @param tableName the table to create a "dirty" table for - */ - @Override - public void createDirtyRowTable(TableInfo ti, String tableName) { - // create dirtybitsTable at all replicas + } + + /** + * This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>. + * @param tableName the table to initialize MUSIC for + */ + @Override + public void initializeMusicForTable(TableInfo ti, String tableName) { + /** + * This code creates two tables for every table in SQL: + * (i) a table with the exact same name as the SQL table storing the SQL data. + * (ii) a "dirty bits" table that stores the keys in the Cassandra table that are yet to be + * updated in the SQL table (they were written by some other node). + */ + StringBuilder fields = new StringBuilder(); + StringBuilder prikey = new StringBuilder(); + String pfx = "", pfx2 = ""; + for (int i = 0; i < ti.columns.size(); i++) { + fields.append(pfx) + .append(ti.columns.get(i)) + .append(" ") + .append(typemap.get(ti.coltype.get(i))); + if (ti.iskey.get(i)) { + // Primary key column + prikey.append(pfx2).append(ti.columns.get(i)); + pfx2 = ", "; + } + pfx = ", "; + } + if (prikey.length()==0) { + fields.append(pfx).append(MDBC_PRIMARYKEY_NAME) + .append(" ") + .append(MDBC_PRIMARYKEY_TYPE); + prikey.append(MDBC_PRIMARYKEY_NAME); + } + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString()); + executeMusicWriteQuery(cql); + } + + // ************************************************** + // Dirty Tables (in MUSIC) methods + // ************************************************** + + /** + * Create a <i>dirty row</i> table for the real table <i>tableName</i>. The primary keys columns from the real table are recreated in + * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC. + * @param tableName the table to create a "dirty" table for + */ + @Override + public void createDirtyRowTable(TableInfo ti, String tableName) { + // create dirtybitsTable at all replicas // for (String repl : allReplicaIds) { //// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i]; //// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);"; // cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl); // executeMusicWriteQuery(cql); // } - StringBuilder ddl = new StringBuilder("REPLICA__ TEXT"); - StringBuilder cols = new StringBuilder("REPLICA__"); - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - // Only use the primary keys columns in the "Dirty" table - ddl.append(", ") - .append(ti.columns.get(i)) - .append(" ") - .append(typemap.get(ti.coltype.get(i))); - cols.append(", ").append(ti.columns.get(i)); - } - } - if(cols.length()==0) { - //fixme - System.err.println("Create dirty row table found no primary key"); - } - ddl.append(", PRIMARY KEY(").append(cols).append(")"); - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s (%s);", music_ns, tableName, ddl.toString()); - executeMusicWriteQuery(cql); - } - /** - * Drop the dirty row table for <i>tableName</i> from MUSIC. - * @param tableName the table being dropped - */ - @Override - public void dropDirtyRowTable(String tableName) { - String cql = String.format("DROP TABLE %s.DIRTY_%s;", music_ns, tableName); - executeMusicWriteQuery(cql); - } - /** - * Mark rows as "dirty" in the dirty rows table for <i>tableName</i>. Rows are marked for all replicas but - * this one (this replica already has the up to date data). - * @param tableName the table we are marking dirty - * @param keys an ordered list of the values being put into the table. The values that correspond to the tables' - * primary key are copied into the dirty row table. - */ - @Override - public void markDirtyRow(TableInfo ti, String tableName, JSONObject keys) { - Object[] keyObj = getObjects(ti,tableName, keys); - StringBuilder cols = new StringBuilder("REPLICA__"); - PreparedQueryObject pQueryObject = null; - StringBuilder vals = new StringBuilder("?"); - List<Object> vallist = new ArrayList<Object>(); - vallist.add(""); // placeholder for replica - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - cols.append(", ").append(ti.columns.get(i)); - vals.append(", ").append("?"); - vallist.add(keyObj[i]); - } - } - if(cols.length()==0) { - //FIXME - System.err.println("markDIrtyRow need to fix primary key"); - } - String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString()); + StringBuilder ddl = new StringBuilder("REPLICA__ TEXT"); + StringBuilder cols = new StringBuilder("REPLICA__"); + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + // Only use the primary keys columns in the "Dirty" table + ddl.append(", ") + .append(ti.columns.get(i)) + .append(" ") + .append(typemap.get(ti.coltype.get(i))); + cols.append(", ").append(ti.columns.get(i)); + } + } + if(cols.length()==0) { + //fixme + System.err.println("Create dirty row table found no primary key"); + } + ddl.append(", PRIMARY KEY(").append(cols).append(")"); + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s (%s);", music_ns, tableName, ddl.toString()); + executeMusicWriteQuery(cql); + } + /** + * Drop the dirty row table for <i>tableName</i> from MUSIC. + * @param tableName the table being dropped + */ + @Override + public void dropDirtyRowTable(String tableName) { + String cql = String.format("DROP TABLE %s.DIRTY_%s;", music_ns, tableName); + executeMusicWriteQuery(cql); + } + /** + * Mark rows as "dirty" in the dirty rows table for <i>tableName</i>. Rows are marked for all replicas but + * this one (this replica already has the up to date data). + * @param tableName the table we are marking dirty + * @param keys an ordered list of the values being put into the table. The values that correspond to the tables' + * primary key are copied into the dirty row table. + */ + @Override + public void markDirtyRow(TableInfo ti, String tableName, JSONObject keys) { + Object[] keyObj = getObjects(ti,tableName, keys); + StringBuilder cols = new StringBuilder("REPLICA__"); + PreparedQueryObject pQueryObject = null; + StringBuilder vals = new StringBuilder("?"); + List<Object> vallist = new ArrayList<Object>(); + vallist.add(""); // placeholder for replica + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + cols.append(", ").append(ti.columns.get(i)); + vals.append(", ").append("?"); + vallist.add(keyObj[i]); + } + } + if(cols.length()==0) { + //FIXME + System.err.println("markDIrtyRow need to fix primary key"); + } + String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString()); /*Session sess = getMusicSession(); PreparedStatement ps = getPreparedStatementFromCache(cql);*/ - String primaryKey; - if(ti.hasKey()) { - primaryKey = getMusicKeyFromRow(ti,tableName, keys); - } - else { - primaryKey = getMusicKeyFromRowWithoutPrimaryIndexes(ti,tableName, keys); - } - System.out.println("markDirtyRow: PK value: "+primaryKey); - - Object pkObj = null; - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - pkObj = keyObj[i]; - } - } - for (String repl : allReplicaIds) { - pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(tableName); - pQueryObject.addValue(repl); - pQueryObject.addValue(pkObj); - updateMusicDB(tableName, primaryKey, pQueryObject); - //if (!repl.equals(myId)) { + String primaryKey; + if(ti.hasKey()) { + primaryKey = getMusicKeyFromRow(ti,tableName, keys); + } + else { + primaryKey = getMusicKeyFromRowWithoutPrimaryIndexes(ti,tableName, keys); + } + System.out.println("markDirtyRow: PK value: "+primaryKey); + + Object pkObj = null; + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + pkObj = keyObj[i]; + } + } + for (String repl : allReplicaIds) { + pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(tableName); + pQueryObject.addValue(repl); + pQueryObject.addValue(pkObj); + updateMusicDB(tableName, primaryKey, pQueryObject); + //if (!repl.equals(myId)) { /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); vallist.set(0, repl); BoundStatement bound = ps.bind(vallist.toArray()); @@ -393,37 +432,37 @@ public class MusicMixin implements MusicInterface { synchronized (sess) { sess.execute(bound); }*/ - //} - - } - } - /** - * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys - * @param tableName the table we are removing dirty entries from - * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row. - */ - @Override - public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) { - Object[] keysObjects = getObjects(ti,tableName,keys); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - StringBuilder cols = new StringBuilder("REPLICA__=?"); - List<Object> vallist = new ArrayList<Object>(); - vallist.add(myId); - int n = 0; - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - cols.append(" AND ").append(ti.columns.get(i)).append("=?"); - vallist.add(keysObjects[n++]); - pQueryObject.addValue(keysObjects[n++]); - } - } - String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString()); - logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); - pQueryObject.appendQueryString(cql); + //} + + } + } + /** + * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys + * @param tableName the table we are removing dirty entries from + * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row. + */ + @Override + public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) { + Object[] keysObjects = getObjects(ti,tableName,keys); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + StringBuilder cols = new StringBuilder("REPLICA__=?"); + List<Object> vallist = new ArrayList<Object>(); + vallist.add(myId); + int n = 0; + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + cols.append(" AND ").append(ti.columns.get(i)).append("=?"); + vallist.add(keysObjects[n++]); + pQueryObject.addValue(keysObjects[n++]); + } + } + String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString()); + logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); + pQueryObject.appendQueryString(cql); ReturnType rt = MusicCore.eventualPut(pQueryObject); - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - System.out.println("Failure while cleanDirtyRow..."+rt.getMessage()); - } + if(rt.getResult().getResult().toLowerCase().equals("failure")) { + System.out.println("Failure while cleanDirtyRow..."+rt.getMessage()); + } /*Session sess = getMusicSession(); PreparedStatement ps = getPreparedStatementFromCache(cql); BoundStatement bound = ps.bind(vallist.toArray()); @@ -431,18 +470,18 @@ public class MusicMixin implements MusicInterface { synchronized (sess) { sess.execute(bound); }*/ - } - /** - * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica, - * and consist of a Map of primary key column names and values. - * @param tableName the table we are querying for - * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row". - */ - @Override - public List<Map<String,Object>> getDirtyRows(TableInfo ti, String tableName) { - String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName); - ResultSet results = null; - logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); + } + /** + * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica, + * and consist of a Map of primary key column names and values. + * @param tableName the table we are querying for + * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row". + */ + @Override + public List<Map<String,Object>> getDirtyRows(TableInfo ti, String tableName) { + String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName); + ResultSet results = null; + logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql); /*Session sess = getMusicSession(); PreparedStatement ps = getPreparedStatementFromCache(cql); @@ -451,111 +490,111 @@ public class MusicMixin implements MusicInterface { synchronized (sess) { results = sess.execute(bound); }*/ - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - try { - results = MusicCore.get(pQueryObject); - } catch (MusicServiceException e) { - - e.printStackTrace(); - } - - ColumnDefinitions cdef = results.getColumnDefinitions(); - List<Map<String,Object>> list = new ArrayList<Map<String,Object>>(); - for (Row row : results) { - Map<String,Object> objs = new HashMap<String,Object>(); - for (int i = 0; i < cdef.size(); i++) { - String colname = cdef.getName(i).toUpperCase(); - String coltype = cdef.getType(i).getName().toString().toUpperCase(); - if (!colname.equals("REPLICA__")) { - switch (coltype) { - case "BIGINT": - objs.put(colname, row.getLong(colname)); - break; - case "BOOLEAN": - objs.put(colname, row.getBool(colname)); - break; - case "BLOB": - objs.put(colname, row.getString(colname)); - break; - case "DATE": - objs.put(colname, row.getString(colname)); - break; - case "DOUBLE": - objs.put(colname, row.getDouble(colname)); - break; - case "DECIMAL": - objs.put(colname, row.getDecimal(colname)); - break; - case "INT": - objs.put(colname, row.getInt(colname)); - break; - case "TIMESTAMP": - objs.put(colname, row.getTimestamp(colname)); - break; - case "VARCHAR": - default: - objs.put(colname, row.getString(colname)); - break; - } - } - } - list.add(objs); - } - return list; - } - - /** - * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first. - * @param tableName This is the table that has been dropped - */ - @Override - public void clearMusicForTable(String tableName) { - dropDirtyRowTable(tableName); - String cql = String.format("DROP TABLE %s.%s;", music_ns, tableName); - executeMusicWriteQuery(cql); - } - /** - * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the - * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates - * it to the other replicas. - * - * @param tableName This is the table that has changed. - * @param oldRow This is a copy of the old row being deleted - */ - @Override - public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) { - Object[] objects = getObjects(ti,tableName,oldRow); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - if (ti.hasKey()) { - assert(ti.columns.size() == objects.length); - } else { - assert(ti.columns.size()+1 == objects.length); - } - - StringBuilder where = new StringBuilder(); - List<Object> vallist = new ArrayList<Object>(); - String pfx = ""; - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - where.append(pfx) - .append(ti.columns.get(i)) - .append("=?"); - vallist.add(objects[i]); - pQueryObject.addValue(objects[i]); - pfx = " AND "; - } - } - if (!ti.hasKey()) { - where.append(MDBC_PRIMARYKEY_NAME + "=?"); - //\FIXME this is wrong, old row is not going to contain the UUID, this needs to be fixed - vallist.add(UUID.fromString((String) objects[0])); - pQueryObject.addValue(UUID.fromString((String) objects[0])); - } - - String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString()); - logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql); - pQueryObject.appendQueryString(cql); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + try { + results = MusicCore.get(pQueryObject); + } catch (MusicServiceException e) { + + e.printStackTrace(); + } + + ColumnDefinitions cdef = results.getColumnDefinitions(); + List<Map<String,Object>> list = new ArrayList<Map<String,Object>>(); + for (Row row : results) { + Map<String,Object> objs = new HashMap<String,Object>(); + for (int i = 0; i < cdef.size(); i++) { + String colname = cdef.getName(i).toUpperCase(); + String coltype = cdef.getType(i).getName().toString().toUpperCase(); + if (!colname.equals("REPLICA__")) { + switch (coltype) { + case "BIGINT": + objs.put(colname, row.getLong(colname)); + break; + case "BOOLEAN": + objs.put(colname, row.getBool(colname)); + break; + case "BLOB": + objs.put(colname, row.getString(colname)); + break; + case "DATE": + objs.put(colname, row.getString(colname)); + break; + case "DOUBLE": + objs.put(colname, row.getDouble(colname)); + break; + case "DECIMAL": + objs.put(colname, row.getDecimal(colname)); + break; + case "INT": + objs.put(colname, row.getInt(colname)); + break; + case "TIMESTAMP": + objs.put(colname, row.getTimestamp(colname)); + break; + case "VARCHAR": + default: + objs.put(colname, row.getString(colname)); + break; + } + } + } + list.add(objs); + } + return list; + } + + /** + * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first. + * @param tableName This is the table that has been dropped + */ + @Override + public void clearMusicForTable(String tableName) { + dropDirtyRowTable(tableName); + String cql = String.format("DROP TABLE %s.%s;", music_ns, tableName); + executeMusicWriteQuery(cql); + } + /** + * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the + * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates + * it to the other replicas. + * + * @param tableName This is the table that has changed. + * @param oldRow This is a copy of the old row being deleted + */ + @Override + public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) { + Object[] objects = getObjects(ti,tableName,oldRow); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + if (ti.hasKey()) { + assert(ti.columns.size() == objects.length); + } else { + assert(ti.columns.size()+1 == objects.length); + } + + StringBuilder where = new StringBuilder(); + List<Object> vallist = new ArrayList<Object>(); + String pfx = ""; + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + where.append(pfx) + .append(ti.columns.get(i)) + .append("=?"); + vallist.add(objects[i]); + pQueryObject.addValue(objects[i]); + pfx = " AND "; + } + } + if (!ti.hasKey()) { + where.append(MDBC_PRIMARYKEY_NAME + "=?"); + //\FIXME this is wrong, old row is not going to contain the UUID, this needs to be fixed + vallist.add(UUID.fromString((String) objects[0])); + pQueryObject.addValue(UUID.fromString((String) objects[0])); + } + + String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString()); + logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql); + pQueryObject.appendQueryString(cql); /*PreparedStatement ps = getPreparedStatementFromCache(cql); BoundStatement bound = ps.bind(vallist.toArray()); @@ -564,61 +603,68 @@ public class MusicMixin implements MusicInterface { synchronized (sess) { sess.execute(bound); }*/ - String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow); + String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow); updateMusicDB(tableName, primaryKey, pQueryObject); - // Mark the dirty rows in music for all the replicas but us - markDirtyRow(ti,tableName, oldRow); - } - - public Set<String> getMusicTableSet(String ns) { - Set<String> set = new TreeSet<String>(); - String cql = String.format("SELECT TABLE_NAME FROM SYSTEM_SCHEMA.TABLES WHERE KEYSPACE_NAME = '%s'", ns); - ResultSet rs = executeMusicRead(cql); - for (Row row : rs) { - set.add(row.getString("TABLE_NAME").toUpperCase()); - } - return set; - } - /** - * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local - * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL - * @param tableName This is the table on which the select is being performed - */ - @Override - public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) { - // Read dirty rows of this table from Music - TableInfo ti = dbi.getTableInfo(tableName); - List<Map<String,Object>> objlist = getDirtyRows(ti,tableName); - PreparedQueryObject pQueryObject = null; - String pre_cql = String.format("SELECT * FROM %s.%s WHERE ", music_ns, tableName); - List<Object> vallist = new ArrayList<Object>(); - StringBuilder sb = new StringBuilder(); - //\TODO Perform a batch operation instead of each row at a time - for (Map<String,Object> map : objlist) { - pQueryObject = new PreparedQueryObject(); - sb.setLength(0); - vallist.clear(); - String pfx = ""; - for (String key : map.keySet()) { - sb.append(pfx).append(key).append("=?"); - vallist.add(map.get(key)); - pQueryObject.addValue(map.get(key)); - pfx = " AND "; - } - - String cql = pre_cql + sb.toString(); - System.out.println("readDirtyRowsAndUpdateDb: cql: "+cql); - pQueryObject.appendQueryString(cql); - ResultSet dirtyRows = null; - try { - //\TODO Why is this an eventual put?, this should be an atomic - dirtyRows = MusicCore.get(pQueryObject); - } catch (MusicServiceException e) { - - e.printStackTrace(); - } + // Mark the dirty rows in music for all the replicas but us + markDirtyRow(ti,tableName, oldRow); + } + + public Set<String> getMusicTableSet(String ns) { + Set<String> set = new TreeSet<String>(); + String cql = String.format("SELECT TABLE_NAME FROM SYSTEM_SCHEMA.TABLES WHERE KEYSPACE_NAME = '%s'", ns); + ResultSet rs = null; + try { + rs = executeMusicRead(cql); + } catch (MDBCServiceException e) { + e.printStackTrace(); + } + if(rs!=null) { + for (Row row : rs) { + set.add(row.getString("TABLE_NAME").toUpperCase()); + } + } + return set; + } + /** + * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local + * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL + * @param tableName This is the table on which the select is being performed + */ + @Override + public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) { + // Read dirty rows of this table from Music + TableInfo ti = dbi.getTableInfo(tableName); + List<Map<String,Object>> objlist = getDirtyRows(ti,tableName); + PreparedQueryObject pQueryObject = null; + String pre_cql = String.format("SELECT * FROM %s.%s WHERE ", music_ns, tableName); + List<Object> vallist = new ArrayList<Object>(); + StringBuilder sb = new StringBuilder(); + //\TODO Perform a batch operation instead of each row at a time + for (Map<String,Object> map : objlist) { + pQueryObject = new PreparedQueryObject(); + sb.setLength(0); + vallist.clear(); + String pfx = ""; + for (String key : map.keySet()) { + sb.append(pfx).append(key).append("=?"); + vallist.add(map.get(key)); + pQueryObject.addValue(map.get(key)); + pfx = " AND "; + } + + String cql = pre_cql + sb.toString(); + System.out.println("readDirtyRowsAndUpdateDb: cql: "+cql); + pQueryObject.appendQueryString(cql); + ResultSet dirtyRows = null; + try { + //\TODO Why is this an eventual put?, this should be an atomic + dirtyRows = MusicCore.get(pQueryObject); + } catch (MusicServiceException e) { + + e.printStackTrace(); + } /* Session sess = getMusicSession(); PreparedStatement ps = getPreparedStatementFromCache(cql); @@ -628,58 +674,58 @@ public class MusicMixin implements MusicInterface { synchronized (sess) { dirtyRows = sess.execute(bound); }*/ - List<Row> rows = dirtyRows.all(); - if (rows.isEmpty()) { - // No rows, the row must have been deleted - deleteRowFromSqlDb(dbi,tableName, map); - } else { - for (Row row : rows) { - writeMusicRowToSQLDb(dbi,tableName, row); - } - } - } - } - - private void deleteRowFromSqlDb(DBInterface dbi, String tableName, Map<String, Object> map) { - dbi.deleteRowFromSqlDb(tableName, map); - TableInfo ti = dbi.getTableInfo(tableName); - List<Object> vallist = new ArrayList<Object>(); - for (int i = 0; i < ti.columns.size(); i++) { - if (ti.iskey.get(i)) { - String col = ti.columns.get(i); - Object val = map.get(col); - vallist.add(val); - } - } - cleanDirtyRow(ti, tableName, new JSONObject(vallist)); - } - /** - * This functions copies the contents of a row in Music into the corresponding row in the SQL table - * @param tableName This is the name of the table in both Music and swl - * @param musicRow This is the row in Music that is being copied into SQL - */ - private void writeMusicRowToSQLDb(DBInterface dbi, String tableName, Row musicRow) { - // First construct the map of columns and their values - TableInfo ti = dbi.getTableInfo(tableName); - Map<String, Object> map = new HashMap<String, Object>(); - List<Object> vallist = new ArrayList<Object>(); - String rowid = tableName; - for (String col : ti.columns) { - Object val = getValue(musicRow, col); - map.put(col, val); - if (ti.iskey(col)) { - vallist.add(val); - rowid += "_" + val.toString(); - } - } - - logger.debug("Blocking rowid: "+rowid); - in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE - - dbi.insertRowIntoSqlDb(tableName, map); - - logger.debug("Unblocking rowid: "+rowid); - in_progress.remove(rowid); // Unblock propagation + List<Row> rows = dirtyRows.all(); + if (rows.isEmpty()) { + // No rows, the row must have been deleted + deleteRowFromSqlDb(dbi,tableName, map); + } else { + for (Row row : rows) { + writeMusicRowToSQLDb(dbi,tableName, row); + } + } + } + } + + private void deleteRowFromSqlDb(DBInterface dbi, String tableName, Map<String, Object> map) { + dbi.deleteRowFromSqlDb(tableName, map); + TableInfo ti = dbi.getTableInfo(tableName); + List<Object> vallist = new ArrayList<Object>(); + for (int i = 0; i < ti.columns.size(); i++) { + if (ti.iskey.get(i)) { + String col = ti.columns.get(i); + Object val = map.get(col); + vallist.add(val); + } + } + cleanDirtyRow(ti, tableName, new JSONObject(vallist)); + } + /** + * This functions copies the contents of a row in Music into the corresponding row in the SQL table + * @param tableName This is the name of the table in both Music and swl + * @param musicRow This is the row in Music that is being copied into SQL + */ + private void writeMusicRowToSQLDb(DBInterface dbi, String tableName, Row musicRow) { + // First construct the map of columns and their values + TableInfo ti = dbi.getTableInfo(tableName); + Map<String, Object> map = new HashMap<String, Object>(); + List<Object> vallist = new ArrayList<Object>(); + String rowid = tableName; + for (String col : ti.columns) { + Object val = getValue(musicRow, col); + map.put(col, val); + if (ti.iskey(col)) { + vallist.add(val); + rowid += "_" + val.toString(); + } + } + + logger.debug("Blocking rowid: "+rowid); + in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE + + dbi.insertRowIntoSqlDb(tableName, map); + + logger.debug("Unblocking rowid: "+rowid); + in_progress.remove(rowid); // Unblock propagation // try { // String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString()); @@ -695,8 +741,8 @@ public class MusicMixin implements MusicInterface { // } // } - ti = dbi.getTableInfo(tableName); - cleanDirtyRow(ti, tableName, new JSONObject(vallist)); + ti = dbi.getTableInfo(tableName); + cleanDirtyRow(ti, tableName, new JSONObject(vallist)); // String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";"; // java.sql.ResultSet rs = executeSQLRead(selectQuery); @@ -712,111 +758,111 @@ public class MusicMixin implements MusicInterface { // e.printStackTrace(); // } - //clean the music dirty bits table + //clean the music dirty bits table // String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId; // String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;"; // executeMusicWriteQuery(deleteQuery); - } - private Object getValue(Row musicRow, String colname) { - ColumnDefinitions cdef = musicRow.getColumnDefinitions(); - DataType colType; - try { - colType= cdef.getType(colname); - } - catch(IllegalArgumentException e) { - logger.warn("Colname is not part of table metadata: "+e); - throw e; - } - String typeStr = colType.getName().toString().toUpperCase(); - switch (typeStr) { - case "BIGINT": - return musicRow.getLong(colname); - case "BOOLEAN": - return musicRow.getBool(colname); - case "BLOB": - return musicRow.getString(colname); - case "DATE": - return musicRow.getString(colname); - case "DECIMAL": - return musicRow.getDecimal(colname); - case "DOUBLE": - return musicRow.getDouble(colname); - case "SMALLINT": - case "INT": - return musicRow.getInt(colname); - case "TIMESTAMP": - return musicRow.getTimestamp(colname); - case "UUID": - return musicRow.getUUID(colname); - default: - logger.error(EELFLoggerDelegate.errorLogger, "UNEXPECTED COLUMN TYPE: columname="+colname+", columntype="+typeStr); - // fall thru - case "VARCHAR": - return musicRow.getString(colname); - } - } - - /** - * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the - * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates - * it to the other replicas. - * - * @param tableName This is the table that has changed. - * @param changedRow This is information about the row that has changed - */ - @Override - public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) { - // Build the CQL command - Object[] objects = getObjects(ti,tableName,changedRow); - StringBuilder fields = new StringBuilder(); - StringBuilder values = new StringBuilder(); - String rowid = tableName; - Object[] newrow = new Object[objects.length]; - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - String pfx = ""; - int keyoffset=0; - for (int i = 0; i < objects.length; i++) { - if (!ti.hasKey() && i==0) { - //We need to tack on cassandra's uid in place of a primary key - fields.append(MDBC_PRIMARYKEY_NAME); - values.append("?"); - newrow[i] = UUID.fromString((String) objects[i]); - pQueryObject.addValue(newrow[i]); - keyoffset=-1; - pfx = ", "; - continue; - } - fields.append(pfx).append(ti.columns.get(i+keyoffset)); - values.append(pfx).append("?"); - pfx = ", "; - if (objects[i] instanceof byte[]) { - // Cassandra doesn't seem to have a Codec to translate a byte[] to a ByteBuffer - newrow[i] = ByteBuffer.wrap((byte[]) objects[i]); - pQueryObject.addValue(newrow[i]); - } else if (objects[i] instanceof Reader) { - // Cassandra doesn't seem to have a Codec to translate a Reader to a ByteBuffer either... - newrow[i] = ByteBuffer.wrap(readBytesFromReader((Reader) objects[i])); - pQueryObject.addValue(newrow[i]); - } else { - newrow[i] = objects[i]; - pQueryObject.addValue(newrow[i]); - } - if (i+keyoffset>=0 && ti.iskey.get(i+keyoffset)) { - rowid += "_" + newrow[i].toString(); - } - } - - if (in_progress.contains(rowid)) { - // This call to updateDirtyRowAndEntityTableInMusic() was called as a result of a Cassandra -> H2 update; ignore - logger.debug(EELFLoggerDelegate.applicationLogger, "updateDirtyRowAndEntityTableInMusic: bypassing MUSIC update on "+rowid); - - } else { - // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update - String cql = String.format("INSERT INTO %s.%s (%s) VALUES (%s);", music_ns, tableName, fields.toString(), values.toString()); - - pQueryObject.appendQueryString(cql); - String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow); - updateMusicDB(tableName, primaryKey, pQueryObject); + } + private Object getValue(Row musicRow, String colname) { + ColumnDefinitions cdef = musicRow.getColumnDefinitions(); + DataType colType; + try { + colType= cdef.getType(colname); + } + catch(IllegalArgumentException e) { + logger.warn("Colname is not part of table metadata: "+e); + throw e; + } + String typeStr = colType.getName().toString().toUpperCase(); + switch (typeStr) { + case "BIGINT": + return musicRow.getLong(colname); + case "BOOLEAN": + return musicRow.getBool(colname); + case "BLOB": + return musicRow.getString(colname); + case "DATE": + return musicRow.getString(colname); + case "DECIMAL": + return musicRow.getDecimal(colname); + case "DOUBLE": + return musicRow.getDouble(colname); + case "SMALLINT": + case "INT": + return musicRow.getInt(colname); + case "TIMESTAMP": + return musicRow.getTimestamp(colname); + case "UUID": + return musicRow.getUUID(colname); + default: + logger.error(EELFLoggerDelegate.errorLogger, "UNEXPECTED COLUMN TYPE: columname="+colname+", columntype="+typeStr); + // fall thru + case "VARCHAR": + return musicRow.getString(colname); + } + } + + /** + * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the + * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates + * it to the other replicas. + * + * @param tableName This is the table that has changed. + * @param changedRow This is information about the row that has changed + */ + @Override + public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) { + // Build the CQL command + Object[] objects = getObjects(ti,tableName,changedRow); + StringBuilder fields = new StringBuilder(); + StringBuilder values = new StringBuilder(); + String rowid = tableName; + Object[] newrow = new Object[objects.length]; + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + String pfx = ""; + int keyoffset=0; + for (int i = 0; i < objects.length; i++) { + if (!ti.hasKey() && i==0) { + //We need to tack on cassandra's uid in place of a primary key + fields.append(MDBC_PRIMARYKEY_NAME); + values.append("?"); + newrow[i] = UUID.fromString((String) objects[i]); + pQueryObject.addValue(newrow[i]); + keyoffset=-1; + pfx = ", "; + continue; + } + fields.append(pfx).append(ti.columns.get(i+keyoffset)); + values.append(pfx).append("?"); + pfx = ", "; + if (objects[i] instanceof byte[]) { + // Cassandra doesn't seem to have a Codec to translate a byte[] to a ByteBuffer + newrow[i] = ByteBuffer.wrap((byte[]) objects[i]); + pQueryObject.addValue(newrow[i]); + } else if (objects[i] instanceof Reader) { + // Cassandra doesn't seem to have a Codec to translate a Reader to a ByteBuffer either... + newrow[i] = ByteBuffer.wrap(readBytesFromReader((Reader) objects[i])); + pQueryObject.addValue(newrow[i]); + } else { + newrow[i] = objects[i]; + pQueryObject.addValue(newrow[i]); + } + if (i+keyoffset>=0 && ti.iskey.get(i+keyoffset)) { + rowid += "_" + newrow[i].toString(); + } + } + + if (in_progress.contains(rowid)) { + // This call to updateDirtyRowAndEntityTableInMusic() was called as a result of a Cassandra -> H2 update; ignore + logger.debug(EELFLoggerDelegate.applicationLogger, "updateDirtyRowAndEntityTableInMusic: bypassing MUSIC update on "+rowid); + + } else { + // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update + String cql = String.format("INSERT INTO %s.%s (%s) VALUES (%s);", music_ns, tableName, fields.toString(), values.toString()); + + pQueryObject.appendQueryString(cql); + String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow); + updateMusicDB(tableName, primaryKey, pQueryObject); /*PreparedStatement ps = getPreparedStatementFromCache(cql); BoundStatement bound = ps.bind(newrow); @@ -825,208 +871,207 @@ public class MusicMixin implements MusicInterface { synchronized (sess) { sess.execute(bound); }*/ - // Mark the dirty rows in music for all the replicas but us - markDirtyRow(ti,tableName, changedRow); - } - } - - - - private byte[] readBytesFromReader(Reader rdr) { - StringBuilder sb = new StringBuilder(); - try { - int ch; - while ((ch = rdr.read()) >= 0) { - sb.append((char)ch); - } - } catch (IOException e) { - logger.warn("readBytesFromReader: "+e); - } - return sb.toString().getBytes(); - } - - protected PreparedStatement getPreparedStatementFromCache(String cql) { - // Note: have to hope that the Session never changes! - if (!ps_cache.containsKey(cql)) { - Session sess = getMusicSession(); - PreparedStatement ps = sess.prepare(cql); - ps_cache.put(cql, ps); - } - return ps_cache.get(cql); - } - - /** - * This method gets a connection to Music - * @return the Cassandra Session to use - */ - protected Session getMusicSession() { - // create cassandra session - if (musicSession == null) { - logger.info(EELFLoggerDelegate.applicationLogger, "Creating New Music Session"); - mCon = new MusicConnector(musicAddress); - musicSession = mCon.getSession(); - } - return musicSession; - } - - /** - * This method executes a write query in Music - * @param cql the CQL to be sent to Cassandra - */ - protected void executeMusicWriteQuery(String cql) { - logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - ReturnType rt = MusicCore.eventualPut(pQueryObject); - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage()); - } + // Mark the dirty rows in music for all the replicas but us + markDirtyRow(ti,tableName, changedRow); + } + } + + + + private byte[] readBytesFromReader(Reader rdr) { + StringBuilder sb = new StringBuilder(); + try { + int ch; + while ((ch = rdr.read()) >= 0) { + sb.append((char)ch); + } + } catch (IOException e) { + logger.warn("readBytesFromReader: "+e); + } + return sb.toString().getBytes(); + } + + protected PreparedStatement getPreparedStatementFromCache(String cql) { + // Note: have to hope that the Session never changes! + if (!ps_cache.containsKey(cql)) { + Session sess = getMusicSession(); + PreparedStatement ps = sess.prepare(cql); + ps_cache.put(cql, ps); + } + return ps_cache.get(cql); + } + + /** + * This method gets a connection to Music + * @return the Cassandra Session to use + */ + protected Session getMusicSession() { + // create cassandra session + if (musicSession == null) { + logger.info(EELFLoggerDelegate.applicationLogger, "Creating New Music Session"); + mCon = new MusicConnector(musicAddress); + musicSession = mCon.getSession(); + } + return musicSession; + } + + /** + * This method executes a write query in Music + * @param cql the CQL to be sent to Cassandra + */ + protected void executeMusicWriteQuery(String cql) { + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + ReturnType rt = MusicCore.eventualPut(pQueryObject); + if(rt.getResult().getResult().toLowerCase().equals("failure")) { + logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage()); + } /*Session sess = getMusicSession(); SimpleStatement s = new SimpleStatement(cql); s.setReadTimeoutMillis(60000); synchronized (sess) { sess.execute(s); }*/ - } - - /** - * This method executes a read query in Music - * @param cql the CQL to be sent to Cassandra - * @return a ResultSet containing the rows returned from the query - */ - protected ResultSet executeMusicRead(String cql) { - logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - ResultSet results = null; - try { - results = MusicCore.get(pQueryObject); - } catch (MusicServiceException e) { - - e.printStackTrace(); - } - return results; - /*Session sess = getMusicSession(); - synchronized (sess) { - return sess.execute(cql); - }*/ - } - - /** - * Returns the default primary key name that this mixin uses - */ - public String getMusicDefaultPrimaryKeyName() { - return MDBC_PRIMARYKEY_NAME; - } - - /** - * Return the function for cassandra's primary key generation - */ - public String generateUniqueKey() { - return MDBCUtils.generateUniqueKey().toString(); - } - - @Override - public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) { - //\TODO this operation is super expensive to perform, both latency and BW - // it is better to add additional where clauses, and have the primary key - // to be composed of known columns of the table - // Adding this primary indexes would be an additional burden to the developers, which spanner - // also does, but otherwise performance is really bad - // At least it should have a set of columns that are guaranteed to be unique - StringBuilder cqlOperation = new StringBuilder(); - cqlOperation.append("SELECT * FROM ") - .append(music_ns) - .append(".") - .append(table); - ResultSet musicResults = executeMusicRead(cqlOperation.toString()); - Object[] dbRowObjects = getObjects(ti,table,dbRow); - while (!musicResults.isExhausted()) { - Row musicRow = musicResults.one(); - if (rowIs(ti, musicRow, dbRowObjects)) { - return ((UUID)getValue(musicRow, MDBC_PRIMARYKEY_NAME)).toString(); - } - } - //should never reach here - return null; - } - - /** - * Checks to see if this row is in list of database entries - * @param ti - * @param musicRow - * @param dbRow - * @return - */ - private boolean rowIs(TableInfo ti, Row musicRow, Object[] dbRow) { - //System.out.println("Comparing " + musicRow.toString()); - boolean sameRow=true; - for (int i=0; i<ti.columns.size(); i++) { - Object val = getValue(musicRow, ti.columns.get(i)); - if (!dbRow[i].equals(val)) { - sameRow=false; - break; - } - } - return sameRow; - } - - @Override - public String getMusicKeyFromRow(TableInfo ti, String tableName, JSONObject row) { - List<String> keyCols = ti.getKeyColumns(); - if(keyCols.isEmpty()){ - throw new IllegalArgumentException("Table doesn't have defined primary indexes "); - } - StringBuilder key = new StringBuilder(); - String pfx = ""; - for(String keyCol: keyCols) { - key.append(pfx); - key.append(row.get(keyCol)); - pfx = ","; - } - String keyStr = key.toString(); - return keyStr; - } - - public void updateMusicDB(String tableName, String primaryKey, PreparedQueryObject pQObject) { - ReturnType rt = MusicCore.eventualPut(pQObject); - if(rt.getResult().getResult().toLowerCase().equals("failure")) { - System.out.println("Failure while critical put..."+rt.getMessage()); - } - } - - /** - * Build a preparedQueryObject that appends a transaction to the mriTable - * @param mriTable - * @param uuid - * @param table - * @param redoUuid - * @return - */ - private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){ + } + + /** + * This method executes a read query in Music + * @param cql the CQL to be sent to Cassandra + * @return a ResultSet containing the rows returned from the query + */ + protected ResultSet executeMusicRead(String cql) throws MDBCServiceException { + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + ResultSet results = null; + try { + results = MusicCore.get(pQueryObject); + } catch (MusicServiceException e) { + logger.error("Error executing music get operation for query: ["+cql+"]"); + throw new MDBCServiceException("Error executing get: "+e.getMessage()); + } + return results; + } + + /** + * Returns the default primary key name that this mixin uses + */ + public String getMusicDefaultPrimaryKeyName() { + return MDBC_PRIMARYKEY_NAME; + } + + /** + * Return the function for cassandra's primary key generation + */ + @Override + public UUID generateUniqueKey() { + return MDBCUtils.generateUniqueKey(); + } + + @Override + public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) { + //\TODO this operation is super expensive to perform, both latency and BW + // it is better to add additional where clauses, and have the primary key + // to be composed of known columns of the table + // Adding this primary indexes would be an additional burden to the developers, which spanner + // also does, but otherwise performance is really bad + // At least it should have a set of columns that are guaranteed to be unique + StringBuilder cqlOperation = new StringBuilder(); + cqlOperation.append("SELECT * FROM ") + .append(music_ns) + .append(".") + .append(table); + ResultSet musicResults = null; + try { + musicResults = executeMusicRead(cqlOperation.toString()); + } catch (MDBCServiceException e) { + return null; + } + Object[] dbRowObjects = getObjects(ti,table,dbRow); + while (!musicResults.isExhausted()) { + Row musicRow = musicResults.one(); + if (rowIs(ti, musicRow, dbRowObjects)) { + return ((UUID)getValue(musicRow, MDBC_PRIMARYKEY_NAME)).toString(); + } + } + //should never reach here + return null; + } + + /** + * Checks to see if this row is in list of database entries + * @param ti + * @param musicRow + * @param dbRow + * @return + */ + private boolean rowIs(TableInfo ti, Row musicRow, Object[] dbRow) { + //System.out.println("Comparing " + musicRow.toString()); + boolean sameRow=true; + for (int i=0; i<ti.columns.size(); i++) { + Object val = getValue(musicRow, ti.columns.get(i)); + if (!dbRow[i].equals(val)) { + sameRow=false; + break; + } + } + return sameRow; + } + + @Override + public String getMusicKeyFromRow(TableInfo ti, String tableName, JSONObject row) { + List<String> keyCols = ti.getKeyColumns(); + if(keyCols.isEmpty()){ + throw new IllegalArgumentException("Table doesn't have defined primary indexes "); + } + StringBuilder key = new StringBuilder(); + String pfx = ""; + for(String keyCol: keyCols) { + key.append(pfx); + key.append(row.get(keyCol)); + pfx = ","; + } + String keyStr = key.toString(); + return keyStr; + } + + public void updateMusicDB(String tableName, String primaryKey, PreparedQueryObject pQObject) { + ReturnType rt = MusicCore.eventualPut(pQObject); + if(rt.getResult().getResult().toLowerCase().equals("failure")) { + System.out.println("Failure while critical put..."+rt.getMessage()); + } + } + + /** + * Build a preparedQueryObject that appends a transaction to the mriTable + * @param mriTable + * @param uuid + * @param table + * @param redoUuid + * @return + */ + private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){ PreparedQueryObject query = new PreparedQueryObject(); StringBuilder appendBuilder = new StringBuilder(); appendBuilder.append("UPDATE ") - .append(music_ns) - .append(".") - .append(mriTable) - .append(" SET txredolog = txredolog +[('") - .append(table) - .append("',") - .append(redoUuid) - .append(")] WHERE rangeid = ") - .append(uuid) - .append(";"); + .append(music_ns) + .append(".") + .append(mriTable) + .append(" SET txredolog = txredolog +[('") + .append(table) + .append("',") + .append(redoUuid) + .append(")] WHERE rangeid = ") + .append(uuid) + .append(";"); query.appendQueryString(appendBuilder.toString()); return query; } - protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { - UUID mriIndex = partition.getMusicRangeInformationIndex(); - String lockId; - lockId = MusicCore.createLockReference(fullyQualifiedKey); - //\TODO Handle better failures to acquire locks + protected ReturnType acquireLock(String fullyQualifiedKey, String lockId) throws MDBCServiceException{ ReturnType lockReturn; + //\TODO Handle better failures to acquire locks try { lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId); } catch (MusicLockingException e) { @@ -1039,40 +1084,46 @@ public class MusicMixin implements MusicInterface { logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey); throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey); } - //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock + return lockReturn; + } + + protected List<LockResult> waitForLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { + List<LockResult> result = new ArrayList<>(); + String lockId; + lockId = MusicCore.createLockReference(fullyQualifiedKey); + ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId); if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { - try { - MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId); - CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle(); - CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns, - this.musicRangeInformationTableName, mriIndex.toString()); - while(lockOwner.lockRef != lockId) { - MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef); - try { - lockOwner = lockingServiceHandle.peekLockQueue(music_ns, - this.musicRangeInformationTableName, mriIndex.toString()); - } catch(NullPointerException e){ - //Ignore null pointer exception - lockId = MusicCore.createLockReference(fullyQualifiedKey); - break; - } + //\TODO Improve the exponential backoff + int n = 1; + int low = 1; + int high = 1000; + Random r = new Random(); + while(MusicCore.whoseTurnIsIt(fullyQualifiedKey)!=lockId){ + try { + Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000) + + (r.nextInt(high - low) + low)); + } catch (InterruptedException e) { + continue; + } + if(n++==20){ + throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!") ; } - lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId); - - } catch (MusicLockingException e) { - throw new MDBCServiceException("Could not lock the corresponding lock"); - } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey); - throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey); - } catch (MusicQueryException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey); - throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey); } } + partition.setLockId(lockId); + result.add(new LockResult(partition.getMusicRangeInformationIndex(),lockId,true)); + return result; + } + + protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { + UUID mriIndex = partition.getMusicRangeInformationIndex(); + String lockId; + lockId = MusicCore.createLockReference(fullyQualifiedKey); + ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId); + //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { - throw new MDBCServiceException("Could not lock the corresponding lock"); + return null; } - //TODO: Java newbie here, verify that this lockId is actually assigned to the global DatabasePartition in the StateManager instance partition.setLockId(lockId); return lockId; } @@ -1092,28 +1143,21 @@ public class MusicMixin implements MusicInterface { * Writes the transaction information to metric's txDigest and musicRangeInformation table * This officially commits the transaction globally */ - @Override - public void commitLog(DatabasePartition partition, - HashMap<Range,StagingTable> transactionDigest, String txId , - TxCommitProgress progressKeeper) throws MDBCServiceException { - if (transactionDigest==null || transactionDigest.size()==0) { - return; - } - logger.info("Commiting lock for " + partition.getMusicRangeInformationIndex() + ", txID=" + txId); - UUID mriIndex = partition.getMusicRangeInformationIndex(); - if(mriIndex==null) { - //\TODO Fetch MriIndex from the Range Information Table - throw new MDBCServiceException("TIT Index retrieval not yet implemented"); - } + @Override + public void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ + UUID mriIndex = partition.getMusicRangeInformationIndex(); + if(mriIndex==null) { + own(partition.getSnapshot(),partition); + } String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex.toString(); - //0. See if reference to lock was already created - String lockId = partition.getLockId(); - if(lockId == null || lockId.isEmpty()) { - lockId = createAndAssignLock(fullyQualifiedMriKey,partition); - } - - UUID commitId; - //Generate a local commit id + //0. See if reference to lock was already created + String lockId = partition.getLockId(); + if(lockId == null || lockId.isEmpty()) { + waitForLock(fullyQualifiedMriKey,partition); + } + + UUID commitId; + //Generate a local commit id if(progressKeeper.containsTx(txId)) { commitId = progressKeeper.getCommitId(txId); } @@ -1123,7 +1167,7 @@ public class MusicMixin implements MusicInterface { } //Add creation type of transaction digest - //1. Push new row to RRT and obtain its index + //1. Push new row to RRT and obtain its index String serializedTransactionDigest; try { serializedTransactionDigest = MDBCUtils.toString(transactionDigest); @@ -1133,11 +1177,11 @@ public class MusicMixin implements MusicInterface { MusicTxDigestId digestId = new MusicTxDigestId(commitId); addTxDigest(digestId, serializedTransactionDigest); //2. Save RRT index to RQ - if(progressKeeper!= null) { - progressKeeper.setRecordId(txId,digestId); - } - //3. Append RRT index into the corresponding TIT row array - appendToRedoLog(mriIndex,partition,digestId); + if(progressKeeper!= null) { + progressKeeper.setRecordId(txId,digestId); + } + //3. Append RRT index into the corresponding TIT row array + appendToRedoLog(mriIndex,partition,digestId); } /** @@ -1146,91 +1190,104 @@ public class MusicMixin implements MusicInterface { * @param rowValues * @return */ - @SuppressWarnings("unused") - private String getUid(String tableName, String string, Object[] rowValues) { - // - // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update - String cql = String.format("SELECT * FROM %s.%s;", music_ns, tableName); - PreparedStatement ps = getPreparedStatementFromCache(cql); - BoundStatement bound = ps.bind(); - bound.setReadTimeoutMillis(60000); - Session sess = getMusicSession(); - ResultSet rs; - synchronized (sess) { - rs = sess.execute(bound); - } - - //should never reach here - logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key"); - return null; - } - - @Override - public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) { - // \FIXME: we may need to add the primary key of the row if it was autogenerated by MUSIC - List<String> cols = ti.columns; - int size = cols.size(); - boolean hasDefault = false; - if(row.has(getMusicDefaultPrimaryKeyName())) { - size++; - hasDefault = true; - } - - Object[] objects = new Object[size]; - int idx = 0; - if(hasDefault) { - objects[idx++] = row.getString(getMusicDefaultPrimaryKeyName()); - } - for(String col : ti.columns) { - objects[idx]=row.get(col); - } - return objects; - } - - @Override - public List<UUID> getPartitionIndexes() { - ArrayList<UUID> partitions = new ArrayList<UUID>(); - String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName); - ResultSet rs = executeMusicRead(cql); - for (Row r: rs) { - partitions.add(r.getUUID("rangeid")); - } - return partitions; - } - + @SuppressWarnings("unused") + private String getUid(String tableName, String string, Object[] rowValues) { + // + // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update + String cql = String.format("SELECT * FROM %s.%s;", music_ns, tableName); + PreparedStatement ps = getPreparedStatementFromCache(cql); + BoundStatement bound = ps.bind(); + bound.setReadTimeoutMillis(60000); + Session sess = getMusicSession(); + ResultSet rs; + synchronized (sess) { + rs = sess.execute(bound); + } + + //should never reach here + logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key"); + return null; + } + + public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) { + // \FIXME: we may need to add the primary key of the row if it was autogenerated by MUSIC + List<String> cols = ti.columns; + int size = cols.size(); + boolean hasDefault = false; + if(row.has(getMusicDefaultPrimaryKeyName())) { + size++; + hasDefault = true; + } + + Object[] objects = new Object[size]; + int idx = 0; + if(hasDefault) { + objects[idx++] = row.getString(getMusicDefaultPrimaryKeyName()); + } + for(String col : ti.columns) { + objects[idx]=row.get(col); + } + return objects; + } + + @Override + public List<UUID> getPartitionIndexes() throws MDBCServiceException { + ArrayList<UUID> partitions = new ArrayList<UUID>(); + String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName); + ResultSet rs = executeMusicRead(cql); + for (Row r: rs) { + partitions.add(r.getUUID("rangeid")); + } + return partitions; + } + + List<Range> getRanges(Row newRow){ + List<Range> partitions = new ArrayList<>(); + Set<String> tables = newRow.getSet("keys",String.class); + for (String table:tables){ + partitions.add(new Range(table)); + } + return partitions; + } + + MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){ + UUID partitionIndex = newRow.getUUID("rangeid"); + List<TupleValue> log = newRow.getList("txredolog",TupleValue.class); + List<MusicTxDigestId> digestIds = new ArrayList<>(); + for(TupleValue t: log){ + //final String tableName = t.getString(0); + final UUID index = t.getUUID(1); + digestIds.add(new MusicTxDigestId(index)); + } + List<Range> partitions = new ArrayList<>(); + Set<String> tables = newRow.getSet("keys",String.class); + for (String table:tables){ + partitions.add(new Range(table)); + } + return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""), + digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid")); + } + @Override public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException { - //TODO: verify that lock id is valid before calling the database operations function + //TODO: verify that lock id is valid before calling the database operations function //UUID id = partition.getMusicRangeInformationIndex(); String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(partitionIndex); - Row newRow; - try { - newRow = executeMusicUnlockedQuorumGet(pQueryObject); - } catch (MDBCServiceException e) { - logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); - } - - List<TupleValue> log = newRow.getList("txredolog",TupleValue.class); - List<MusicTxDigestId> digestIds = new ArrayList<>(); - for(TupleValue t: log){ - //final String tableName = t.getString(0); - final UUID index = t.getUUID(1); - digestIds.add(new MusicTxDigestId(index)); - } - List<Range> partitions = new ArrayList<>(); - Set<String> tables = newRow.getSet("keys",String.class); - for (String table:tables){ - partitions.add(new Range(table)); - } - return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""), - digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid")); - } - + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(partitionIndex); + Row newRow; + try { + newRow = executeMusicUnlockedQuorumGet(pQueryObject); + } catch (MDBCServiceException e) { + logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); + } + + return getMRIRowFromCassandraRow(newRow); + } + /** * This function creates the TransactionInformation table. It contain information related @@ -1254,7 +1311,7 @@ public class MusicMixin implements MusicInterface { //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly fields.append("txredolog list<frozen<tuple<text,uuid>>> "); String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", - this.music_ns, tableName, fields, priKey); + this.music_ns, tableName, fields, priKey); try { executeMusicWriteQuery(this.music_ns,tableName,cql); } catch (MDBCServiceException e) { @@ -1262,251 +1319,437 @@ public class MusicMixin implements MusicInterface { throw(e); } } - - + + @Override public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException { - DatabasePartition newPartition = info.getDBPartition(); + DatabasePartition newPartition = info.getDBPartition(); + String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMusicRangeInformationIndex().toString(); String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition); - createEmptyMriRow(info.getMetricProcessId(),lockId,new ArrayList<Range>()); - throw new UnsupportedOperationException(); + if(lockId == null || lockId.isEmpty()){ + throw new MDBCServiceException("Error initializing music range information, error creating a lock for a new row") ; + } + createEmptyMriRow(newPartition.getMusicRangeInformationIndex(),info.getMetricProcessId(),lockId,newPartition.getSnapshot()); + return newPartition; } - + /** * Creates a new empty MRI row * @param processId id of the process that is going to own initially this. * @return uuid associated to the new row */ private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges) - throws MDBCServiceException { + throws MDBCServiceException { UUID id = MDBCUtils.generateUniqueKey(); return createEmptyMriRow(id,processId,lockId,ranges); } - + /** * Creates a new empty MRI row * @param processId id of the process that is going to own initially this. * @return uuid associated to the new row */ private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges) - throws MDBCServiceException{ - StringBuilder insert = new StringBuilder("INSERT INTO ") - .append(this.music_ns) - .append('.') - .append(this.musicRangeInformationTableName) - .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") - .append("(") - .append(id) - .append(",{"); - boolean first=true; - for(Range r: ranges){ - if(first){ first=false; } - else { - insert.append(','); - } - insert.append("'").append(r.toString()).append("'"); - } - insert.append("},'") - .append((lockId==null)?"":lockId) - .append("','") - .append(processId) - .append("',[]);"); - PreparedQueryObject query = new PreparedQueryObject(); - query.appendQueryString(insert.toString()); - try { - executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to add new row to transaction information"); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); - } - return id; + throws MDBCServiceException{ + StringBuilder insert = new StringBuilder("INSERT INTO ") + .append(this.music_ns) + .append('.') + .append(this.musicRangeInformationTableName) + .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") + .append("(") + .append(id) + .append(",{"); + boolean first=true; + for(Range r: ranges){ + if(first){ first=false; } + else { + insert.append(','); + } + insert.append("'").append(r.toString()).append("'"); + } + insert.append("},'") + .append((lockId==null)?"":lockId) + .append("','") + .append(processId) + .append("',[]);"); + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString(insert.toString()); + try { + executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to add new row to transaction information"); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); + } + return id; } @Override public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException { - logger.info("Appending to redo log for partition " + partition.getMusicRangeInformationIndex() + " txId=" + newRecord.txId); - PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId); - ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null); - if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ - logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); - throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); - } - } - + logger.info("Appending to redo log for partition " + partition.getMusicRangeInformationIndex() + " txId=" + newRecord.txId); + PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId); + ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null); + if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ + logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); + throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); + } + } + public void createMusicTxDigest() throws MDBCServiceException { - createMusicTxDigest(-1); - } - - - /** - * This function creates the MusicTxDigest table. It contain information related to each transaction committed - * * LeaseId: id associated with the lease, text - * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later - * * TransactionDigest: text that contains all the changes in the transaction - */ - private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { - String tableName = this.musicTxDigestTableName; - if(musicTxDigestTableNumber >= 0) { - tableName = tableName + - "-" + - Integer.toString(musicTxDigestTableNumber); - } - String priKey = "txid"; - StringBuilder fields = new StringBuilder(); - fields.append("txid uuid, "); - fields.append("transactiondigest text ");//notice lack of ',' - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey); - try { - executeMusicWriteQuery(this.music_ns,tableName,cql); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create redo records table"); - throw(e); - } - } - - - /** - * Writes the transaction history to the txDigest - */ + createMusicTxDigest(-1); + } + + + /** + * This function creates the MusicTxDigest table. It contain information related to each transaction committed + * * LeaseId: id associated with the lease, text + * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later + * * TransactionDigest: text that contains all the changes in the transaction + */ + private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { + String tableName = this.musicTxDigestTableName; + if(musicTxDigestTableNumber >= 0) { + tableName = tableName + + "-" + + Integer.toString(musicTxDigestTableNumber); + } + String priKey = "txid"; + StringBuilder fields = new StringBuilder(); + fields.append("txid uuid, "); + fields.append("transactiondigest text ");//notice lack of ',' + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey); + try { + executeMusicWriteQuery(this.music_ns,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create redo records table"); + throw(e); + } + } + + + /** + * Writes the transaction history to the txDigest + */ @Override public void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException { - //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); - PreparedQueryObject query = new PreparedQueryObject(); - String cqlQuery = "INSERT INTO " + - this.music_ns + - '.' + - this.musicTxDigestTableName + - " (txid,transactiondigest) " + - "VALUES (" + - newId.txId + ",'" + - transactionDigest + - "');"; - query.appendQueryString(cqlQuery); - //\TODO check if I am not shooting on my own foot - try { - MusicCore.nonKeyRelatedPut(query,"critical"); - } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage()); - throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString()); - } - } - + //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); + PreparedQueryObject query = new PreparedQueryObject(); + String cqlQuery = "INSERT INTO " + + this.music_ns + + '.' + + this.musicTxDigestTableName + + " (txid,transactiondigest) " + + "VALUES (" + + newId.txId + ",'" + + transactionDigest + + "');"; + query.appendQueryString(cqlQuery); + //\TODO check if I am not shooting on my own foot + try { + MusicCore.nonKeyRelatedPut(query,"critical"); + } catch (MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage()); + throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString()); + } + } + @Override + public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException { + String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(id.txId); + Row newRow; + try { + newRow = executeMusicUnlockedQuorumGet(pQueryObject); + } catch (MDBCServiceException e) { + logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); + } + String digest = newRow.getString("transactiondigest"); + HashMap<Range,StagingTable> changes; + try { + changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest); + } catch (IOException e) { + logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId); + throw new MDBCServiceException("Deserializng digest failed with ioexception"); + } catch (ClassNotFoundException e) { + logger.error("Deserializng digest failed with an invalid class for id:"+id.txId); + throw new MDBCServiceException("Deserializng digest failed with an invalid class"); + } + return changes; + } + + /** + * This function is used to find all the related uuids associated with the required ranges + * @param ranges ranges to be find + * @return a map that associated each MRI row to the corresponding ranges + */ + private Map<UUID,List<Range>> findRangeRows(List<Range> ranges) throws MDBCServiceException { + /* \TODO this function needs to be improved, by creating an additional index, or at least keeping a local cache + Additionally, we should at least used pagination and the token function, to avoid retrieving the whole table at + once, this can become problematic if we have too many connections in the overall METRIC system */ + Map<UUID,List<Range>> result = new HashMap<>(); + List<Range> rangesCopy = new LinkedList<>(ranges); + int counter=0; + StringBuilder cqlOperation = new StringBuilder(); + cqlOperation.append("SELECT * FROM ") + .append(music_ns) + .append(".") + .append(musicRangeInformationTableName); + ResultSet musicResults = executeMusicRead(cqlOperation.toString()); + while (!musicResults.isExhausted()) { + Row musicRow = musicResults.one(); + UUID mriIndex = musicRow.getUUID("rangeid"); + final List<Range> musicRanges = getRanges(musicRow); + for(Range retrievedRange : musicRanges) { + for (Iterator<Range> iterator = rangesCopy.iterator(); iterator.hasNext(); ) { + Range range = iterator.next(); + if (retrievedRange.overlaps(range)) { + // Remove the current element from the iterator and the list. + if(!result.containsKey(mriIndex)){ + result.put(mriIndex,new ArrayList<>()); + } + List<Range> foundRanges = result.get(mriIndex); + foundRanges.add(range); + iterator.remove(); + } + } + } + } + if(!rangesCopy.isEmpty()){ + StringBuilder tables = new StringBuilder(); + for(Range range: rangesCopy){ + tables.append(range.toString()).append(','); + } + logger.error("Row in MRI doesn't exist for tables [ "+tables.toString()+"]"); + throw new MDBCServiceException("MRI row doesn't exist for tables "+tables.toString()); + } + return result; + } + + private List<LockResult> lockRow(UUID rowId, List<Range> ranges, DatabasePartition partition) + throws MDBCServiceException { + List<LockResult> result = new ArrayList<>(); + if(partition.getMusicRangeInformationIndex()==rowId){ + result.add(new LockResult(rowId,partition.getLockId(),false)); + return result; + } + //\TODO: this function needs to be improved, to track possible changes in the owner of a set of ranges + String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+rowId.toString(); + //return List<Range> knownRanges, UUID mriIndex, String lockId + DatabasePartition newPartition = new DatabasePartition(ranges,rowId,null); + return waitForLock(fullyQualifiedMriKey,newPartition); + } + + @Override + public OwnershipReturn own(List<Range> ranges, DatabasePartition partition) throws MDBCServiceException { + UUID newId = generateUniqueKey(); + return appendRange(newId.toString(),ranges,partition); + } + + /** + * This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained + * @param ranges ranges that should be contained in the partition + * @param partition currently own partition + * @return + */ + public boolean isAppendRequired(List<Range> ranges, DatabasePartition partition){ + for(Range r: ranges){ + if(!partition.isContained(r)){ + return true; + } + } + return false; + } + + private List<UUID> mergeMriRows(String newId, Map<UUID,LockResult> lock, DatabasePartition partition) + throws MDBCServiceException { + List<UUID> oldIds = new ArrayList<>(); + List<Range> newRanges = new ArrayList<>(); + for (Map.Entry<UUID,LockResult> entry : lock.entrySet()) { + oldIds.add(entry.getKey()); + final MusicRangeInformationRow mriRow = getMusicRangeInformation(entry.getKey()); + final DatabasePartition dbPartition = mriRow.getDBPartition(); + newRanges.addAll(dbPartition.getSnapshot()); + } + DatabasePartition newPartition = new DatabasePartition(newRanges,UUID.fromString(newId),null); + String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+newId; + final List<LockResult> lockResults = waitForLock(fullyQualifiedMriKey, newPartition); + if(lockResults.size()!=1||!lockResults.get(0).newLock){ + logger.error("When merging rows, lock returned an invalid error"); + throw new MDBCServiceException("When merging MRI rows, lock returned an invalid error"); + } + final LockResult lockResult = lockResults.get(0); + partition.updateDatabasePartition(newPartition); + createEmptyMriRow(partition.getMusicRangeInformationIndex(),myId,lockResult.ownerId,partition.getSnapshot()); + return oldIds; + } @Override - public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException { - String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(id.txId); - Row newRow; - try { - newRow = executeMusicUnlockedQuorumGet(pQueryObject); - } catch (MDBCServiceException e) { - logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); - } - String digest = newRow.getString("transactiondigest"); - HashMap<Range,StagingTable> changes; - try { - changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest); - } catch (IOException e) { - logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId); - throw new MDBCServiceException("Deserializng digest failed with ioexception"); - } catch (ClassNotFoundException e) { - logger.error("Deserializng digest failed with an invalid class for id:"+id.txId); - throw new MDBCServiceException("Deserializng digest failed with an invalid class"); - } - return changes; + public OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) + throws MDBCServiceException { + if(!isAppendRequired(ranges,partition)){ + return new OwnershipReturn(partition.getLockId(),UUID.fromString(rangeId),null); + } + Map<UUID,List<Range>> rows = findRangeRows(ranges); + HashMap<UUID,LockResult> rowLock=new HashMap<>(); + boolean newLock = false; + //\TODO: perform this operations in parallel + for(Map.Entry<UUID,List<Range>> row : rows.entrySet()){ + List<LockResult> locks; + try { + locks = lockRow(row.getKey(),row.getValue(), partition); + } catch (MDBCServiceException e) { + //TODO: Make a decision if retry or just fail? + logger.error("Error locking row"); + throw e; + } + for(LockResult l : locks){ + newLock = newLock || l.getNewLock(); + rowLock.put(l.getIndex(),l); + } + } + String lockId; + List<UUID> oldIds = null; + if(rowLock.size()!=1){ + oldIds = mergeMriRows(rangeId, rowLock, partition); + lockId = partition.getLockId(); + } + else{ + List<LockResult> list = new ArrayList<>(rowLock.values()); + LockResult lockResult = list.get(0); + lockId = lockResult.getOwnerId(); + } + + return new OwnershipReturn(lockId,UUID.fromString(rangeId),oldIds); } @Override - public void own(List<Range> ranges){ - throw new UnsupportedOperationException(); + public void relinquish(String ownerId, String rangeId) throws MDBCServiceException{ + String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+rangeId; + try { + MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey,ownerId); + } catch (MusicLockingException e) { + throw new MDBCServiceException(e.getMessage()); + } + } + + /** + * This function is used to rate the number of times we relinquish at the end of a transaction + * @return true if we should try to relinquish, else should avoid relinquishing in this iteration + */ + private boolean canTryRelinquishing(){ + return true; } @Override - public void appendRange(String rangeId, List<Range> ranges){ - throw new UnsupportedOperationException(); + public void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException { + if(!canTryRelinquishing() || !partition.isLocked()){ + return; + } + CassaLockStore lsHandle; + try { + lsHandle = MusicCore.getLockingServiceHandle(); + } catch (MusicLockingException e) { + logger.error("Error obtaining the locking service handle when checking if relinquish was required"); + throw new MDBCServiceException("Error obtaining locking service"+e.getMessage()); + } + long lockQueueSize; + try { + lockQueueSize = lsHandle.getLockQueueSize(music_ns, this.musicRangeInformationTableName, partition.getMusicRangeInformationIndex().toString()); + } catch (MusicServiceException|MusicQueryException e) { + logger.error("Error obtaining the lock queue size"); + throw new MDBCServiceException("Error obtaining lock queue size: "+e.getMessage()); + } + if(lockQueueSize> 1){ + //If there is any other node waiting, we just relinquish ownership + try { + relinquish(partition.getLockId(),partition.getMusicRangeInformationIndex().toString()); + } catch (MDBCServiceException e) { + logger.error("Error relinquishing lock, will use timeout to solve"); + } + partition.setLockId(""); + } } + /** + * This method executes a write query in Music + * @param cql the CQL to be sent to Cassandra + */ + private static void executeMusicWriteQuery(String keyspace, String table, String cql) + throws MDBCServiceException { + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + ResultType rt = null; + try { + rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical"); + } catch (MusicServiceException e) { + //\TODO: handle better, at least transform into an MDBCServiceException + e.printStackTrace(); + } + String result = rt.getResult(); + if (result==null || result.toLowerCase().equals("failure")) { + throw new MDBCServiceException("Music eventual put failed"); + } + } + + private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey, + String lock) + throws MDBCServiceException{ + ResultSet result; + try { + result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock); + } catch(MusicServiceException e){ + //\TODO: handle better, at least transform into an MDBCServiceException + e.printStackTrace(); + throw new MDBCServiceException("Error executing critical get"); + } + if(result.isExhausted()){ + throw new MDBCServiceException("There is not a row that matches the id "+primaryKey); + } + return result.one(); + } + + private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject) + throws MDBCServiceException{ + ResultSet result = MusicCore.quorumGet(cqlObject); + //\TODO: handle better, at least transform into an MDBCServiceException + if(result.isExhausted()){ + throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]"); + } + return result.one(); + } + + private void executeMusicLockedPut(String namespace, String tableName, + String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId, + MusicCore.Condition conditionInfo) throws MDBCServiceException { + ReturnType rt ; + if(lockId==null) { + try { + rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo); + } catch (MusicLockingException e) { + logger.error("Music locked put failed"); + throw new MDBCServiceException("Music locked put failed"); + } catch (MusicServiceException e) { + logger.error("Music service fail: Music locked put failed"); + throw new MDBCServiceException("Music service fail: Music locked put failed"); + } catch (MusicQueryException e) { + logger.error("Music query fail: locked put failed"); + throw new MDBCServiceException("Music query fail: Music locked put failed"); + } + } + else { + rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo); + } + if (rt.getResult().getResult().toLowerCase().equals("failure")) { + throw new MDBCServiceException("Music locked put failed"); + } + } + + @Override - public void relinquish(String ownerId, String rangeId){ - throw new UnsupportedOperationException(); - } - - /** - * This method executes a write query in Music - * @param cql the CQL to be sent to Cassandra - */ - private static void executeMusicWriteQuery(String keyspace, String table, String cql) - throws MDBCServiceException { - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - ResultType rt = null; - try { - rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical"); - } catch (MusicServiceException e) { - //\TODO: handle better, at least transform into an MDBCServiceException - e.printStackTrace(); - } - String result = rt.getResult(); - if (result==null || result.toLowerCase().equals("failure")) { - throw new MDBCServiceException("Music eventual put failed"); - } - } - - private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey, - String lock) - throws MDBCServiceException{ - ResultSet result; - try { - result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock); - } catch(MusicServiceException e){ - //\TODO: handle better, at least transform into an MDBCServiceException - e.printStackTrace(); - throw new MDBCServiceException("Error executing critical get"); - } - if(result.isExhausted()){ - throw new MDBCServiceException("There is not a row that matches the id "+primaryKey); - } - return result.one(); - } - - private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject) - throws MDBCServiceException{ - ResultSet result = MusicCore.quorumGet(cqlObject); - //\TODO: handle better, at least transform into an MDBCServiceException - if(result.isExhausted()){ - throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]"); - } - return result.one(); - } - - private void executeMusicLockedPut(String namespace, String tableName, - String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId, - MusicCore.Condition conditionInfo) throws MDBCServiceException { - ReturnType rt ; - if(lockId==null) { - try { - rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo); - } catch (MusicLockingException e) { - logger.error("Music locked put failed"); - throw new MDBCServiceException("Music locked put failed"); - } catch (MusicServiceException e) { - logger.error("Music service fail: Music locked put failed"); - throw new MDBCServiceException("Music service fail: Music locked put failed"); - } catch (MusicQueryException e) { - logger.error("Music query fail: locked put failed"); - throw new MDBCServiceException("Music query fail: Music locked put failed"); - } - } - else { - rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo); - } - if (rt.getResult().getResult().toLowerCase().equals("failure")) { - throw new MDBCServiceException("Music locked put failed"); - } - } + public void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException{ + throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented"); + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 15384ad..9a8f543 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -38,6 +38,7 @@ import org.json.JSONObject; import org.json.JSONTokener; import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; import org.onap.music.mdbc.tables.Operation; @@ -49,6 +50,7 @@ import net.sf.jsqlparser.parser.CCJSqlParserUtil; import net.sf.jsqlparser.statement.delete.Delete; import net.sf.jsqlparser.statement.insert.Insert; import net.sf.jsqlparser.statement.update.Update; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; /** * This class provides the methods that MDBC needs in order to mirror data to/from a @@ -586,7 +588,7 @@ NEW.field refers to the new value // the actual columns, otherwise performance when doing range queries are going // to be even worse (see the else bracket down) // - String musicKey = mi.generateUniqueKey(); + String musicKey = MDBCUtils.generateUniqueKey().toString(); /*} else { //get key from data musicKey = msm.getMusicKeyFromRowWithoutPrimaryIndexes(tbl,newRow); @@ -647,7 +649,7 @@ NEW.field refers to the new value JSONObject jo = new JSONObject(); if (!getTableInfo(tableName).hasKey()) { - String musicKey = mi.generateUniqueKey(); + String musicKey = MDBCUtils.generateUniqueKey().toString(); jo.put(mi.getMusicDefaultPrimaryKeyName(), musicKey); } @@ -690,7 +692,12 @@ NEW.field refers to the new value // return null; } - + + + public String applyDigest(Map<Range, StagingTable> digest){ + throw new NotImplementedException(); + } + @SuppressWarnings("unused") @Deprecated private ArrayList<String> getMusicKey(String sql) { @@ -807,7 +814,6 @@ NEW.field refers to the new value /** * Parse the transaction digest into individual events * @param transaction - base 64 encoded, serialized digest - * @param dbi */ public void replayTransaction(HashMap<Range,StagingTable> transaction) throws SQLException { boolean autocommit = jdbcConn.getAutoCommit(); @@ -840,7 +846,7 @@ NEW.field refers to the new value /** * Replays operation into database, usually from txDigest - * @param stmt + * @param jdbcStmt * @param r * @param op * @throws SQLException diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java index bc9a8fc..c14d5c9 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java @@ -21,22 +21,17 @@ package org.onap.music.mdbc.query; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.calcite.avatica.util.Casing; import org.apache.calcite.avatica.util.Quoting; -import org.apache.calcite.config.Lex; import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlDialect; -import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlSelect; -import org.apache.calcite.sql.SqlSetOption; import org.apache.calcite.sql.SqlUpdate; import org.apache.calcite.sql.fun.SqlInOperator; import org.apache.calcite.sql.parser.SqlParseException; @@ -44,7 +39,6 @@ import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.sql.parser.SqlParserImplFactory; import org.apache.calcite.sql.parser.impl.SqlParserImpl; import org.apache.calcite.sql.util.SqlBasicVisitor; -import org.apache.calcite.sql.util.SqlShuttle; import org.apache.calcite.sql.validate.SqlConformance; import org.apache.calcite.sql.validate.SqlConformanceEnum; import org.apache.calcite.util.Util; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java index 7057172..8784a76 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -71,25 +71,34 @@ public class MusicTxDigest { DBInterface dbi = ((MdbcConnection) stateManager.getConnection("daemon")).getDBInterface(); while (true) { + Thread.sleep(TimeUnit.SECONDS.toMillis(daemonSleepTimeS)); //update logger.info(String.format("[%s] Background MusicTxDigest daemon updating local db", new Timestamp(System.currentTimeMillis()))); //1) get all other partitions from musicrangeinformation - List<UUID> partitions = mi.getPartitionIndexes(); + List<UUID> partitions = null; + try { + partitions = mi.getPartitionIndexes(); + } catch (MDBCServiceException e) { + logger.error("Error obtainting partition indexes, trying again next iteration"); + continue; + } //2) for each partition I don't own - DatabasePartition myPartition = stateManager.getRanges(); - for (UUID partition: partitions) { - if (!partition.equals(myPartition.getMusicRangeInformationIndex())){ - try { - replayDigestForPartition(mi, partition, dbi); - } catch (MDBCServiceException e) { - logger.error("Unable to update for partition : " + partition + ". " + e.getMessage()); - continue; + List<DatabasePartition> ranges = stateManager.getRanges(); + if(ranges.size()!=0) { + DatabasePartition myPartition = ranges.get(0); + for (UUID partition : partitions) { + if (!partition.equals(myPartition.getMusicRangeInformationIndex())) { + try { + replayDigestForPartition(mi, partition, dbi); + } catch (MDBCServiceException e) { + logger.error("Unable to update for partition : " + partition + ". " + e.getMessage()); + continue; + } } } } - Thread.sleep(TimeUnit.SECONDS.toMillis(daemonSleepTimeS)); } } @@ -100,7 +109,7 @@ public class MusicTxDigest { * @param dbi interface to the database that will replay the operations * @throws MDBCServiceException */ - public void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException { + public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException { List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog(); for (MusicTxDigestId txId: partitionsRedoLogTxIds) { HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java index 0870be9..dc1bcce 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java @@ -24,6 +24,8 @@ import java.io.Serializable; import org.json.JSONObject; import org.json.JSONTokener; +import static java.util.Objects.hash; + public final class Operation implements Serializable{ private static final long serialVersionUID = -1215301985078183104L; @@ -53,6 +55,11 @@ public final class Operation implements Serializable{ } @Override + public int hashCode(){ + return hash(TYPE,NEW_VAL); + } + + @Override public boolean equals(Object o){ if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tests/MAIN.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tests/MAIN.java index 348d891..3de4b7a 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tests/MAIN.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tests/MAIN.java @@ -25,10 +25,10 @@ import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.List; -import org.apache.log4j.Logger; import org.json.JSONArray; import org.json.JSONObject; import org.json.JSONTokener; +import org.onap.music.logging.EELFLoggerDelegate; /** * Run all the tests against all the configurations specified in /tests.json. @@ -86,7 +86,8 @@ public class MAIN { } } public void run() { - Logger logger = Logger.getLogger(this.getClass()); + + EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(this.getClass()); for (int ix = 0; ix < configs.length(); ix++) { JSONObject config = configs.getJSONObject(ix); int succ = 0, fail = 0; diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcConnectionTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcConnectionTest.java new file mode 100644 index 0000000..776a06e --- /dev/null +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcConnectionTest.java @@ -0,0 +1,16 @@ +package org.onap.music.mdbc; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class MdbcConnectionTest { + + @Test + public void own() { + } + + @Test + public void relinquishIfRequired() { + } +}
\ No newline at end of file diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java index 3f8bd65..4d32b83 100755 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java @@ -19,14 +19,124 @@ */ package org.onap.music.mdbc; +import com.datastax.driver.core.*; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.SyntaxError; +import org.onap.music.datastore.CassaDataStore; +import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.main.MusicUtil; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Properties; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.*; + +import static junit.framework.TestCase.assertNotNull; +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestUtils { + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TestUtils.class); + + public static void createKeyspace(String keyspace, Session session) { + String queryOp = "CREATE KEYSPACE " + + keyspace + + " WITH REPLICATION " + + "= {'class':'SimpleStrategy', 'replication_factor':1}; "; + ResultSet res=null; + try { + res = session.execute(queryOp); + } + catch(QueryExecutionException e){ + fail("Failure executing creation of keyspace with error: " + e.getMessage()); + } catch(SyntaxError e){ + fail("Failure executing creation of keyspace with syntax error: " + e.getMessage()); + } + assertTrue("Keyspace "+keyspace+" is already being used, please change it to avoid loosing data",res.wasApplied()); + } + + public static void deleteKeyspace(String keyspace, Session session){ + String queryBuilder = "DROP KEYSPACE " + + keyspace + + ";"; + ResultSet res = session.execute(queryBuilder); + assertTrue("Keyspace "+keyspace+" doesn't exist and it should",res.wasApplied()); + } + + public static HashSet<String> getMriColNames(){ + return new HashSet<>( + Arrays.asList("rangeid","keys","txredolog","ownerid","metricprocessid") + ); + } + + public static HashSet<String> getMtdColNames(){ + return new HashSet<>( + Arrays.asList("txid","transactiondigest") + ); + } + + public static HashMap<String, DataType> getMriColTypes(Cluster cluster){ + HashMap<String, DataType> expectedTypes = new HashMap<>(); + expectedTypes.put("rangeid",DataType.uuid()); + expectedTypes.put("keys",DataType.set(DataType.text())); + ProtocolVersion currentVer = cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); + assertNotNull("Protocol version for cluster is invalid", currentVer); + CodecRegistry registry = cluster.getConfiguration().getCodecRegistry(); + assertNotNull("Codec registry for cluster is invalid", registry); + expectedTypes.put("txredolog",DataType.list(TupleType.of(currentVer,registry,DataType.text(),DataType.uuid()))); + expectedTypes.put("ownerid",DataType.text()); + expectedTypes.put("metricprocessid",DataType.text()); + return expectedTypes; + } + + public static HashMap<String, DataType> getMtdColTypes(){ + HashMap<String,DataType> expectedTypes = new HashMap<>(); + expectedTypes.put("txid",DataType.uuid()); + expectedTypes.put("transactiondigest",DataType.text()); + return expectedTypes; + } + + + public static void checkRowsInTable(String keyspace, String tableName, CassaDataStore ds, + HashSet<String> expectedColumns, HashMap<String,DataType> expectedTypes){ + TableMetadata table = ds.returnColumnMetadata(keyspace,tableName); + assertNotNull("Error obtaining metadata of table, there may be an error with its creation", table); + List<ColumnMetadata> columnsMeta = table.getColumns(); + checkDataTypeForTable(columnsMeta,expectedColumns,expectedTypes); + } + + public static void checkDataTypeForTable(List<ColumnMetadata> columnsMeta, HashSet<String> expectedColumns, + HashMap<String,DataType> expectedTypes){ + for(ColumnMetadata cMeta : columnsMeta){ + String columnName = cMeta.getName(); + DataType type = cMeta.getType(); + assertTrue("Invalid column name: "+columnName,expectedColumns.contains(columnName)); + assertTrue("Fix the contents of expectedtypes for column: "+columnName, + expectedTypes.containsKey(columnName)); + assertEquals("Invalid type for column: "+columnName, + expectedTypes.get(columnName),type); + } + } + + public static void readPropertiesFile(Properties prop) { + try { + String fileLocation = MusicUtil.getMusicPropertiesFilePath(); + InputStream fstream = new FileInputStream(fileLocation); + prop.load(fstream); + fstream.close(); + } catch (FileNotFoundException e) { + logger.error("Configuration file not found"); + + } catch (IOException e) { + // TODO Auto-generated catch block + logger.error("Exception when reading file: "+e.toString()); + } + } + + public static void populateMusicUtilsWithProperties(Properties prop){ //TODO: Learn how to do this properly within music String[] propKeys = MusicUtil.getPropkeys(); diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java new file mode 100644 index 0000000..7e0e4c8 --- /dev/null +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java @@ -0,0 +1,193 @@ +package org.onap.music.mdbc.mixins; + +import static org.junit.Assert.*; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import org.cassandraunit.utils.EmbeddedCassandraServerHelper; + + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.music.datastore.CassaDataStore; +import org.onap.music.datastore.MusicLockState; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicLockingException; +import org.onap.music.exceptions.MusicQueryException; +import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.main.MusicCore; +import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigestId; + +public class MusicMixinTest { + + final private static String keyspace="metricmusictest"; + final private static String mriTableName = "musicrangeinformation"; + final private static String mtdTableName = "musictxdigest"; + final private static String mdbcServerName = "name"; + + //Properties used to connect to music + private static Cluster cluster; + private static Session session; + private static String cassaHost = "localhost"; + private static MusicMixin mixin = null; + + @BeforeClass + public static void init() throws MusicServiceException { + try { + EmbeddedCassandraServerHelper.startEmbeddedCassandra(); + } catch (Exception e) { + System.out.println(e); + } + + cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build(); + cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000); + assertNotNull("Invalid configuration for cassandra", cluster); + session = cluster.connect(); + assertNotNull("Invalid configuration for cassandra", session); + CassaDataStore store = new CassaDataStore(cluster, session); + assertNotNull("Invalid configuration for music", store); + MusicCore.mDstoreHandle = store; + try { + Properties properties = new Properties(); + properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace); + properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName); + mixin=new MusicMixin(mdbcServerName,properties); + } catch (MDBCServiceException e) { + fail("error creating music mixin"); + } + } + + @AfterClass + public static void close() throws MusicServiceException, MusicQueryException { + //TODO: shutdown cassandra + session.close(); + cluster.close(); + } + + @Test + public void own() { + final UUID uuid = mixin.generateUniqueKey(); + List<Range> ranges = new ArrayList<>(); + ranges.add(new Range("table1")); + DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); + MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), "", mdbcServerName); + DatabasePartition partition=null; + try { + partition = mixin.createMusicRangeInformation(newRow); + } catch (MDBCServiceException e) { + fail("failure when creating new row"); + } + String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMusicRangeInformationIndex().toString(); + try { + MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); + } catch (MusicLockingException e) { + fail("failure when releasing lock"); + } + DatabasePartition newPartition = new DatabasePartition(); + try { + mixin.own(ranges,newPartition); + } catch (MDBCServiceException e) { + fail("failure when running own function"); + } + } + + @Test + public void own2() { + final UUID uuid = mixin.generateUniqueKey(); + final UUID uuid2 = mixin.generateUniqueKey(); + List<Range> ranges = new ArrayList<>(); + List<Range> ranges2 = new ArrayList<>(); + ranges.add(new Range("table1")); + ranges2.add(new Range("table2")); + DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null); + DatabasePartition dbPartition2 = new DatabasePartition(ranges2,uuid2,null); + MusicRangeInformationRow newRow = new MusicRangeInformationRow(dbPartition, new ArrayList<>(), "", mdbcServerName); + MusicRangeInformationRow newRow2 = new MusicRangeInformationRow(dbPartition2, new ArrayList<>(), "", mdbcServerName); + DatabasePartition partition=null; + DatabasePartition partition2=null; + try { + partition = mixin.createMusicRangeInformation(newRow); + partition2 = mixin.createMusicRangeInformation(newRow2); + } catch (MDBCServiceException e) { + fail("failure when creating new row"); + } + String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMusicRangeInformationIndex().toString(); + String fullyQualifiedMriKey2 = keyspace+"."+ mriTableName+"."+partition2.getMusicRangeInformationIndex().toString(); + try { + MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); + MusicLockState musicLockState2 = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey2, partition2.getLockId()); + } catch (MusicLockingException e) { + fail("failure when releasing lock"); + } + DatabasePartition newPartition = new DatabasePartition(); + MusicInterface.OwnershipReturn ownershipReturn=null; + try { + List<Range> ownRanges = new ArrayList<>(); + ownRanges.add(new Range("table1")); + ownRanges.add(new Range("table2")); + ownershipReturn = mixin.own(ownRanges, newPartition); + } catch (MDBCServiceException e) { + fail("failure when running own function"); + } + assertEquals(2,ownershipReturn.getOldIRangeds().size()); + assertEquals(ownershipReturn.getOwnerId(),newPartition.getLockId()); + assertTrue(ownershipReturn.getOldIRangeds().get(0).equals(partition.getMusicRangeInformationIndex())|| + ownershipReturn.getOldIRangeds().get(1).equals(partition.getMusicRangeInformationIndex())); + assertTrue(ownershipReturn.getOldIRangeds().get(0).equals(partition2.getMusicRangeInformationIndex())|| + ownershipReturn.getOldIRangeds().get(1).equals(partition2.getMusicRangeInformationIndex())); + String finalfullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+newPartition.getMusicRangeInformationIndex().toString(); + try { + List<String> lockQueue = MusicCore.getLockingServiceHandle().getLockQueue(keyspace, mriTableName, + newPartition.getMusicRangeInformationIndex().toString()); + assertEquals(1,lockQueue.size()); + assertEquals(lockQueue.get(0),newPartition.getLockId()); + } catch (MusicServiceException|MusicQueryException|MusicLockingException e) { + fail("failure on getting queue"); + } + MusicRangeInformationRow musicRangeInformation=null; + try { + musicRangeInformation= mixin.getMusicRangeInformation(newPartition.getMusicRangeInformationIndex()); + } catch (MDBCServiceException e) { + fail("fail to retrieve row"); + } + assertEquals(2,musicRangeInformation.getDBPartition().getSnapshot().size()); + assertEquals(0,musicRangeInformation.getRedoLog().size()); + assertEquals(newPartition.getLockId(),musicRangeInformation.getOwnerId()); + assertEquals(mdbcServerName,musicRangeInformation.getMetricProcessId()); + List<Range> snapshot = musicRangeInformation.getDBPartition().getSnapshot(); + boolean containsTable1=false; + Range table1Range = new Range("table1"); + for(Range r:snapshot){ + if(r.overlaps(table1Range)){ + containsTable1=true; + break; + } + } + assertTrue(containsTable1); + boolean containsTable2=false; + Range table2Range = new Range("table2"); + for(Range r:snapshot){ + if(r.overlaps(table2Range)){ + containsTable2=true; + break; + } + } + assertTrue(containsTable2); + } + + @Test + public void relinquish() { + } + + @Test + public void relinquishIfRequired() { + } +}
\ No newline at end of file |