aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main/java
diff options
context:
space:
mode:
authorEnrique Saurez <enrique.saurez@gmail.com>2018-10-29 10:43:15 -0400
committerEnrique Saurez <enrique.saurez@gmail.com>2018-11-29 18:26:26 -0500
commit9cc93ae782739f4fd637fa8d30a986ce2e14ae3e (patch)
treeedacbf99b11797f90370c7957f0f7a86e2be1820 /mdbc-server/src/main/java
parent76d8bc46fdf9b36548dff46b9d1c91bf7c56f6ac (diff)
ownership and relinquish
Change-Id: I625bd61adfac11febdb25b179efbc6134a276f12 Issue-ID: MUSIC-219 Signed-off-by: Enrique Saurez <enrique.saurez@gmail.com>
Diffstat (limited to 'mdbc-server/src/main/java')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java254
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java9
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java925
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java53
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java809
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java49
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Range.java22
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java82
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java6
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java114
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java2565
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java16
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java6
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java31
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java7
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tests/MAIN.java5
17 files changed, 2752 insertions, 2203 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
index 2ca621a..ea76598 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,7 +27,6 @@ import java.util.*;
import org.onap.music.logging.EELFLoggerDelegate;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import org.onap.music.mdbc.tables.MriReference;
/**
* A database range contain information about what ranges should be hosted in the current MDBC instance
@@ -35,115 +34,164 @@ import org.onap.music.mdbc.tables.MriReference;
* @author Enrique Saurez
*/
public class DatabasePartition {
- private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class);
-
- private UUID musicRangeInformationIndex;//Index that can be obtained either from
- private String lockId;
- protected List<Range> ranges;
-
- /**
- * Each range represents a partition of the database, a database partition is a union of this partitions.
- * The only requirement is that the ranges are not overlapping.
- */
-
- public DatabasePartition(UUID mriIndex) {
- this(new ArrayList<Range>(), mriIndex,"");
- }
-
- public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String lockId) {
- ranges = knownRanges;
-
- if(mriIndex != null) {
- this.setMusicRangeInformationIndex(mriIndex);
- }
- else {
- this.setMusicRangeInformationIndex(null);
- }
- this.setLockId(lockId);
-
- }
-
- public UUID getMusicRangeInformationIndex() {
- return musicRangeInformationIndex;
- }
-
- public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) {
- this.musicRangeInformationIndex = musicRangeInformationIndex;
- }
-
- /**
- * Add a new range to the ones own by the local MDBC
- * @param newRange range that is being added
- * @throws IllegalArgumentException
- */
- public synchronized void addNewRange(Range newRange) {
- //Check overlap
- for(Range r : ranges) {
- if(r.overlaps(newRange)) {
- throw new IllegalArgumentException("Range is already contain by a previous range");
- }
- }
- ranges.add(newRange);
- }
-
- /**
- * Delete a range that is being modified
- * @param rangeToDel limits of the range
- */
- public synchronized void deleteRange(Range rangeToDel) {
- if(!ranges.contains(rangeToDel)) {
- logger.error(EELFLoggerDelegate.errorLogger,"Range doesn't exist");
- throw new IllegalArgumentException("Invalid table");
- }
- ranges.remove(rangeToDel);
- }
-
- /**
- * Get all the ranges that are currently owned
- * @return ranges
- */
- public synchronized Range[] getSnapshot() {
- return (Range[]) ranges.toArray();
- }
-
- /**
- * Serialize the ranges
- * @return serialized ranges
- */
+ private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class);
+
+ private UUID musicRangeInformationIndex;//Index that can be obtained either from
+ private String lockId;
+ protected List<Range> ranges;
+
+ private boolean ready;
+
+ /**
+ * Each range represents a partition of the database, a database partition is a union of this partitions.
+ * The only requirement is that the ranges are not overlapping.
+ */
+
+ public DatabasePartition() {
+ this(new ArrayList<Range>(),null,"");
+ }
+
+ public DatabasePartition(UUID mriIndex) {
+ this(new ArrayList<Range>(), mriIndex,"");
+ }
+
+ public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String lockId) {
+ if(mriIndex==null){
+ ready = false;
+ }
+ else{
+ ready = true;
+ }
+ ranges = knownRanges;
+
+ this.setMusicRangeInformationIndex(mriIndex);
+ this.setLockId(lockId);
+ }
+
+ /**
+ * This function is used to change the contents of this, with the contents of a different object
+ * @param otherPartition partition that is used to substitute the local contents
+ */
+ public void updateDatabasePartition(DatabasePartition otherPartition){
+ musicRangeInformationIndex = otherPartition.musicRangeInformationIndex;//Index that can be obtained either from
+ lockId = otherPartition.lockId;
+ ranges = otherPartition.ranges;
+ ready = otherPartition.ready;
+ }
+
+ public String toString(){
+ StringBuilder builder = new StringBuilder().append("Row: ["+musicRangeInformationIndex.toString()+"], lockId: ["+lockId +"], ranges: [");
+ for(Range r: ranges){
+ builder.append(r.toString()).append(",");
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+
+ public boolean isLocked(){return lockId != null && !lockId.isEmpty(); }
+
+ public boolean isReady() {
+ return ready;
+ }
+
+ public void setReady(boolean ready) {
+ this.ready = ready;
+ }
+
+ public UUID getMusicRangeInformationIndex() {
+ return musicRangeInformationIndex;
+ }
+
+ public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) {
+ this.musicRangeInformationIndex = musicRangeInformationIndex;
+ }
+
+ /**
+ * Add a new range to the ones own by the local MDBC
+ * @param newRange range that is being added
+ * @throws IllegalArgumentException
+ */
+ public synchronized void addNewRange(Range newRange) {
+ //Check overlap
+ for(Range r : ranges) {
+ if(r.overlaps(newRange)) {
+ throw new IllegalArgumentException("Range is already contain by a previous range");
+ }
+ }
+ ranges.add(newRange);
+ }
+
+ /**
+ * Delete a range that is being modified
+ * @param rangeToDel limits of the range
+ */
+ public synchronized void deleteRange(Range rangeToDel) {
+ if(!ranges.contains(rangeToDel)) {
+ logger.error(EELFLoggerDelegate.errorLogger,"Range doesn't exist");
+ throw new IllegalArgumentException("Invalid table");
+ }
+ ranges.remove(rangeToDel);
+ }
+
+ /**
+ * Get all the ranges that are currently owned
+ * @return ranges
+ */
+ public synchronized List<Range> getSnapshot() {
+ List<Range> newRange = new ArrayList<>();
+ for(Range r : ranges){
+ newRange.add(r.clone());
+ }
+ return newRange;
+ }
+
+ /**
+ * Serialize the ranges
+ * @return serialized ranges
+ */
public String toJson() {
- GsonBuilder builder = new GsonBuilder();
- builder.setPrettyPrinting().serializeNulls();;
+ GsonBuilder builder = new GsonBuilder();
+ builder.setPrettyPrinting().serializeNulls();;
Gson gson = builder.create();
- return gson.toJson(this);
+ return gson.toJson(this);
}
-
+
/**
* Function to obtain the configuration
* @param filepath path to the database range
* @return a new object of type DatabaseRange
* @throws FileNotFoundException
*/
-
+
public static DatabasePartition readJsonFromFile( String filepath) throws FileNotFoundException {
- BufferedReader br;
- try {
- br = new BufferedReader(
- new FileReader(filepath));
- } catch (FileNotFoundException e) {
- logger.error(EELFLoggerDelegate.errorLogger,"File was not found when reading json"+e);
- throw e;
- }
- Gson gson = new Gson();
- DatabasePartition range = gson.fromJson(br, DatabasePartition.class);
- return range;
- }
-
- public String getLockId() {
- return lockId;
- }
-
- public void setLockId(String lockId) {
- this.lockId = lockId;
- }
+ BufferedReader br;
+ try {
+ br = new BufferedReader(
+ new FileReader(filepath));
+ } catch (FileNotFoundException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,"File was not found when reading json"+e);
+ throw e;
+ }
+ Gson gson = new Gson();
+ DatabasePartition range = gson.fromJson(br, DatabasePartition.class);
+ return range;
+ }
+
+ public String getLockId() {
+ return lockId;
+ }
+ public void setLockId(String lockId) {
+ this.lockId = lockId;
+ }
+
+ public boolean isContained(Range range){
+ for(Range r: ranges){
+ if(r.overlaps(range)){
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
index 2723490..3f45d98 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
@@ -25,6 +25,7 @@ import java.util.Base64;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -122,4 +123,12 @@ public class MDBCUtils {
}
return prop;
}
+
+ public static List<Range> getTables(Map<String,List<String>> queryParsed){
+ List<Range> ranges = new ArrayList<>();
+ for(String table: queryParsed.keySet()){
+ ranges.add(new Range(table));
+ }
+ return ranges;
+ }
} \ No newline at end of file
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index cac4139..7574841 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -37,9 +37,11 @@ import java.sql.Struct;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Executor;
import org.onap.music.exceptions.MDBCServiceException;
@@ -51,6 +53,9 @@ import org.onap.music.logging.format.ErrorTypes;
import org.onap.music.mdbc.mixins.DBInterface;
import org.onap.music.mdbc.mixins.MixinFactory;
import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
+import org.onap.music.mdbc.query.QueryProcessor;
+import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -64,461 +69,487 @@ import org.onap.music.mdbc.tables.TxCommitProgress;
* @author Robert Eby
*/
public class MdbcConnection implements Connection {
- private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcConnection.class);
-
- private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused
- private final Connection jdbcConn; // the JDBC Connection to the actual underlying database
- private final MusicInterface mi;
- private final TxCommitProgress progressKeeper;
- private final DatabasePartition partition;
- private final DBInterface dbi;
- private final HashMap<Range,StagingTable> transactionDigest;
- private final Set<String> table_set;
-
- public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
- this.id = id;
- this.table_set = Collections.synchronizedSet(new HashSet<String>());
- this.transactionDigest = new HashMap<Range,StagingTable>();
-
- if (c == null) {
- throw new MDBCServiceException("Connection is null");
- }
- this.jdbcConn = c;
- info.putAll(MDBCUtils.getMdbcProperties());
- String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
- this.dbi = MixinFactory.createDBInterface(mixinDb, mi, url, jdbcConn, info);
- this.mi = mi;
- try {
- this.setAutoCommit(c.getAutoCommit());
- } catch (SQLException e) {
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcConnection.class);
+
+ private final String id; // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused
+ private final Connection jdbcConn; // the JDBC Connection to the actual underlying database
+ private final MusicInterface mi;
+ private final TxCommitProgress progressKeeper;
+ private final DatabasePartition partition;
+ private final DBInterface dbi;
+ private final HashMap<Range,StagingTable> transactionDigest;
+ private final Set<String> table_set;
+
+ public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
+ this.id = id;
+ this.table_set = Collections.synchronizedSet(new HashSet<String>());
+ this.transactionDigest = new HashMap<Range,StagingTable>();
+
+ if (c == null) {
+ throw new MDBCServiceException("Connection is null");
+ }
+ this.jdbcConn = c;
+ info.putAll(MDBCUtils.getMdbcProperties());
+ String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
+ this.dbi = MixinFactory.createDBInterface(mixinDb, mi, url, jdbcConn, info);
+ this.mi = mi;
+ try {
+ this.setAutoCommit(c.getAutoCommit());
+ } catch (SQLException e) {
logger.error("Failure in autocommit");
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
- }
-
- // Verify the tables in MUSIC match the tables in the database
- // and create triggers on any tables that need them
- try {
- this.synchronizeTables();
- } catch (QueryException e) {
- logger.error("Error syncrhonizing tables");
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
- }
- this.progressKeeper = progressKeeper;
- this.partition = partition;
-
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
+ }
+
+ // Verify the tables in MUSIC match the tables in the database
+ // and create triggers on any tables that need them
+ try {
+ this.synchronizeTables();
+ } catch (QueryException e) {
+ logger.error("Error syncrhonizing tables");
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
+ }
+ this.progressKeeper = progressKeeper;
+ this.partition = partition;
+
logger.debug("Mdbc connection created with id: "+id);
- }
-
- @Override
- public <T> T unwrap(Class<T> iface) throws SQLException {
- logger.error(EELFLoggerDelegate.errorLogger, "proxyconn unwrap: " + iface.getName());
- return jdbcConn.unwrap(iface);
- }
-
- @Override
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- logger.error(EELFLoggerDelegate.errorLogger, "proxystatement iswrapperfor: " + iface.getName());
- return jdbcConn.isWrapperFor(iface);
- }
-
- @Override
- public Statement createStatement() throws SQLException {
- return new MdbcCallableStatement(jdbcConn.createStatement(), this);
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql) throws SQLException {
- //TODO: grab the sql call from here and all the other preparestatement calls
- return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql), sql, this);
- }
-
- @Override
- public CallableStatement prepareCall(String sql) throws SQLException {
- return new MdbcCallableStatement(jdbcConn.prepareCall(sql), this);
- }
-
- @Override
- public String nativeSQL(String sql) throws SQLException {
- return jdbcConn.nativeSQL(sql);
- }
-
- @Override
- public void setAutoCommit(boolean autoCommit) throws SQLException {
- boolean b = jdbcConn.getAutoCommit();
- if (b != autoCommit) {
- if(progressKeeper!=null) progressKeeper.commitRequested(id);
- logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
- if (b) {
- // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
- if(id == null || id.isEmpty()) {
- logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- throw new SQLException("tx id is null");
- }
- try {
- mi.commitLog(partition, transactionDigest, id, progressKeeper);
- } catch (MDBCServiceException e) {
- // TODO Auto-generated catch block
- logger.error("Cannot commit log to music" + e.getStackTrace());
- throw new SQLException(e.getMessage());
- }
- }
- if(progressKeeper!=null) {
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ logger.error(EELFLoggerDelegate.errorLogger, "proxyconn unwrap: " + iface.getName());
+ return jdbcConn.unwrap(iface);
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ logger.error(EELFLoggerDelegate.errorLogger, "proxystatement iswrapperfor: " + iface.getName());
+ return jdbcConn.isWrapperFor(iface);
+ }
+
+ @Override
+ public Statement createStatement() throws SQLException {
+ return new MdbcCallableStatement(jdbcConn.createStatement(), this);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql) throws SQLException {
+ //TODO: grab the sql call from here and all the other preparestatement calls
+ return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql), sql, this);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql) throws SQLException {
+ return new MdbcCallableStatement(jdbcConn.prepareCall(sql), this);
+ }
+
+ @Override
+ public String nativeSQL(String sql) throws SQLException {
+ return jdbcConn.nativeSQL(sql);
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) throws SQLException {
+ boolean b = jdbcConn.getAutoCommit();
+ if (b != autoCommit) {
+ if(progressKeeper!=null) progressKeeper.commitRequested(id);
+ logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
+ if (b) {
+ // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
+ if(id == null || id.isEmpty()) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ throw new SQLException("tx id is null");
+ }
+ try {
+ mi.commitLog(partition, transactionDigest, id, progressKeeper);
+ } catch (MDBCServiceException e) {
+ // TODO Auto-generated catch block
+ logger.error("Cannot commit log to music" + e.getStackTrace());
+ throw new SQLException(e.getMessage());
+ }
+ }
+ if(progressKeeper!=null) {
progressKeeper.setMusicDone(id);
- }
- jdbcConn.setAutoCommit(autoCommit);
+ }
+ jdbcConn.setAutoCommit(autoCommit);
if(progressKeeper!=null) {
progressKeeper.setSQLDone(id);
}
if(progressKeeper!=null&&progressKeeper.isComplete(id)){
progressKeeper.reinitializeTxProgress(id);
}
- }
- }
-
- @Override
- public boolean getAutoCommit() throws SQLException {
- return jdbcConn.getAutoCommit();
- }
-
- /**
- * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
- * they are performed now and copied into MUSIC.
- * @throws SQLException
- */
- @Override
- public void commit() throws SQLException {
- if(progressKeeper.isComplete(id)) {
- return;
- }
- if(progressKeeper != null) {
- progressKeeper.commitRequested(id);
- }
-
- try {
- logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
- // transaction was committed -- add all the updates into the REDO-Log in MUSIC
- mi.commitLog(partition, transactionDigest, id, progressKeeper);
- } catch (MDBCServiceException e) {
- //If the commit fail, then a new commitId should be used
- logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
- throw new SQLException("Failure commiting to MUSIC");
- }
-
- if(progressKeeper != null) {
- progressKeeper.setMusicDone(id);
- }
-
- jdbcConn.commit();
-
- if(progressKeeper != null) {
- progressKeeper.setSQLDone(id);
- }
- //MusicMixin.releaseZKLocks(MusicMixin.currentLockMap.get(getConnID()));
+ }
+ }
+
+ @Override
+ public boolean getAutoCommit() throws SQLException {
+ return jdbcConn.getAutoCommit();
+ }
+
+ /**
+ * Perform a commit, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are performed now and copied into MUSIC.
+ * @throws SQLException
+ */
+ @Override
+ public void commit() throws SQLException {
+ if(progressKeeper.isComplete(id)) {
+ return;
+ }
+ if(progressKeeper != null) {
+ progressKeeper.commitRequested(id);
+ }
+
+ try {
+ logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
+ // transaction was committed -- add all the updates into the REDO-Log in MUSIC
+ mi.commitLog(partition, transactionDigest, id, progressKeeper);
+ } catch (MDBCServiceException e) {
+ //If the commit fail, then a new commitId should be used
+ logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
+ throw new SQLException("Failure commiting to MUSIC");
+ }
+
+ if(progressKeeper != null) {
+ progressKeeper.setMusicDone(id);
+ }
+
+ jdbcConn.commit();
+
+ if(progressKeeper != null) {
+ progressKeeper.setSQLDone(id);
+ }
+ //MusicMixin.releaseZKLocks(MusicMixin.currentLockMap.get(getConnID()));
if(progressKeeper.isComplete(id)){
- progressKeeper.reinitializeTxProgress(id);
+ progressKeeper.reinitializeTxProgress(id);
+ }
+
+ //\TODO try to execute outside of the critical path of commit
+ try {
+ relinquishIfRequired(partition);
+ } catch (MDBCServiceException e) {
+ logger.warn("Error trying to relinquish: "+partition.toString());
}
- }
-
- /**
- * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed,
- * they are discarded.
- */
- @Override
- public void rollback() throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");;
- transactionDigest.clear();
- jdbcConn.rollback();
- progressKeeper.reinitializeTxProgress(id);
- }
-
- /**
- * Close this MdbcConnection.
- */
- @Override
- public void close() throws SQLException {
- logger.debug("Closing mdbc connection with id:"+id);
- if (dbi != null) {
- dbi.close();
- }
- if (jdbcConn != null && !jdbcConn.isClosed()) {
+ }
+
+ /**
+ * Perform a rollback, as requested by the JDBC driver. If any row updates have been delayed,
+ * they are discarded.
+ */
+ @Override
+ public void rollback() throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");;
+ transactionDigest.clear();
+ jdbcConn.rollback();
+ progressKeeper.reinitializeTxProgress(id);
+ }
+
+ /**
+ * Close this MdbcConnection.
+ */
+ @Override
+ public void close() throws SQLException {
+ logger.debug("Closing mdbc connection with id:"+id);
+ if (dbi != null) {
+ dbi.close();
+ }
+ if (jdbcConn != null && !jdbcConn.isClosed()) {
logger.debug("Closing jdbc from mdbc with id:"+id);
- jdbcConn.close();
- logger.debug("Connection was closed for id:" + id);
- }
- }
-
- @Override
- public boolean isClosed() throws SQLException {
- return jdbcConn.isClosed();
- }
-
- @Override
- public DatabaseMetaData getMetaData() throws SQLException {
- return jdbcConn.getMetaData();
- }
-
- @Override
- public void setReadOnly(boolean readOnly) throws SQLException {
- jdbcConn.setReadOnly(readOnly);
- }
-
- @Override
- public boolean isReadOnly() throws SQLException {
- return jdbcConn.isReadOnly();
- }
-
- @Override
- public void setCatalog(String catalog) throws SQLException {
- jdbcConn.setCatalog(catalog);
- }
-
- @Override
- public String getCatalog() throws SQLException {
- return jdbcConn.getCatalog();
- }
-
- @Override
- public void setTransactionIsolation(int level) throws SQLException {
- jdbcConn.setTransactionIsolation(level);
- }
-
- @Override
- public int getTransactionIsolation() throws SQLException {
- return jdbcConn.getTransactionIsolation();
- }
-
- @Override
- public SQLWarning getWarnings() throws SQLException {
- return jdbcConn.getWarnings();
- }
-
- @Override
- public void clearWarnings() throws SQLException {
- jdbcConn.clearWarnings();
- }
-
- @Override
- public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
- return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency), this);
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
- throws SQLException {
- return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, this);
- }
-
- @Override
- public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency), this);
- }
-
- @Override
- public Map<String, Class<?>> getTypeMap() throws SQLException {
- return jdbcConn.getTypeMap();
- }
-
- @Override
- public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
- jdbcConn.setTypeMap(map);
- }
-
- @Override
- public void setHoldability(int holdability) throws SQLException {
- jdbcConn.setHoldability(holdability);
- }
-
- @Override
- public int getHoldability() throws SQLException {
- return jdbcConn.getHoldability();
- }
-
- @Override
- public Savepoint setSavepoint() throws SQLException {
- return jdbcConn.setSavepoint();
- }
-
- @Override
- public Savepoint setSavepoint(String name) throws SQLException {
- return jdbcConn.setSavepoint(name);
- }
-
- @Override
- public void rollback(Savepoint savepoint) throws SQLException {
- jdbcConn.rollback(savepoint);
- }
-
- @Override
- public void releaseSavepoint(Savepoint savepoint) throws SQLException {
- jdbcConn.releaseSavepoint(savepoint);
- }
-
- @Override
- public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
- throws SQLException {
- return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), this);
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
- int resultSetHoldability) throws SQLException {
- return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, this);
- }
-
- @Override
- public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
- int resultSetHoldability) throws SQLException {
- return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), this);
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
- return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, autoGeneratedKeys), sql, this);
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
- return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnIndexes), sql, this);
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
- return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnNames), sql, this);
- }
-
- @Override
- public Clob createClob() throws SQLException {
- return jdbcConn.createClob();
- }
-
- @Override
- public Blob createBlob() throws SQLException {
- return jdbcConn.createBlob();
- }
-
- @Override
- public NClob createNClob() throws SQLException {
- return jdbcConn.createNClob();
- }
-
- @Override
- public SQLXML createSQLXML() throws SQLException {
- return jdbcConn.createSQLXML();
- }
-
- @Override
- public boolean isValid(int timeout) throws SQLException {
- return jdbcConn.isValid(timeout);
- }
-
- @Override
- public void setClientInfo(String name, String value) throws SQLClientInfoException {
- jdbcConn.setClientInfo(name, value);
- }
-
- @Override
- public void setClientInfo(Properties properties) throws SQLClientInfoException {
- jdbcConn.setClientInfo(properties);
- }
-
- @Override
- public String getClientInfo(String name) throws SQLException {
- return jdbcConn.getClientInfo(name);
- }
-
- @Override
- public Properties getClientInfo() throws SQLException {
- return jdbcConn.getClientInfo();
- }
-
- @Override
- public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
- return jdbcConn.createArrayOf(typeName, elements);
- }
-
- @Override
- public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
- return jdbcConn.createStruct(typeName, attributes);
- }
-
- @Override
- public void setSchema(String schema) throws SQLException {
- jdbcConn.setSchema(schema);
- }
-
- @Override
- public String getSchema() throws SQLException {
- return jdbcConn.getSchema();
- }
-
- @Override
- public void abort(Executor executor) throws SQLException {
- jdbcConn.abort(executor);
- }
-
- @Override
- public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
- jdbcConn.setNetworkTimeout(executor, milliseconds);
- }
-
- @Override
- public int getNetworkTimeout() throws SQLException {
- return jdbcConn.getNetworkTimeout();
- }
-
-
- /**
- * Code to be run within the DB driver before a SQL statement is executed. This is where tables
- * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
- * @param sql the SQL statement that is about to be executed
- */
- public void preStatementHook(String sql) {
- dbi.preStatementHook(sql);
- }
-
- /**
- * Code to be run within the DB driver after a SQL statement has been executed. This is where remote
- * statement actions can be copied back to Cassandra/MUSIC.
- * @param sql the SQL statement that was executed
- */
- public void postStatementHook(String sql) {
- dbi.postStatementHook(sql, transactionDigest);
- }
-
- /**
- * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
- * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized
- * in order to prevent multiple threads from running this code in parallel.
- */
- public void synchronizeTables() throws QueryException {
- Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database
- logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
- for (String tableName : set1) {
- // This map will be filled in if this table was previously discovered
- tableName = tableName.toUpperCase();
- if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
- logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
- try {
- TableInfo ti = dbi.getTableInfo(tableName);
- mi.initializeMusicForTable(ti,tableName);
- //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
- ti = dbi.getTableInfo(tableName);
- mi.createDirtyRowTable(ti,tableName);
- dbi.createSQLTriggers(tableName);
- table_set.add(tableName);
- dbi.synchronizeData(tableName);
- logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
- table_set.size() + "/" + set1.size() + "tables uploaded");
- } catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e);
- throw new QueryException();
- }
- }
- }
- }
-
- public DBInterface getDBInterface() {
- return this.dbi;
- }
+ jdbcConn.close();
+ logger.debug("Connection was closed for id:" + id);
+ }
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return jdbcConn.isClosed();
+ }
+
+ @Override
+ public DatabaseMetaData getMetaData() throws SQLException {
+ return jdbcConn.getMetaData();
+ }
+
+ @Override
+ public void setReadOnly(boolean readOnly) throws SQLException {
+ jdbcConn.setReadOnly(readOnly);
+ }
+
+ @Override
+ public boolean isReadOnly() throws SQLException {
+ return jdbcConn.isReadOnly();
+ }
+
+ @Override
+ public void setCatalog(String catalog) throws SQLException {
+ jdbcConn.setCatalog(catalog);
+ }
+
+ @Override
+ public String getCatalog() throws SQLException {
+ return jdbcConn.getCatalog();
+ }
+
+ @Override
+ public void setTransactionIsolation(int level) throws SQLException {
+ jdbcConn.setTransactionIsolation(level);
+ }
+
+ @Override
+ public int getTransactionIsolation() throws SQLException {
+ return jdbcConn.getTransactionIsolation();
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return jdbcConn.getWarnings();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ jdbcConn.clearWarnings();
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+ return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency), this);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
+ throws SQLException {
+ return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency), sql, this);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency), this);
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return jdbcConn.getTypeMap();
+ }
+
+ @Override
+ public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+ jdbcConn.setTypeMap(map);
+ }
+
+ @Override
+ public void setHoldability(int holdability) throws SQLException {
+ jdbcConn.setHoldability(holdability);
+ }
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return jdbcConn.getHoldability();
+ }
+
+ @Override
+ public Savepoint setSavepoint() throws SQLException {
+ return jdbcConn.setSavepoint();
+ }
+
+ @Override
+ public Savepoint setSavepoint(String name) throws SQLException {
+ return jdbcConn.setSavepoint(name);
+ }
+
+ @Override
+ public void rollback(Savepoint savepoint) throws SQLException {
+ jdbcConn.rollback(savepoint);
+ }
+
+ @Override
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+ jdbcConn.releaseSavepoint(savepoint);
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+ throws SQLException {
+ return new MdbcCallableStatement(jdbcConn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), this);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
+ int resultSetHoldability) throws SQLException {
+ return new MdbcCallableStatement(jdbcConn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql, this);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
+ int resultSetHoldability) throws SQLException {
+ return new MdbcCallableStatement(jdbcConn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), this);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+ return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, autoGeneratedKeys), sql, this);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+ return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnIndexes), sql, this);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+ return new MdbcPreparedStatement(jdbcConn.prepareStatement(sql, columnNames), sql, this);
+ }
+
+ @Override
+ public Clob createClob() throws SQLException {
+ return jdbcConn.createClob();
+ }
+
+ @Override
+ public Blob createBlob() throws SQLException {
+ return jdbcConn.createBlob();
+ }
+
+ @Override
+ public NClob createNClob() throws SQLException {
+ return jdbcConn.createNClob();
+ }
+
+ @Override
+ public SQLXML createSQLXML() throws SQLException {
+ return jdbcConn.createSQLXML();
+ }
+
+ @Override
+ public boolean isValid(int timeout) throws SQLException {
+ return jdbcConn.isValid(timeout);
+ }
+
+ @Override
+ public void setClientInfo(String name, String value) throws SQLClientInfoException {
+ jdbcConn.setClientInfo(name, value);
+ }
+
+ @Override
+ public void setClientInfo(Properties properties) throws SQLClientInfoException {
+ jdbcConn.setClientInfo(properties);
+ }
+
+ @Override
+ public String getClientInfo(String name) throws SQLException {
+ return jdbcConn.getClientInfo(name);
+ }
+
+ @Override
+ public Properties getClientInfo() throws SQLException {
+ return jdbcConn.getClientInfo();
+ }
+
+ @Override
+ public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+ return jdbcConn.createArrayOf(typeName, elements);
+ }
+
+ @Override
+ public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+ return jdbcConn.createStruct(typeName, attributes);
+ }
+
+ @Override
+ public void setSchema(String schema) throws SQLException {
+ jdbcConn.setSchema(schema);
+ }
+
+ @Override
+ public String getSchema() throws SQLException {
+ return jdbcConn.getSchema();
+ }
+
+ @Override
+ public void abort(Executor executor) throws SQLException {
+ jdbcConn.abort(executor);
+ }
+
+ @Override
+ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+ jdbcConn.setNetworkTimeout(executor, milliseconds);
+ }
+
+ @Override
+ public int getNetworkTimeout() throws SQLException {
+ return jdbcConn.getNetworkTimeout();
+ }
+
+ /**
+ * Code to be run within the DB driver before a SQL statement is executed. This is where tables
+ * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
+ * @param sql the SQL statement that is about to be executed
+ */
+ public void preStatementHook(final String sql) throws MDBCServiceException {
+ //TODO: verify ownership of keys here
+ //Parse tables from the sql query
+ Map<String, List<String>> tableToInstruction = QueryProcessor.extractTableFromQuery(sql);
+ //Check ownership of keys
+ own(MDBCUtils.getTables(tableToInstruction));
+ dbi.preStatementHook(sql);
+ }
+
+
+ /**
+ * Code to be run within the DB driver after a SQL statement has been executed. This is where remote
+ * statement actions can be copied back to Cassandra/MUSIC.
+ * @param sql the SQL statement that was executed
+ */
+ public void postStatementHook(String sql) {
+ dbi.postStatementHook(sql, transactionDigest);
+ }
+
+ /**
+ * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
+ * proxy first starts, and whenever there is the possibility that tables were created or dropped. It is synchronized
+ * in order to prevent multiple threads from running this code in parallel.
+ */
+ public void synchronizeTables() throws QueryException {
+ Set<String> set1 = dbi.getSQLTableSet(); // set of tables in the database
+ logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
+ for (String tableName : set1) {
+ // This map will be filled in if this table was previously discovered
+ tableName = tableName.toUpperCase();
+ if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
+ try {
+ TableInfo ti = dbi.getTableInfo(tableName);
+ mi.initializeMusicForTable(ti,tableName);
+ //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
+ ti = dbi.getTableInfo(tableName);
+ mi.createDirtyRowTable(ti,tableName);
+ dbi.createSQLTriggers(tableName);
+ table_set.add(tableName);
+ dbi.synchronizeData(tableName);
+ logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
+ table_set.size() + "/" + set1.size() + "tables uploaded");
+ } catch (Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e);
+ throw new QueryException();
+ }
+ }
+ }
+ }
+
+ public DBInterface getDBInterface() {
+ return this.dbi;
+ }
+
+ public void own(List<Range> ranges) throws MDBCServiceException {
+ final OwnershipReturn ownershipReturn = mi.own(ranges, partition);
+ final List<UUID> oldRangeIds = ownershipReturn.getOldIRangeds();
+ //\TODO: do in parallel for all range ids
+ for(UUID oldRange : oldRangeIds) {
+ MusicTxDigest.replayDigestForPartition(mi, oldRange,dbi);
+ }
+ }
+
+ public void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException {
+ mi.relinquishIfRequired(partition);
+ }
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
index 092aa94..68a40a8 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
@@ -46,6 +46,7 @@ import java.util.Calendar;
import org.apache.commons.lang3.StringUtils;
+import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
/**
@@ -93,11 +94,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
r = stmt.executeQuery(sql);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger, "executeQuery: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return r;
}
@@ -112,11 +115,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
n = stmt.executeUpdate(sql);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger, "executeUpdate: exception "+nm+" "+e);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return n;
}
@@ -193,7 +198,7 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
b = stmt.execute(sql);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger, "execute: exception "+nm+" "+e);
// Note: this seems to be the only call Camunda uses, so it is the only one I am fixing for now.
@@ -205,6 +210,8 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.error(EELFLoggerDelegate.errorLogger, " Exception "+nm+" "+e);
throw e;
}
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return b;
}
@@ -305,11 +312,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
n = stmt.executeUpdate(sql, autoGeneratedKeys);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return n;
}
@@ -323,12 +332,14 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
n = stmt.executeUpdate(sql, columnIndexes);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
- }
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
+ }
return n;
}
@@ -341,11 +352,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
n = stmt.executeUpdate(sql, columnNames);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return n;
}
@@ -359,11 +372,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
b = stmt.execute(sql, autoGeneratedKeys);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return b;
}
@@ -377,11 +392,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
b = stmt.execute(sql, columnIndexes);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return b;
}
@@ -395,11 +412,13 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
b = stmt.execute(sql, columnNames);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return b;
}
@@ -443,13 +462,17 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
r = ((PreparedStatement)stmt).executeQuery();;
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
e.printStackTrace();
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger,"executeQuery: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
+
+
return r;
}
@@ -462,12 +485,14 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
n = ((PreparedStatement)stmt).executeUpdate();
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
e.printStackTrace();
String nm = e.getClass().getName();
logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return n;
}
@@ -583,7 +608,7 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
b = ((PreparedStatement)stmt).execute();
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
e.printStackTrace();
String nm = e.getClass().getName();
// Note: this seems to be the only call Camunda uses, so it is the only one I am fixing for now.
@@ -595,6 +620,8 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm);
throw e;
}
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return b;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
index 989b6e4..a70c359 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
@@ -57,7 +57,7 @@ public class MdbcServerLogic extends JdbcMeta{
public MdbcServerLogic(String Url, Properties info, NodeConfiguration config) throws SQLException, MDBCServiceException {
super(Url,info);
- this.manager = new StateManager(Url,info,config.partition,config.nodeName, "test"); //FIXME: db name should not be passed in ahead of time
+ this.manager = new StateManager(Url,info,config.nodeName, "test"); //FIXME: db name should not be passed in ahead of time
this.info = info;
int concurrencyLevel = Integer.parseInt(
info.getProperty(ConnectionCacheSettings.CONCURRENCY_LEVEL.key(),
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java
index 97f4be4..26b9acd 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcStatement.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,7 @@ import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
+import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.QueryException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
@@ -38,398 +39,416 @@ import org.onap.music.logging.format.ErrorTypes;
* @author Robert Eby
*/
public class MdbcStatement implements Statement {
- private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcStatement.class);
- private static final String DATASTAX_PREFIX = "com.datastax.driver";
-
- final Statement stmt; // the Statement that we are proxying
- final MdbcConnection mConn;
- //\TODO We may need to all pass the connection object to support autocommit
-
- public MdbcStatement(Statement s, MdbcConnection mConn) {
- this.stmt = s;
- this.mConn = mConn;
- }
-
- public MdbcStatement(Statement stmt, String sql, MdbcConnection mConn) {
- //\TODO why there is a constructor with a sql parameter in a not PreparedStatement
- this.stmt = stmt;
- this.mConn = mConn;
- }
-
- @Override
- public <T> T unwrap(Class<T> iface) throws SQLException {
- logger.error(EELFLoggerDelegate.errorLogger, "proxystatement unwrap: " + iface.getName());
- return stmt.unwrap(iface);
- }
-
- @Override
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- logger.error(EELFLoggerDelegate.errorLogger, "proxystatement isWrapperFor: " + iface.getName());
- return stmt.isWrapperFor(iface);
- }
-
- @Override
- public ResultSet executeQuery(String sql) throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql);
- ResultSet r = null;
- try {
- mConn.preStatementHook(sql);
- r = stmt.executeQuery(sql);
- mConn.postStatementHook(sql);
- synchronizeTables(sql);
- } catch (Exception e) {
- String nm = e.getClass().getName();
- logger.error(EELFLoggerDelegate.errorLogger, "executeQuery: exception "+nm);
- if (!nm.startsWith(DATASTAX_PREFIX))
- throw e;
- }
- return r;
- }
-
- @Override
- public int executeUpdate(String sql) throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
-
- int n = 0;
- try {
- mConn.preStatementHook(sql);
- n = stmt.executeUpdate(sql);
- mConn.postStatementHook(sql);
- synchronizeTables(sql);
- } catch (Exception e) {
- String nm = e.getClass().getName();
- logger.error(EELFLoggerDelegate.errorLogger, "executeUpdate: exception "+nm+" "+e);
- if (!nm.startsWith(DATASTAX_PREFIX))
- throw e;
- }
- return n;
- }
-
- @Override
- public void close() throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"Statement close: ");
- stmt.close();
- }
-
- @Override
- public int getMaxFieldSize() throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"getMaxFieldSize");
- return stmt.getMaxFieldSize();
- }
-
- @Override
- public void setMaxFieldSize(int max) throws SQLException {
- stmt.setMaxFieldSize(max);
- }
-
- @Override
- public int getMaxRows() throws SQLException {
- return stmt.getMaxRows();
- }
-
- @Override
- public void setMaxRows(int max) throws SQLException {
- stmt.setMaxRows(max);
- }
-
- @Override
- public void setEscapeProcessing(boolean enable) throws SQLException {
- stmt.setEscapeProcessing(enable);
- }
-
- @Override
- public int getQueryTimeout() throws SQLException {
- return stmt.getQueryTimeout();
- }
-
- @Override
- public void setQueryTimeout(int seconds) throws SQLException {
- //\TODO: we also need to implement a higher level timeout in MDBC
- logger.debug(EELFLoggerDelegate.applicationLogger,"setQueryTimeout seconds "+ seconds);
- stmt.setQueryTimeout(seconds);
- }
-
- @Override
- public void cancel() throws SQLException {
- stmt.cancel();
- }
-
- @Override
- public SQLWarning getWarnings() throws SQLException {
- return stmt.getWarnings();
- }
-
- @Override
- public void clearWarnings() throws SQLException {
- stmt.clearWarnings();
- }
-
- @Override
- public void setCursorName(String name) throws SQLException {
- stmt.setCursorName(name);
- }
-
- @Override
- public boolean execute(String sql) throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
- boolean b = false;
- //\TODO Add the result of the postStatementHook to b
- try {
- mConn.preStatementHook(sql);
- b = stmt.execute(sql);
- mConn.postStatementHook(sql);
- synchronizeTables(sql);
- } catch (Exception e) {
- String nm = e.getClass().getName();
- logger.error(EELFLoggerDelegate.errorLogger, "execute: exception "+nm+" "+e);
- // Note: this seems to be the only call Camunda uses, so it is the only one I am fixing for now.
- boolean ignore = nm.startsWith(DATASTAX_PREFIX);
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcStatement.class);
+ private static final String DATASTAX_PREFIX = "com.datastax.driver";
+
+ final Statement stmt; // the Statement that we are proxying
+ final MdbcConnection mConn;
+ //\TODO We may need to all pass the connection object to support autocommit
+
+ public MdbcStatement(Statement s, MdbcConnection mConn) {
+ this.stmt = s;
+ this.mConn = mConn;
+ }
+
+ public MdbcStatement(Statement stmt, String sql, MdbcConnection mConn) {
+ //\TODO why there is a constructor with a sql parameter in a not PreparedStatement
+ this.stmt = stmt;
+ this.mConn = mConn;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ logger.error(EELFLoggerDelegate.errorLogger, "proxystatement unwrap: " + iface.getName());
+ return stmt.unwrap(iface);
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ logger.error(EELFLoggerDelegate.errorLogger, "proxystatement isWrapperFor: " + iface.getName());
+ return stmt.isWrapperFor(iface);
+ }
+
+ @Override
+ public ResultSet executeQuery(String sql) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"executeQuery: "+sql);
+ ResultSet r = null;
+ try {
+ mConn.preStatementHook(sql);
+ r = stmt.executeQuery(sql);
+ mConn.postStatementHook(sql);
+ synchronizeTables(sql);
+ } catch (SQLException e) {
+ String nm = e.getClass().getName();
+ logger.error(EELFLoggerDelegate.errorLogger, "executeQuery: exception "+nm);
+ if (!nm.startsWith(DATASTAX_PREFIX))
+ throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
+ }
+ return r;
+ }
+
+ @Override
+ public int executeUpdate(String sql) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
+
+ int n = 0;
+ try {
+ mConn.preStatementHook(sql);
+ n = stmt.executeUpdate(sql);
+ mConn.postStatementHook(sql);
+ synchronizeTables(sql);
+ } catch (SQLException e) {
+ String nm = e.getClass().getName();
+ logger.error(EELFLoggerDelegate.errorLogger, "executeUpdate: exception "+nm+" "+e);
+ if (!nm.startsWith(DATASTAX_PREFIX))
+ throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
+ }
+ return n;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"Statement close: ");
+ stmt.close();
+ }
+
+ @Override
+ public int getMaxFieldSize() throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"getMaxFieldSize");
+ return stmt.getMaxFieldSize();
+ }
+
+ @Override
+ public void setMaxFieldSize(int max) throws SQLException {
+ stmt.setMaxFieldSize(max);
+ }
+
+ @Override
+ public int getMaxRows() throws SQLException {
+ return stmt.getMaxRows();
+ }
+
+ @Override
+ public void setMaxRows(int max) throws SQLException {
+ stmt.setMaxRows(max);
+ }
+
+ @Override
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+ stmt.setEscapeProcessing(enable);
+ }
+
+ @Override
+ public int getQueryTimeout() throws SQLException {
+ return stmt.getQueryTimeout();
+ }
+
+ @Override
+ public void setQueryTimeout(int seconds) throws SQLException {
+ //\TODO: we also need to implement a higher level timeout in MDBC
+ logger.debug(EELFLoggerDelegate.applicationLogger,"setQueryTimeout seconds "+ seconds);
+ stmt.setQueryTimeout(seconds);
+ }
+
+ @Override
+ public void cancel() throws SQLException {
+ stmt.cancel();
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return stmt.getWarnings();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ stmt.clearWarnings();
+ }
+
+ @Override
+ public void setCursorName(String name) throws SQLException {
+ stmt.setCursorName(name);
+ }
+
+ @Override
+ public boolean execute(String sql) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
+ boolean b = false;
+ //\TODO Add the result of the postStatementHook to b
+ try {
+ mConn.preStatementHook(sql);
+ b = stmt.execute(sql);
+ mConn.postStatementHook(sql);
+ synchronizeTables(sql);
+ } catch (SQLException e) {
+ String nm = e.getClass().getName();
+ logger.error(EELFLoggerDelegate.errorLogger, "execute: exception "+nm+" "+e);
+ // Note: this seems to be the only call Camunda uses, so it is the only one I am fixing for now.
+ boolean ignore = nm.startsWith(DATASTAX_PREFIX);
// ignore |= (nm.startsWith("org.h2.jdbc.JdbcSQLException") && e.getMessage().contains("already exists"));
- if (ignore) {
- logger.warn("execute: exception (IGNORED) "+nm);
- } else {
- logger.error(EELFLoggerDelegate.errorLogger, " Exception "+nm+" "+e);
- throw e;
- }
- }
- return b;
- }
-
- @Override
- public ResultSet getResultSet() throws SQLException {
- return stmt.getResultSet();
- }
-
- @Override
- public int getUpdateCount() throws SQLException {
- return stmt.getUpdateCount();
- }
-
- @Override
- public boolean getMoreResults() throws SQLException {
- return stmt.getMoreResults();
- }
-
- @Override
- public void setFetchDirection(int direction) throws SQLException {
- stmt.setFetchDirection(direction);
- }
-
- @Override
- public int getFetchDirection() throws SQLException {
- return stmt.getFetchDirection();
- }
-
- @Override
- public void setFetchSize(int rows) throws SQLException {
- stmt.setFetchSize(rows);
- }
-
- @Override
- public int getFetchSize() throws SQLException {
- return stmt.getFetchSize();
- }
-
- @Override
- public int getResultSetConcurrency() throws SQLException {
- return stmt.getResultSetConcurrency();
- }
-
- @Override
- public int getResultSetType() throws SQLException {
- return stmt.getResultSetType();
- }
-
- @Override
- public void addBatch(String sql) throws SQLException {
- stmt.addBatch(sql);
- }
-
- @Override
- public void clearBatch() throws SQLException {
- stmt.clearBatch();
- }
-
- @Override
- public int[] executeBatch() throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"executeBatch: ");
- int[] n = null;
- try {
- logger.debug(EELFLoggerDelegate.applicationLogger,"executeBatch() is not supported by MDBC; your results may be incorrect as a result.");
- n = stmt.executeBatch();
- synchronizeTables(null);
- } catch (Exception e) {
- String nm = e.getClass().getName();
- logger.error(EELFLoggerDelegate.errorLogger,"executeBatch: exception "+nm);
- if (!nm.startsWith(DATASTAX_PREFIX))
- throw e;
- }
- return n;
- }
-
- @Override
- public Connection getConnection() throws SQLException {
- return stmt.getConnection();
- }
-
- @Override
- public boolean getMoreResults(int current) throws SQLException {
- return stmt.getMoreResults(current);
- }
-
- @Override
- public ResultSet getGeneratedKeys() throws SQLException {
- return stmt.getGeneratedKeys();
- }
-
- @Override
- public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
- int n = 0;
- try {
- mConn.preStatementHook(sql);
- n = stmt.executeUpdate(sql, autoGeneratedKeys);
- mConn.postStatementHook(sql);
- synchronizeTables(sql);
- } catch (Exception e) {
- String nm = e.getClass().getName();
- logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm);
- if (!nm.startsWith(DATASTAX_PREFIX))
- throw e;
- }
- return n;
- }
-
- @Override
- public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
- int n = 0;
- try {
- mConn.preStatementHook(sql);
- n = stmt.executeUpdate(sql, columnIndexes);
- mConn.postStatementHook(sql);
- synchronizeTables(sql);
- } catch (Exception e) {
- String nm = e.getClass().getName();
- logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm);
- if (!nm.startsWith(DATASTAX_PREFIX))
- throw e;
- }
- return n;
- }
-
- @Override
- public int executeUpdate(String sql, String[] columnNames) throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
- int n = 0;
- try {
- mConn.preStatementHook(sql);
- n = stmt.executeUpdate(sql, columnNames);
- mConn.postStatementHook(sql);
- synchronizeTables(sql);
- } catch (Exception e) {
- String nm = e.getClass().getName();
- logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm);
- if (!nm.startsWith(DATASTAX_PREFIX))
- throw e;
- }
- return n;
- }
-
- @Override
- public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
- boolean b = false;
- try {
- mConn.preStatementHook(sql);
- b = stmt.execute(sql, autoGeneratedKeys);
- mConn.postStatementHook(sql);
- synchronizeTables(sql);
- } catch (Exception e) {
- String nm = e.getClass().getName();
- logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm);
- if (!nm.startsWith(DATASTAX_PREFIX))
- throw e;
- }
- return b;
- }
-
- @Override
- public boolean execute(String sql, int[] columnIndexes) throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
- boolean b = false;
- try {
- mConn.preStatementHook(sql);
- b = stmt.execute(sql, columnIndexes);
- mConn.postStatementHook(sql);
- synchronizeTables(sql);
- } catch (Exception e) {
- String nm = e.getClass().getName();
- logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm);
- if (!nm.startsWith(DATASTAX_PREFIX))
- throw e;
- }
- return b;
- }
-
- @Override
- public boolean execute(String sql, String[] columnNames) throws SQLException {
- logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
- //\TODO Idem to the other execute without columnNames
- boolean b = false;
- try {
- mConn.preStatementHook(sql);
- b = stmt.execute(sql, columnNames);
- mConn.postStatementHook(sql);
- synchronizeTables(sql);
- } catch (Exception e) {
- String nm = e.getClass().getName();
- logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm);
- if (!nm.startsWith(DATASTAX_PREFIX))
- throw e;
- }
- return b;
- }
-
- @Override
- public int getResultSetHoldability() throws SQLException {
- return stmt.getResultSetHoldability();
- }
-
- @Override
- public boolean isClosed() throws SQLException {
- return stmt.isClosed();
- }
-
- @Override
- public void setPoolable(boolean poolable) throws SQLException {
- stmt.setPoolable(poolable);
- }
-
- @Override
- public boolean isPoolable() throws SQLException {
- return stmt.isPoolable();
- }
-
- @Override
- public void closeOnCompletion() throws SQLException {
- stmt.closeOnCompletion();
- }
-
- @Override
- public boolean isCloseOnCompletion() throws SQLException {
- return stmt.isCloseOnCompletion();
- }
-
- protected void synchronizeTables(String sql) {
- if (sql == null || sql.trim().toLowerCase().startsWith("create")) {
- if (mConn != null) {
- try {
- mConn.synchronizeTables();
- } catch (QueryException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
- }
- }
- }
- }
+ if (ignore) {
+ logger.warn("execute: exception (IGNORED) "+nm);
+ } else {
+ logger.error(EELFLoggerDelegate.errorLogger, " Exception "+nm+" "+e);
+ throw e;
+ }
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
+ }
+ return b;
+ }
+
+ @Override
+ public ResultSet getResultSet() throws SQLException {
+ return stmt.getResultSet();
+ }
+
+ @Override
+ public int getUpdateCount() throws SQLException {
+ return stmt.getUpdateCount();
+ }
+
+ @Override
+ public boolean getMoreResults() throws SQLException {
+ return stmt.getMoreResults();
+ }
+
+ @Override
+ public void setFetchDirection(int direction) throws SQLException {
+ stmt.setFetchDirection(direction);
+ }
+
+ @Override
+ public int getFetchDirection() throws SQLException {
+ return stmt.getFetchDirection();
+ }
+
+ @Override
+ public void setFetchSize(int rows) throws SQLException {
+ stmt.setFetchSize(rows);
+ }
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ return stmt.getFetchSize();
+ }
+
+ @Override
+ public int getResultSetConcurrency() throws SQLException {
+ return stmt.getResultSetConcurrency();
+ }
+
+ @Override
+ public int getResultSetType() throws SQLException {
+ return stmt.getResultSetType();
+ }
+
+ @Override
+ public void addBatch(String sql) throws SQLException {
+ stmt.addBatch(sql);
+ }
+
+ @Override
+ public void clearBatch() throws SQLException {
+ stmt.clearBatch();
+ }
+
+ @Override
+ public int[] executeBatch() throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"executeBatch: ");
+ int[] n = null;
+ try {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"executeBatch() is not supported by MDBC; your results may be incorrect as a result.");
+ n = stmt.executeBatch();
+ synchronizeTables(null);
+ } catch (Exception e) {
+ String nm = e.getClass().getName();
+ logger.error(EELFLoggerDelegate.errorLogger,"executeBatch: exception "+nm);
+ if (!nm.startsWith(DATASTAX_PREFIX))
+ throw e;
+ }
+ return n;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return stmt.getConnection();
+ }
+
+ @Override
+ public boolean getMoreResults(int current) throws SQLException {
+ return stmt.getMoreResults(current);
+ }
+
+ @Override
+ public ResultSet getGeneratedKeys() throws SQLException {
+ return stmt.getGeneratedKeys();
+ }
+
+ @Override
+ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
+ int n = 0;
+ try {
+ mConn.preStatementHook(sql);
+ n = stmt.executeUpdate(sql, autoGeneratedKeys);
+ mConn.postStatementHook(sql);
+ synchronizeTables(sql);
+ } catch (SQLException e) {
+ String nm = e.getClass().getName();
+ logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm);
+ if (!nm.startsWith(DATASTAX_PREFIX))
+ throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
+ }
+ return n;
+ }
+
+ @Override
+ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
+ int n = 0;
+ try {
+ mConn.preStatementHook(sql);
+ n = stmt.executeUpdate(sql, columnIndexes);
+ mConn.postStatementHook(sql);
+ synchronizeTables(sql);
+ } catch (SQLException e) {
+ String nm = e.getClass().getName();
+ logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm);
+ if (!nm.startsWith(DATASTAX_PREFIX))
+ throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
+ }
+ return n;
+ }
+
+ @Override
+ public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"executeUpdate: "+sql);
+ int n = 0;
+ try {
+ mConn.preStatementHook(sql);
+ n = stmt.executeUpdate(sql, columnNames);
+ mConn.postStatementHook(sql);
+ synchronizeTables(sql);
+ } catch (SQLException e) {
+ String nm = e.getClass().getName();
+ logger.error(EELFLoggerDelegate.errorLogger,"executeUpdate: exception "+nm);
+ if (!nm.startsWith(DATASTAX_PREFIX))
+ throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
+ }
+ return n;
+ }
+
+ @Override
+ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
+ boolean b = false;
+ try {
+ mConn.preStatementHook(sql);
+ b = stmt.execute(sql, autoGeneratedKeys);
+ mConn.postStatementHook(sql);
+ synchronizeTables(sql);
+ } catch (SQLException e) {
+ String nm = e.getClass().getName();
+ logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm);
+ if (!nm.startsWith(DATASTAX_PREFIX))
+ throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
+ }
+ return b;
+ }
+
+ @Override
+ public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
+ boolean b = false;
+ try {
+ mConn.preStatementHook(sql);
+ b = stmt.execute(sql, columnIndexes);
+ mConn.postStatementHook(sql);
+ synchronizeTables(sql);
+ } catch (SQLException e) {
+ String nm = e.getClass().getName();
+ logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm);
+ if (!nm.startsWith(DATASTAX_PREFIX))
+ throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
+ }
+ return b;
+ }
+
+ @Override
+ public boolean execute(String sql, String[] columnNames) throws SQLException {
+ logger.debug(EELFLoggerDelegate.applicationLogger,"execute: "+sql);
+ //\TODO Idem to the other execute without columnNames
+ boolean b = false;
+ try {
+ mConn.preStatementHook(sql);
+ b = stmt.execute(sql, columnNames);
+ mConn.postStatementHook(sql);
+ synchronizeTables(sql);
+ } catch (SQLException e) {
+ String nm = e.getClass().getName();
+ logger.error(EELFLoggerDelegate.errorLogger,"execute: exception "+nm);
+ if (!nm.startsWith(DATASTAX_PREFIX))
+ throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
+ }
+ return b;
+ }
+
+ @Override
+ public int getResultSetHoldability() throws SQLException {
+ return stmt.getResultSetHoldability();
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return stmt.isClosed();
+ }
+
+ @Override
+ public void setPoolable(boolean poolable) throws SQLException {
+ stmt.setPoolable(poolable);
+ }
+
+ @Override
+ public boolean isPoolable() throws SQLException {
+ return stmt.isPoolable();
+ }
+
+ @Override
+ public void closeOnCompletion() throws SQLException {
+ stmt.closeOnCompletion();
+ }
+
+ @Override
+ public boolean isCloseOnCompletion() throws SQLException {
+ return stmt.isCloseOnCompletion();
+ }
+
+ protected void synchronizeTables(String sql) {
+ if (sql == null || sql.trim().toLowerCase().startsWith("create")) {
+ if (mConn != null) {
+ try {
+ mConn.synchronizeTables();
+ } catch (QueryException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ }
+ }
+ }
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java
index 3175f94..059793d 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ProxyStatement.java
@@ -45,9 +45,9 @@ import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Map;
-import org.apache.log4j.Logger;
-
+import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.QueryException;
+import org.onap.music.logging.EELFLoggerDelegate;
/**
* ProxyStatement is a proxy Statement that front ends Statements from the underlying JDBC driver. It passes all operations through,
@@ -56,7 +56,7 @@ import org.onap.music.exceptions.QueryException;
* @author Robert Eby
*/
public class ProxyStatement implements CallableStatement {
- private static final Logger logger = Logger.getLogger(ProxyStatement.class);
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(ProxyStatement.class);
private static final String DATASTAX_PREFIX = "com.datastax.driver";
private final Statement stmt; // the Statement that we are proxying
@@ -86,12 +86,15 @@ public class ProxyStatement implements CallableStatement {
r = stmt.executeQuery(sql);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.warn("executeQuery: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
+
return r;
}
@@ -104,11 +107,13 @@ public class ProxyStatement implements CallableStatement {
n = stmt.executeUpdate(sql);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.warn("executeUpdate: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return n;
}
@@ -182,7 +187,7 @@ public class ProxyStatement implements CallableStatement {
b = stmt.execute(sql);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
// Note: this seems to be the only call Camunda uses, so it is the only one I am fixing for now.
boolean ignore = nm.startsWith(DATASTAX_PREFIX);
@@ -193,6 +198,8 @@ public class ProxyStatement implements CallableStatement {
logger.warn("execute: exception "+nm);
throw e;
}
+ } catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return b;
}
@@ -256,16 +263,18 @@ public class ProxyStatement implements CallableStatement {
public int[] executeBatch() throws SQLException {
logger.debug("executeBatch");
int[] n = null;
+ //\TODO check if this is still invalid in Metric
try {
logger.warn("executeBatch() is not supported by MDBC; your results may be incorrect as a result.");
n = stmt.executeBatch();
synchronizeTables(null);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.warn("executeBatch: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
- }
+ }
+
return n;
}
@@ -293,11 +302,13 @@ public class ProxyStatement implements CallableStatement {
n = stmt.executeUpdate(sql, autoGeneratedKeys);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.warn("executeUpdate: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ }catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return n;
}
@@ -311,11 +322,13 @@ public class ProxyStatement implements CallableStatement {
n = stmt.executeUpdate(sql, columnIndexes);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.warn("executeUpdate: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ }catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return n;
}
@@ -329,11 +342,13 @@ public class ProxyStatement implements CallableStatement {
n = stmt.executeUpdate(sql, columnNames);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.warn("executeUpdate: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ }catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return n;
}
@@ -347,11 +362,13 @@ public class ProxyStatement implements CallableStatement {
b = stmt.execute(sql, autoGeneratedKeys);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.warn("execute: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ }catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return b;
}
@@ -365,11 +382,13 @@ public class ProxyStatement implements CallableStatement {
b = stmt.execute(sql, columnIndexes);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.warn("execute: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ }catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return b;
}
@@ -383,11 +402,13 @@ public class ProxyStatement implements CallableStatement {
b = stmt.execute(sql, columnNames);
mConn.postStatementHook(sql);
synchronizeTables(sql);
- } catch (Exception e) {
+ } catch (SQLException e) {
String nm = e.getClass().getName();
logger.warn("execute: exception "+nm);
if (!nm.startsWith(DATASTAX_PREFIX))
throw e;
+ }catch (MDBCServiceException e) {
+ throw new SQLException(e.getMessage());
}
return b;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
index e182368..4bccbba 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
@@ -29,11 +29,11 @@ import java.util.Objects;
* In the future we may decide to partition ranges differently
* @author Enrique Saurez
*/
-public class Range implements Serializable {
+public class Range implements Serializable, Cloneable{
private static final long serialVersionUID = 1610744496930800088L;
- final public String table;
+ public String table;
public Range(String table) {
this.table = table;
@@ -58,8 +58,22 @@ public class Range implements Serializable {
public int hashCode(){
return Objects.hash(table);
}
-
+
+ @Override
+ protected Range clone() {
+ Range newRange = null;
+ try{
+ newRange = (Range) super.clone();
+ newRange.table = this.table;
+ }
+ catch(CloneNotSupportedException cns){
+ //\TODO add logging
+ }
+ return newRange;
+
+ }
public boolean overlaps(Range other) {
- return table == other.table;
+ return table.equals(other.table);
}
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index c13fe16..f281548 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -19,8 +19,9 @@
*/
package org.onap.music.mdbc;
+import java.util.ArrayList;
+import java.util.List;
import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
import org.onap.music.logging.format.ErrorSeverity;
@@ -71,16 +72,16 @@ public class StateManager {
/** Identifier for this server instance */
private String mdbcServerName;
+ private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
- @SuppressWarnings("unused")
- private DatabasePartition ranges;
- public StateManager(String sqlDBUrl, Properties info, DatabasePartition ranges, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
+ public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
this.sqlDBName = sqlDBName;
- this.ranges = ranges;
this.sqlDBUrl = sqlDBUrl;
this.info = info;
this.mdbcServerName = mdbcServerName;
+
+ this.connectionRanges = new HashMap<>();
this.transactionInfo = new TxCommitProgress();
//\fixme this might not be used, delete?
try {
@@ -102,9 +103,7 @@ public class StateManager {
}
/**
- * Initialize the connections to music, set up any necessary tables in music
- * @param mixin
- * @param cassandraUrl
+ * Initialize all the interfaces and datastructures
* @throws MDBCServiceException
*/
protected void initMusic() throws MDBCServiceException {
@@ -118,7 +117,8 @@ public class StateManager {
Class.forName("org.mariadb.jdbc.Driver");
}
catch (ClassNotFoundException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
+ ErrorTypes.GENERALSERVICEERROR);
return;
}
try {
@@ -129,7 +129,8 @@ public class StateManager {
Statement stmt = sqlConnection.createStatement();
stmt.execute(sql.toString());
} catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
+ ErrorTypes.GENERALSERVICEERROR);
throw new MDBCServiceException(e.getMessage());
}
}
@@ -138,15 +139,11 @@ public class StateManager {
return this.musicInterface;
}
- public DatabasePartition getRanges() {
- return ranges;
+ public List<DatabasePartition> getRanges() {
+ return new ArrayList<>(connectionRanges.values());
}
- public void setRanges(DatabasePartition ranges) {
- this.ranges = ranges;
- }
-
-
+
public void closeConnection(String connectionId){
//\TODO check if there is a race condition
if(mdbcConnections.containsKey(connectionId)) {
@@ -156,10 +153,15 @@ public class StateManager {
if(conn!=null)
conn.close();
} catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
+ ErrorTypes.GENERALSERVICEERROR);
}
mdbcConnections.remove(connectionId);
}
+ if(connectionRanges.containsKey(connectionId)){
+ //TODO release lock?
+ connectionRanges.remove(connectionId);
+ }
}
/**
@@ -179,20 +181,34 @@ public class StateManager {
}
catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
+ ErrorTypes.GENERALSERVICEERROR);
return;
}
try {
sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info);
} catch (SQLException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL,
+ ErrorTypes.QUERYERROR);
sqlConnection = null;
}
+ //check if a range was already created for this connection
+ //TODO: later we could try to match it to some more sticky client id
+ DatabasePartition ranges;
+ if(connectionRanges.containsKey(id)){
+ ranges=connectionRanges.get(id);
+ }
+ else{
+ ranges=new DatabasePartition();
+ connectionRanges.put(id,ranges);
+ }
//Create MDBC connection
try {
- newConnection = new MdbcConnection(id, this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface, transactionInfo,ranges);
+ newConnection = new MdbcConnection(id, this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface,
+ transactionInfo,ranges);
} catch (MDBCServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
+ ErrorTypes.QUERYERROR);
newConnection = null;
return;
}
@@ -227,7 +243,8 @@ public class StateManager {
}
catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL,
+ ErrorTypes.QUERYERROR);
}
//Create connection to local SQL DB
@@ -235,14 +252,27 @@ public class StateManager {
sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info);
} catch (SQLException e) {
logger.error("sql connection was not created correctly");
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL,
+ ErrorTypes.QUERYERROR);
sqlConnection = null;
}
+ //check if a range was already created for this connection
+ //TODO: later we could try to match it to some more sticky client id
+ DatabasePartition ranges;
+ if(connectionRanges.containsKey(id)){
+ ranges=connectionRanges.get(id);
+ }
+ else{
+ ranges=new DatabasePartition();
+ connectionRanges.put(id,ranges);
+ }
//Create MDBC connection
try {
- newConnection = new MdbcConnection(id,this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface, transactionInfo,ranges);
+ newConnection = new MdbcConnection(id,this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface,
+ transactionInfo,ranges);
} catch (MDBCServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
+ ErrorTypes.QUERYERROR);
newConnection = null;
}
logger.info(EELFLoggerDelegate.applicationLogger,"Connection created for connection: "+id);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index 73622b1..383b4b3 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -111,11 +111,13 @@ public interface DBInterface {
List<String> getReservedTblNames();
String getPrimaryKey(String sql, String tableName);
-
+
+ String applyDigest(Map<Range,StagingTable> digest);
+
/**
* Replay a given TxDigest into the local DB
* @param digest
* @throws SQLException if replay cannot occur correctly
*/
- public void replayTransaction(HashMap<Range,StagingTable> digest) throws SQLException;
+ void replayTransaction(HashMap<Range,StagingTable> digest) throws SQLException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index f4231d8..64e9253 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -28,12 +28,12 @@ import org.json.JSONObject;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -42,7 +42,26 @@ import org.onap.music.mdbc.tables.TxCommitProgress;
*
* @author Robert P. Eby
*/
-public interface MusicInterface {
+public interface MusicInterface {
+ class OwnershipReturn{
+ private final String ownerId;
+ private final UUID rangeId;
+ private List<UUID> oldIds;
+ public OwnershipReturn(String ownerId, UUID rangeId, List<UUID> oldIds){
+ this.ownerId=ownerId;
+ this.rangeId=rangeId;
+ this.oldIds=oldIds;
+ }
+ public String getOwnerId(){
+ return ownerId;
+ }
+ public UUID getRangeId(){
+ return rangeId;
+ }
+ public List<UUID> getOldIRangeds(){
+ return oldIds;
+ }
+ }
/**
* Get the name of this MusicInterface mixin object.
* @return the name
@@ -57,7 +76,7 @@ public interface MusicInterface {
* generates a key or placeholder for what is required for a primary key
* @return a primary key
*/
- String generateUniqueKey();
+ UUID generateUniqueKey();
/**
* Find the key used with Music for a table that was created without a primary index
@@ -68,7 +87,8 @@ public interface MusicInterface {
* @param dbRow row obtained from the SQL layer
* @return key associated with the row
*/
- String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow);
+ String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow)
+ ;
/**
* Do what is needed to close down the MUSIC connection.
*/
@@ -157,30 +177,98 @@ public interface MusicInterface {
/**
* Commits the corresponding REDO-log into MUSIC
*
- * @param partition
- * @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to be a HashMap, because it is required to be serializable
+ * @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx
+ * @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to
+ * be a HashMap, because it is required to be serializable
* @param txId id associated with the log being send
* @param progressKeeper data structure that is used to handle to detect failures, and know what to do
* @throws MDBCServiceException
*/
void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
+
+ /**
+ * This function is used to obtain the information related to a specific row in the MRI table
+ * @param partitionIndex index of the row that is going to be retrieved
+ * @return all the information related to the table
+ * @throws MDBCServiceException
+ */
MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException;
+ /**
+ * This function is used to create a new row in the MRI table
+ * @param info the information used to create the row
+ * @return the new partition object that contain the new information used to create the row
+ * @throws MDBCServiceException
+ */
DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException;
-
+
+ /**
+ * This function is used to append an index to the redo log in a MRI row
+ * @param mriRowId mri row index to which we are going to append the index to the redo log
+ * @param partition information related to ownership of partitions, used to verify ownership
+ * @param newRecord index of the new record to be appended to the redo log
+ * @throws MDBCServiceException
+ */
void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException;
-
+
+ /**
+ * This functions adds the tx digest to
+ * @param newId id used as index in the MTD table
+ * @param transactionDigest digest that contains all the changes performed in the transaction
+ * @throws MDBCServiceException
+ */
void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException;
+ /**
+ * Function used to retrieve a given transaction digest and deserialize it
+ * @param id of the transaction digest to be retrieved
+ * @return the deserialize transaction digest that can be applied to the local SQL database
+ * @throws MDBCServiceException
+ */
HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException;
-
- void own(List<Range> ranges);
- void appendRange(String rangeId, List<Range> ranges);
+ /**
+ * Use this functions to verify ownership, and own new ranges
+ * @param ranges the ranges that should be own after calling this function
+ * @param partition current information of the ownership in the system
+ * @return an object indicating the status of the own function result
+ * @throws MDBCServiceException
+ */
+ OwnershipReturn own(List<Range> ranges, DatabasePartition partition) throws MDBCServiceException;
+
+ /**
+ * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation
+ * @param partition information of the partition that is currently being owned
+ * @throws MDBCServiceException
+ */
+ void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException;
+
+ /**
+ * This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all
+ * those ranges.
+ * @param rangeId new id to be used in the new row
+ * @param ranges ranges to be owned by the end of the function called
+ * @param partition current ownership status
+ * @return
+ * @throws MDBCServiceException
+ */
+ OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) throws MDBCServiceException;
+
+ /**
+ * This functions relinquishes a range
+ * @param ownerId id of the current ownerh
+ * @param rangeId id of the range to be relinquished
+ * @throws MusicLockingException
+ */
+ void relinquish(String ownerId, String rangeId) throws MDBCServiceException;
- void relinquish(String ownerId, String rangeId);
- public List<UUID> getPartitionIndexes();
+ /**
+ * This function return all the range indexes that are currently hold by any of the connections in the system
+ * @return list of ids of rows in MRI
+ */
+ List<UUID> getPartitionIndexes() throws MDBCServiceException;
+ void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index 8ffb6af..7228b55 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,19 +27,21 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import org.apache.commons.lang.NotImplementedException;
import org.onap.music.mdbc.*;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
-import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.TxCommitProgress;
import org.json.JSONObject;
@@ -49,7 +51,6 @@ import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.main.MusicCore;
-import org.onap.music.main.MusicCore.Condition;
import org.onap.music.main.ResultType;
import org.onap.music.main.ReturnType;
@@ -84,104 +85,131 @@ import com.datastax.driver.core.TupleValue;
* @author Robert P. Eby
*/
public class MusicMixin implements MusicInterface {
- /** The property name to use to identify this replica to MusicSqlManager */
- public static final String KEY_MY_ID = "myid";
- /** The property name to use for the comma-separated list of replica IDs. */
- public static final String KEY_REPLICAS = "replica_ids";
- /** The property name to use to identify the IP address for Cassandra. */
- public static final String KEY_MUSIC_ADDRESS = "cassandra.host";
- /** The property name to use to provide the replication factor for Cassandra. */
- public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
- /** The property name to use to provide the replication factor for Cassandra. */
- public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
- /** Namespace for the tables in MUSIC (Cassandra) */
- public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
- /** The default property value to use for the Cassandra IP address. */
- public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
- /** The default property value to use for the Cassandra replication factor. */
- public static final int DEFAULT_MUSIC_RFACTOR = 1;
- /** The default primary string column, if none is provided. */
- public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
- /** Type of the primary key, if none is defined by the user */
- public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
-
-
- //\TODO Add logic to change the names when required and create the tables when necessary
+ /** The property name to use to identify this replica to MusicSqlManager */
+ public static final String KEY_MY_ID = "myid";
+ /** The property name to use for the comma-separated list of replica IDs. */
+ public static final String KEY_REPLICAS = "replica_ids";
+ /** The property name to use to identify the IP address for Cassandra. */
+ public static final String KEY_MUSIC_ADDRESS = "cassandra.host";
+ /** The property name to use to provide the replication factor for Cassandra. */
+ public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
+ /** The property name to use to provide the replication factor for Cassandra. */
+ public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
+ /** Namespace for the tables in MUSIC (Cassandra) */
+ public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
+ /** The default property value to use for the Cassandra IP address. */
+ public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
+ /** The default property value to use for the Cassandra replication factor. */
+ public static final int DEFAULT_MUSIC_RFACTOR = 1;
+ /** The default primary string column, if none is provided. */
+ public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
+ /** Type of the primary key, if none is defined by the user */
+ public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
+
+
+ //\TODO Add logic to change the names when required and create the tables when necessary
private String musicTxDigestTableName = "musictxdigest";
- private String musicRangeInformationTableName = "musicrangeinformation";
-
- private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class);
-
- private static final Map<Integer, String> typemap = new HashMap<>();
- static {
- // We only support the following type mappings currently (from DB -> Cassandra).
- // Anything else will likely cause a NullPointerException
- typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY
- typemap.put(Types.BLOB, "VARCHAR");
- typemap.put(Types.BOOLEAN, "BOOLEAN");
- typemap.put(Types.CLOB, "BLOB");
- typemap.put(Types.DATE, "VARCHAR");
- typemap.put(Types.DOUBLE, "DOUBLE");
- typemap.put(Types.DECIMAL, "DECIMAL");
- typemap.put(Types.INTEGER, "INT");
- //typemap.put(Types.TIMESTAMP, "TIMESTAMP");
- typemap.put(Types.SMALLINT, "SMALLINT");
- typemap.put(Types.TIMESTAMP, "VARCHAR");
- typemap.put(Types.VARBINARY, "BLOB");
- typemap.put(Types.VARCHAR, "VARCHAR");
- typemap.put(Types.CHAR, "VARCHAR");
- //The "Hacks", these don't have a direct mapping
- //typemap.put(Types.DATE, "VARCHAR");
- //typemap.put(Types.DATE, "TIMESTAMP");
- }
-
- protected final String music_ns;
- protected final String myId;
- protected final String[] allReplicaIds;
- private final String musicAddress;
- private final int music_rfactor;
- private MusicConnector mCon = null;
- private Session musicSession = null;
- private boolean keyspace_created = false;
- private Map<String, PreparedStatement> ps_cache = new HashMap<>();
- private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
-
- public MusicMixin() {
- //this.logger = null;
- this.musicAddress = null;
- this.music_ns = null;
- this.music_rfactor = 0;
- this.myId = null;
- this.allReplicaIds = null;
- }
-
- public MusicMixin(String mdbcServerName, Properties info) throws MDBCServiceException {
- // Default values -- should be overridden in the Properties
- // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
- this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress);
-
- String s = info.getProperty(KEY_MUSIC_RFACTOR);
- this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s);
-
- this.myId = info.getProperty(KEY_MY_ID, getMyHostId());
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId);
+ private String musicRangeInformationTableName = "musicrangeinformation";
+
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class);
+
+ private class LockResult{
+ private final UUID musicRangeInformationIndex;
+ private final String ownerId;
+ private final boolean newLock;
+ public LockResult(UUID rowId, String ownerId, boolean newLock){
+ this.musicRangeInformationIndex = rowId;
+ this.ownerId=ownerId;
+ this.newLock=newLock;
+ }
+ public String getOwnerId(){
+ return ownerId;
+ }
+ public boolean getNewLock(){
+ return newLock;
+ }
+ public UUID getIndex() {return musicRangeInformationIndex;}
+ }
-
- this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(",");
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId));
- this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
- logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
-
- initializeMetricTables();
+ private static final Map<Integer, String> typemap = new HashMap<>();
+ static {
+ // We only support the following type mappings currently (from DB -> Cassandra).
+ // Anything else will likely cause a NullPointerException
+ typemap.put(Types.BIGINT, "BIGINT"); // aka. IDENTITY
+ typemap.put(Types.BLOB, "VARCHAR");
+ typemap.put(Types.BOOLEAN, "BOOLEAN");
+ typemap.put(Types.CLOB, "BLOB");
+ typemap.put(Types.DATE, "VARCHAR");
+ typemap.put(Types.DOUBLE, "DOUBLE");
+ typemap.put(Types.DECIMAL, "DECIMAL");
+ typemap.put(Types.INTEGER, "INT");
+ //typemap.put(Types.TIMESTAMP, "TIMESTAMP");
+ typemap.put(Types.SMALLINT, "SMALLINT");
+ typemap.put(Types.TIMESTAMP, "VARCHAR");
+ typemap.put(Types.VARBINARY, "BLOB");
+ typemap.put(Types.VARCHAR, "VARCHAR");
+ typemap.put(Types.CHAR, "VARCHAR");
+ //The "Hacks", these don't have a direct mapping
+ //typemap.put(Types.DATE, "VARCHAR");
+ //typemap.put(Types.DATE, "TIMESTAMP");
+ }
+
+ protected final String music_ns;
+ protected final String myId;
+ protected final String[] allReplicaIds;
+ private final String musicAddress;
+ private final int music_rfactor;
+ private MusicConnector mCon = null;
+ private Session musicSession = null;
+ private boolean keyspace_created = false;
+ private Map<String, PreparedStatement> ps_cache = new HashMap<>();
+ private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
+
+ public MusicMixin() {
+ //this.logger = null;
+ this.musicAddress = null;
+ this.music_ns = null;
+ this.music_rfactor = 0;
+ this.myId = null;
+ this.allReplicaIds = null;
+ }
+
+ public MusicMixin(String mdbcServerName, Properties info) throws MDBCServiceException {
+ // Default values -- should be overridden in the Properties
+ // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
+ this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress);
+
+ String s = info.getProperty(KEY_MUSIC_RFACTOR);
+ this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s);
+
+ this.myId = info.getProperty(KEY_MY_ID, getMyHostId());
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId);
+
+
+ this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(",");
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId));
+
+ this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
+ logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
+
+ initializeMetricTables();
+ }
+
+ public String getMusicTxDigestTableName(){
+ return musicTxDigestTableName;
}
- /**
- * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
- * The keyspace name comes from the initialization properties passed to the JDBC driver.
- */
- @Override
+ public String getMusicRangeInformationTableName(){
+ return musicRangeInformationTableName;
+ }
+
+ /**
+ * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
+ * The keyspace name comes from the initialization properties passed to the JDBC driver.
+ */
+ @Override
public void createKeyspace() throws MDBCServiceException {
Map<String,Object> replicationInfo = new HashMap<>();
@@ -190,202 +218,213 @@ public class MusicMixin implements MusicInterface {
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString(
- "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns +
+ "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns +
" WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
try {
MusicCore.nonKeyRelatedPut(queryObject, "eventual");
- } catch (MusicServiceException e) {
+ } catch (MusicServiceException e) {
if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) {
- throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage());
+ throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage());
+ }
+ }
+ }
+
+ private String getMyHostId() {
+ ResultSet rs = null;
+ try {
+ rs = executeMusicRead("SELECT HOST_ID FROM SYSTEM.LOCAL");
+ } catch (MDBCServiceException e) {
+ return "UNKNOWN";
+ }
+ Row row = rs.one();
+ return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
+ }
+ private String getAllHostIds() {
+ ResultSet results = null;
+ try {
+ results = executeMusicRead("SELECT HOST_ID FROM SYSTEM.PEERS");
+ } catch (MDBCServiceException e) {
+ }
+ StringBuilder sb = new StringBuilder(myId);
+ if(results!=null) {
+ for (Row row : results) {
+ sb.append(",");
+ sb.append(row.getUUID("HOST_ID").toString());
}
}
+ return sb.toString();
+ }
+
+ /**
+ * Get the name of this MusicInterface mixin object.
+ * @return the name
+ */
+ @Override
+ public String getMixinName() {
+ return "cassandra";
+ }
+ /**
+ * Do what is needed to close down the MUSIC connection.
+ */
+ @Override
+ public void close() {
+ if (musicSession != null) {
+ musicSession.close();
+ musicSession = null;
+ }
}
- private String getMyHostId() {
- ResultSet rs = executeMusicRead("SELECT HOST_ID FROM SYSTEM.LOCAL");
- Row row = rs.one();
- return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
- }
- private String getAllHostIds() {
- ResultSet results = executeMusicRead("SELECT HOST_ID FROM SYSTEM.PEERS");
- StringBuilder sb = new StringBuilder(myId);
- for (Row row : results) {
- sb.append(",");
- sb.append(row.getUUID("HOST_ID").toString());
- }
- return sb.toString();
- }
-
- /**
- * Get the name of this MusicInterface mixin object.
- * @return the name
- */
- @Override
- public String getMixinName() {
- return "cassandra";
- }
- /**
- * Do what is needed to close down the MUSIC connection.
- */
- @Override
- public void close() {
- if (musicSession != null) {
- musicSession.close();
- musicSession = null;
- }
- }
-
- /**
- * This function is used to created all the required data structures, both local
- */
- private void initializeMetricTables() throws MDBCServiceException {
- createKeyspace();
- try {
+ /**
+ * This function is used to created all the required data structures, both local
+ */
+ private void initializeMetricTables() throws MDBCServiceException {
+ createKeyspace();
+ try {
createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
- createMusicRangeInformationTable();
- }
- catch(MDBCServiceException e){
+ createMusicRangeInformationTable();
+ }
+ catch(MDBCServiceException e){
logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC");
}
- }
-
- /**
- * This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>.
- * @param tableName the table to initialize MUSIC for
- */
- @Override
- public void initializeMusicForTable(TableInfo ti, String tableName) {
- /**
- * This code creates two tables for every table in SQL:
- * (i) a table with the exact same name as the SQL table storing the SQL data.
- * (ii) a "dirty bits" table that stores the keys in the Cassandra table that are yet to be
- * updated in the SQL table (they were written by some other node).
- */
- StringBuilder fields = new StringBuilder();
- StringBuilder prikey = new StringBuilder();
- String pfx = "", pfx2 = "";
- for (int i = 0; i < ti.columns.size(); i++) {
- fields.append(pfx)
- .append(ti.columns.get(i))
- .append(" ")
- .append(typemap.get(ti.coltype.get(i)));
- if (ti.iskey.get(i)) {
- // Primary key column
- prikey.append(pfx2).append(ti.columns.get(i));
- pfx2 = ", ";
- }
- pfx = ", ";
- }
- if (prikey.length()==0) {
- fields.append(pfx).append(MDBC_PRIMARYKEY_NAME)
- .append(" ")
- .append(MDBC_PRIMARYKEY_TYPE);
- prikey.append(MDBC_PRIMARYKEY_NAME);
- }
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString());
- executeMusicWriteQuery(cql);
- }
-
- // **************************************************
- // Dirty Tables (in MUSIC) methods
- // **************************************************
-
- /**
- * Create a <i>dirty row</i> table for the real table <i>tableName</i>. The primary keys columns from the real table are recreated in
- * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC.
- * @param tableName the table to create a "dirty" table for
- */
- @Override
- public void createDirtyRowTable(TableInfo ti, String tableName) {
- // create dirtybitsTable at all replicas
+ }
+
+ /**
+ * This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>.
+ * @param tableName the table to initialize MUSIC for
+ */
+ @Override
+ public void initializeMusicForTable(TableInfo ti, String tableName) {
+ /**
+ * This code creates two tables for every table in SQL:
+ * (i) a table with the exact same name as the SQL table storing the SQL data.
+ * (ii) a "dirty bits" table that stores the keys in the Cassandra table that are yet to be
+ * updated in the SQL table (they were written by some other node).
+ */
+ StringBuilder fields = new StringBuilder();
+ StringBuilder prikey = new StringBuilder();
+ String pfx = "", pfx2 = "";
+ for (int i = 0; i < ti.columns.size(); i++) {
+ fields.append(pfx)
+ .append(ti.columns.get(i))
+ .append(" ")
+ .append(typemap.get(ti.coltype.get(i)));
+ if (ti.iskey.get(i)) {
+ // Primary key column
+ prikey.append(pfx2).append(ti.columns.get(i));
+ pfx2 = ", ";
+ }
+ pfx = ", ";
+ }
+ if (prikey.length()==0) {
+ fields.append(pfx).append(MDBC_PRIMARYKEY_NAME)
+ .append(" ")
+ .append(MDBC_PRIMARYKEY_TYPE);
+ prikey.append(MDBC_PRIMARYKEY_NAME);
+ }
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString());
+ executeMusicWriteQuery(cql);
+ }
+
+ // **************************************************
+ // Dirty Tables (in MUSIC) methods
+ // **************************************************
+
+ /**
+ * Create a <i>dirty row</i> table for the real table <i>tableName</i>. The primary keys columns from the real table are recreated in
+ * the dirty table, along with a "REPLICA__" column that names the replica that should update it's internal state from MUSIC.
+ * @param tableName the table to create a "dirty" table for
+ */
+ @Override
+ public void createDirtyRowTable(TableInfo ti, String tableName) {
+ // create dirtybitsTable at all replicas
// for (String repl : allReplicaIds) {
//// String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
//// String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
// cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
// executeMusicWriteQuery(cql);
// }
- StringBuilder ddl = new StringBuilder("REPLICA__ TEXT");
- StringBuilder cols = new StringBuilder("REPLICA__");
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- // Only use the primary keys columns in the "Dirty" table
- ddl.append(", ")
- .append(ti.columns.get(i))
- .append(" ")
- .append(typemap.get(ti.coltype.get(i)));
- cols.append(", ").append(ti.columns.get(i));
- }
- }
- if(cols.length()==0) {
- //fixme
- System.err.println("Create dirty row table found no primary key");
- }
- ddl.append(", PRIMARY KEY(").append(cols).append(")");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s (%s);", music_ns, tableName, ddl.toString());
- executeMusicWriteQuery(cql);
- }
- /**
- * Drop the dirty row table for <i>tableName</i> from MUSIC.
- * @param tableName the table being dropped
- */
- @Override
- public void dropDirtyRowTable(String tableName) {
- String cql = String.format("DROP TABLE %s.DIRTY_%s;", music_ns, tableName);
- executeMusicWriteQuery(cql);
- }
- /**
- * Mark rows as "dirty" in the dirty rows table for <i>tableName</i>. Rows are marked for all replicas but
- * this one (this replica already has the up to date data).
- * @param tableName the table we are marking dirty
- * @param keys an ordered list of the values being put into the table. The values that correspond to the tables'
- * primary key are copied into the dirty row table.
- */
- @Override
- public void markDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
- Object[] keyObj = getObjects(ti,tableName, keys);
- StringBuilder cols = new StringBuilder("REPLICA__");
- PreparedQueryObject pQueryObject = null;
- StringBuilder vals = new StringBuilder("?");
- List<Object> vallist = new ArrayList<Object>();
- vallist.add(""); // placeholder for replica
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- cols.append(", ").append(ti.columns.get(i));
- vals.append(", ").append("?");
- vallist.add(keyObj[i]);
- }
- }
- if(cols.length()==0) {
- //FIXME
- System.err.println("markDIrtyRow need to fix primary key");
- }
- String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString());
+ StringBuilder ddl = new StringBuilder("REPLICA__ TEXT");
+ StringBuilder cols = new StringBuilder("REPLICA__");
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ // Only use the primary keys columns in the "Dirty" table
+ ddl.append(", ")
+ .append(ti.columns.get(i))
+ .append(" ")
+ .append(typemap.get(ti.coltype.get(i)));
+ cols.append(", ").append(ti.columns.get(i));
+ }
+ }
+ if(cols.length()==0) {
+ //fixme
+ System.err.println("Create dirty row table found no primary key");
+ }
+ ddl.append(", PRIMARY KEY(").append(cols).append(")");
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s (%s);", music_ns, tableName, ddl.toString());
+ executeMusicWriteQuery(cql);
+ }
+ /**
+ * Drop the dirty row table for <i>tableName</i> from MUSIC.
+ * @param tableName the table being dropped
+ */
+ @Override
+ public void dropDirtyRowTable(String tableName) {
+ String cql = String.format("DROP TABLE %s.DIRTY_%s;", music_ns, tableName);
+ executeMusicWriteQuery(cql);
+ }
+ /**
+ * Mark rows as "dirty" in the dirty rows table for <i>tableName</i>. Rows are marked for all replicas but
+ * this one (this replica already has the up to date data).
+ * @param tableName the table we are marking dirty
+ * @param keys an ordered list of the values being put into the table. The values that correspond to the tables'
+ * primary key are copied into the dirty row table.
+ */
+ @Override
+ public void markDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
+ Object[] keyObj = getObjects(ti,tableName, keys);
+ StringBuilder cols = new StringBuilder("REPLICA__");
+ PreparedQueryObject pQueryObject = null;
+ StringBuilder vals = new StringBuilder("?");
+ List<Object> vallist = new ArrayList<Object>();
+ vallist.add(""); // placeholder for replica
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ cols.append(", ").append(ti.columns.get(i));
+ vals.append(", ").append("?");
+ vallist.add(keyObj[i]);
+ }
+ }
+ if(cols.length()==0) {
+ //FIXME
+ System.err.println("markDIrtyRow need to fix primary key");
+ }
+ String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString());
/*Session sess = getMusicSession();
PreparedStatement ps = getPreparedStatementFromCache(cql);*/
- String primaryKey;
- if(ti.hasKey()) {
- primaryKey = getMusicKeyFromRow(ti,tableName, keys);
- }
- else {
- primaryKey = getMusicKeyFromRowWithoutPrimaryIndexes(ti,tableName, keys);
- }
- System.out.println("markDirtyRow: PK value: "+primaryKey);
-
- Object pkObj = null;
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- pkObj = keyObj[i];
- }
- }
- for (String repl : allReplicaIds) {
- pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(tableName);
- pQueryObject.addValue(repl);
- pQueryObject.addValue(pkObj);
- updateMusicDB(tableName, primaryKey, pQueryObject);
- //if (!repl.equals(myId)) {
+ String primaryKey;
+ if(ti.hasKey()) {
+ primaryKey = getMusicKeyFromRow(ti,tableName, keys);
+ }
+ else {
+ primaryKey = getMusicKeyFromRowWithoutPrimaryIndexes(ti,tableName, keys);
+ }
+ System.out.println("markDirtyRow: PK value: "+primaryKey);
+
+ Object pkObj = null;
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ pkObj = keyObj[i];
+ }
+ }
+ for (String repl : allReplicaIds) {
+ pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(tableName);
+ pQueryObject.addValue(repl);
+ pQueryObject.addValue(pkObj);
+ updateMusicDB(tableName, primaryKey, pQueryObject);
+ //if (!repl.equals(myId)) {
/*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
vallist.set(0, repl);
BoundStatement bound = ps.bind(vallist.toArray());
@@ -393,37 +432,37 @@ public class MusicMixin implements MusicInterface {
synchronized (sess) {
sess.execute(bound);
}*/
- //}
-
- }
- }
- /**
- * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys
- * @param tableName the table we are removing dirty entries from
- * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row.
- */
- @Override
- public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
- Object[] keysObjects = getObjects(ti,tableName,keys);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- StringBuilder cols = new StringBuilder("REPLICA__=?");
- List<Object> vallist = new ArrayList<Object>();
- vallist.add(myId);
- int n = 0;
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- cols.append(" AND ").append(ti.columns.get(i)).append("=?");
- vallist.add(keysObjects[n++]);
- pQueryObject.addValue(keysObjects[n++]);
- }
- }
- String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString());
- logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
- pQueryObject.appendQueryString(cql);
+ //}
+
+ }
+ }
+ /**
+ * Remove the entries from the dirty row (for this replica) that correspond to a set of primary keys
+ * @param tableName the table we are removing dirty entries from
+ * @param keys the primary key values to use in the DELETE. Note: this is *only* the primary keys, not a full table row.
+ */
+ @Override
+ public void cleanDirtyRow(TableInfo ti, String tableName, JSONObject keys) {
+ Object[] keysObjects = getObjects(ti,tableName,keys);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ StringBuilder cols = new StringBuilder("REPLICA__=?");
+ List<Object> vallist = new ArrayList<Object>();
+ vallist.add(myId);
+ int n = 0;
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ cols.append(" AND ").append(ti.columns.get(i)).append("=?");
+ vallist.add(keysObjects[n++]);
+ pQueryObject.addValue(keysObjects[n++]);
+ }
+ }
+ String cql = String.format("DELETE FROM %s.DIRTY_%s WHERE %s;", music_ns, tableName, cols.toString());
+ logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+ pQueryObject.appendQueryString(cql);
ReturnType rt = MusicCore.eventualPut(pQueryObject);
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- System.out.println("Failure while cleanDirtyRow..."+rt.getMessage());
- }
+ if(rt.getResult().getResult().toLowerCase().equals("failure")) {
+ System.out.println("Failure while cleanDirtyRow..."+rt.getMessage());
+ }
/*Session sess = getMusicSession();
PreparedStatement ps = getPreparedStatementFromCache(cql);
BoundStatement bound = ps.bind(vallist.toArray());
@@ -431,18 +470,18 @@ public class MusicMixin implements MusicInterface {
synchronized (sess) {
sess.execute(bound);
}*/
- }
- /**
- * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica,
- * and consist of a Map of primary key column names and values.
- * @param tableName the table we are querying for
- * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row".
- */
- @Override
- public List<Map<String,Object>> getDirtyRows(TableInfo ti, String tableName) {
- String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName);
- ResultSet results = null;
- logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+ }
+ /**
+ * Get a list of "dirty rows" for a table. The dirty rows returned apply only to this replica,
+ * and consist of a Map of primary key column names and values.
+ * @param tableName the table we are querying for
+ * @return a list of maps; each list item is a map of the primary key names and values for that "dirty row".
+ */
+ @Override
+ public List<Map<String,Object>> getDirtyRows(TableInfo ti, String tableName) {
+ String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName);
+ ResultSet results = null;
+ logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
/*Session sess = getMusicSession();
PreparedStatement ps = getPreparedStatementFromCache(cql);
@@ -451,111 +490,111 @@ public class MusicMixin implements MusicInterface {
synchronized (sess) {
results = sess.execute(bound);
}*/
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- try {
- results = MusicCore.get(pQueryObject);
- } catch (MusicServiceException e) {
-
- e.printStackTrace();
- }
-
- ColumnDefinitions cdef = results.getColumnDefinitions();
- List<Map<String,Object>> list = new ArrayList<Map<String,Object>>();
- for (Row row : results) {
- Map<String,Object> objs = new HashMap<String,Object>();
- for (int i = 0; i < cdef.size(); i++) {
- String colname = cdef.getName(i).toUpperCase();
- String coltype = cdef.getType(i).getName().toString().toUpperCase();
- if (!colname.equals("REPLICA__")) {
- switch (coltype) {
- case "BIGINT":
- objs.put(colname, row.getLong(colname));
- break;
- case "BOOLEAN":
- objs.put(colname, row.getBool(colname));
- break;
- case "BLOB":
- objs.put(colname, row.getString(colname));
- break;
- case "DATE":
- objs.put(colname, row.getString(colname));
- break;
- case "DOUBLE":
- objs.put(colname, row.getDouble(colname));
- break;
- case "DECIMAL":
- objs.put(colname, row.getDecimal(colname));
- break;
- case "INT":
- objs.put(colname, row.getInt(colname));
- break;
- case "TIMESTAMP":
- objs.put(colname, row.getTimestamp(colname));
- break;
- case "VARCHAR":
- default:
- objs.put(colname, row.getString(colname));
- break;
- }
- }
- }
- list.add(objs);
- }
- return list;
- }
-
- /**
- * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first.
- * @param tableName This is the table that has been dropped
- */
- @Override
- public void clearMusicForTable(String tableName) {
- dropDirtyRowTable(tableName);
- String cql = String.format("DROP TABLE %s.%s;", music_ns, tableName);
- executeMusicWriteQuery(cql);
- }
- /**
- * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the
- * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates
- * it to the other replicas.
- *
- * @param tableName This is the table that has changed.
- * @param oldRow This is a copy of the old row being deleted
- */
- @Override
- public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) {
- Object[] objects = getObjects(ti,tableName,oldRow);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- if (ti.hasKey()) {
- assert(ti.columns.size() == objects.length);
- } else {
- assert(ti.columns.size()+1 == objects.length);
- }
-
- StringBuilder where = new StringBuilder();
- List<Object> vallist = new ArrayList<Object>();
- String pfx = "";
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- where.append(pfx)
- .append(ti.columns.get(i))
- .append("=?");
- vallist.add(objects[i]);
- pQueryObject.addValue(objects[i]);
- pfx = " AND ";
- }
- }
- if (!ti.hasKey()) {
- where.append(MDBC_PRIMARYKEY_NAME + "=?");
- //\FIXME this is wrong, old row is not going to contain the UUID, this needs to be fixed
- vallist.add(UUID.fromString((String) objects[0]));
- pQueryObject.addValue(UUID.fromString((String) objects[0]));
- }
-
- String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString());
- logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql);
- pQueryObject.appendQueryString(cql);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ try {
+ results = MusicCore.get(pQueryObject);
+ } catch (MusicServiceException e) {
+
+ e.printStackTrace();
+ }
+
+ ColumnDefinitions cdef = results.getColumnDefinitions();
+ List<Map<String,Object>> list = new ArrayList<Map<String,Object>>();
+ for (Row row : results) {
+ Map<String,Object> objs = new HashMap<String,Object>();
+ for (int i = 0; i < cdef.size(); i++) {
+ String colname = cdef.getName(i).toUpperCase();
+ String coltype = cdef.getType(i).getName().toString().toUpperCase();
+ if (!colname.equals("REPLICA__")) {
+ switch (coltype) {
+ case "BIGINT":
+ objs.put(colname, row.getLong(colname));
+ break;
+ case "BOOLEAN":
+ objs.put(colname, row.getBool(colname));
+ break;
+ case "BLOB":
+ objs.put(colname, row.getString(colname));
+ break;
+ case "DATE":
+ objs.put(colname, row.getString(colname));
+ break;
+ case "DOUBLE":
+ objs.put(colname, row.getDouble(colname));
+ break;
+ case "DECIMAL":
+ objs.put(colname, row.getDecimal(colname));
+ break;
+ case "INT":
+ objs.put(colname, row.getInt(colname));
+ break;
+ case "TIMESTAMP":
+ objs.put(colname, row.getTimestamp(colname));
+ break;
+ case "VARCHAR":
+ default:
+ objs.put(colname, row.getString(colname));
+ break;
+ }
+ }
+ }
+ list.add(objs);
+ }
+ return list;
+ }
+
+ /**
+ * Drops the named table and its dirty row table (for all replicas) from MUSIC. The dirty row table is dropped first.
+ * @param tableName This is the table that has been dropped
+ */
+ @Override
+ public void clearMusicForTable(String tableName) {
+ dropDirtyRowTable(tableName);
+ String cql = String.format("DROP TABLE %s.%s;", music_ns, tableName);
+ executeMusicWriteQuery(cql);
+ }
+ /**
+ * This function is called whenever there is a DELETE to a row on a local SQL table, wherein it updates the
+ * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. MUSIC propagates
+ * it to the other replicas.
+ *
+ * @param tableName This is the table that has changed.
+ * @param oldRow This is a copy of the old row being deleted
+ */
+ @Override
+ public void deleteFromEntityTableInMusic(TableInfo ti, String tableName, JSONObject oldRow) {
+ Object[] objects = getObjects(ti,tableName,oldRow);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ if (ti.hasKey()) {
+ assert(ti.columns.size() == objects.length);
+ } else {
+ assert(ti.columns.size()+1 == objects.length);
+ }
+
+ StringBuilder where = new StringBuilder();
+ List<Object> vallist = new ArrayList<Object>();
+ String pfx = "";
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ where.append(pfx)
+ .append(ti.columns.get(i))
+ .append("=?");
+ vallist.add(objects[i]);
+ pQueryObject.addValue(objects[i]);
+ pfx = " AND ";
+ }
+ }
+ if (!ti.hasKey()) {
+ where.append(MDBC_PRIMARYKEY_NAME + "=?");
+ //\FIXME this is wrong, old row is not going to contain the UUID, this needs to be fixed
+ vallist.add(UUID.fromString((String) objects[0]));
+ pQueryObject.addValue(UUID.fromString((String) objects[0]));
+ }
+
+ String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString());
+ logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql);
+ pQueryObject.appendQueryString(cql);
/*PreparedStatement ps = getPreparedStatementFromCache(cql);
BoundStatement bound = ps.bind(vallist.toArray());
@@ -564,61 +603,68 @@ public class MusicMixin implements MusicInterface {
synchronized (sess) {
sess.execute(bound);
}*/
- String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow);
+ String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow);
updateMusicDB(tableName, primaryKey, pQueryObject);
- // Mark the dirty rows in music for all the replicas but us
- markDirtyRow(ti,tableName, oldRow);
- }
-
- public Set<String> getMusicTableSet(String ns) {
- Set<String> set = new TreeSet<String>();
- String cql = String.format("SELECT TABLE_NAME FROM SYSTEM_SCHEMA.TABLES WHERE KEYSPACE_NAME = '%s'", ns);
- ResultSet rs = executeMusicRead(cql);
- for (Row row : rs) {
- set.add(row.getString("TABLE_NAME").toUpperCase());
- }
- return set;
- }
- /**
- * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local
- * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL
- * @param tableName This is the table on which the select is being performed
- */
- @Override
- public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) {
- // Read dirty rows of this table from Music
- TableInfo ti = dbi.getTableInfo(tableName);
- List<Map<String,Object>> objlist = getDirtyRows(ti,tableName);
- PreparedQueryObject pQueryObject = null;
- String pre_cql = String.format("SELECT * FROM %s.%s WHERE ", music_ns, tableName);
- List<Object> vallist = new ArrayList<Object>();
- StringBuilder sb = new StringBuilder();
- //\TODO Perform a batch operation instead of each row at a time
- for (Map<String,Object> map : objlist) {
- pQueryObject = new PreparedQueryObject();
- sb.setLength(0);
- vallist.clear();
- String pfx = "";
- for (String key : map.keySet()) {
- sb.append(pfx).append(key).append("=?");
- vallist.add(map.get(key));
- pQueryObject.addValue(map.get(key));
- pfx = " AND ";
- }
-
- String cql = pre_cql + sb.toString();
- System.out.println("readDirtyRowsAndUpdateDb: cql: "+cql);
- pQueryObject.appendQueryString(cql);
- ResultSet dirtyRows = null;
- try {
- //\TODO Why is this an eventual put?, this should be an atomic
- dirtyRows = MusicCore.get(pQueryObject);
- } catch (MusicServiceException e) {
-
- e.printStackTrace();
- }
+ // Mark the dirty rows in music for all the replicas but us
+ markDirtyRow(ti,tableName, oldRow);
+ }
+
+ public Set<String> getMusicTableSet(String ns) {
+ Set<String> set = new TreeSet<String>();
+ String cql = String.format("SELECT TABLE_NAME FROM SYSTEM_SCHEMA.TABLES WHERE KEYSPACE_NAME = '%s'", ns);
+ ResultSet rs = null;
+ try {
+ rs = executeMusicRead(cql);
+ } catch (MDBCServiceException e) {
+ e.printStackTrace();
+ }
+ if(rs!=null) {
+ for (Row row : rs) {
+ set.add(row.getString("TABLE_NAME").toUpperCase());
+ }
+ }
+ return set;
+ }
+ /**
+ * This method is called whenever there is a SELECT on a local SQL table, wherein it first checks the local
+ * dirty bits table to see if there are any keys in Cassandra whose value has not yet been sent to SQL
+ * @param tableName This is the table on which the select is being performed
+ */
+ @Override
+ public void readDirtyRowsAndUpdateDb(DBInterface dbi, String tableName) {
+ // Read dirty rows of this table from Music
+ TableInfo ti = dbi.getTableInfo(tableName);
+ List<Map<String,Object>> objlist = getDirtyRows(ti,tableName);
+ PreparedQueryObject pQueryObject = null;
+ String pre_cql = String.format("SELECT * FROM %s.%s WHERE ", music_ns, tableName);
+ List<Object> vallist = new ArrayList<Object>();
+ StringBuilder sb = new StringBuilder();
+ //\TODO Perform a batch operation instead of each row at a time
+ for (Map<String,Object> map : objlist) {
+ pQueryObject = new PreparedQueryObject();
+ sb.setLength(0);
+ vallist.clear();
+ String pfx = "";
+ for (String key : map.keySet()) {
+ sb.append(pfx).append(key).append("=?");
+ vallist.add(map.get(key));
+ pQueryObject.addValue(map.get(key));
+ pfx = " AND ";
+ }
+
+ String cql = pre_cql + sb.toString();
+ System.out.println("readDirtyRowsAndUpdateDb: cql: "+cql);
+ pQueryObject.appendQueryString(cql);
+ ResultSet dirtyRows = null;
+ try {
+ //\TODO Why is this an eventual put?, this should be an atomic
+ dirtyRows = MusicCore.get(pQueryObject);
+ } catch (MusicServiceException e) {
+
+ e.printStackTrace();
+ }
/*
Session sess = getMusicSession();
PreparedStatement ps = getPreparedStatementFromCache(cql);
@@ -628,58 +674,58 @@ public class MusicMixin implements MusicInterface {
synchronized (sess) {
dirtyRows = sess.execute(bound);
}*/
- List<Row> rows = dirtyRows.all();
- if (rows.isEmpty()) {
- // No rows, the row must have been deleted
- deleteRowFromSqlDb(dbi,tableName, map);
- } else {
- for (Row row : rows) {
- writeMusicRowToSQLDb(dbi,tableName, row);
- }
- }
- }
- }
-
- private void deleteRowFromSqlDb(DBInterface dbi, String tableName, Map<String, Object> map) {
- dbi.deleteRowFromSqlDb(tableName, map);
- TableInfo ti = dbi.getTableInfo(tableName);
- List<Object> vallist = new ArrayList<Object>();
- for (int i = 0; i < ti.columns.size(); i++) {
- if (ti.iskey.get(i)) {
- String col = ti.columns.get(i);
- Object val = map.get(col);
- vallist.add(val);
- }
- }
- cleanDirtyRow(ti, tableName, new JSONObject(vallist));
- }
- /**
- * This functions copies the contents of a row in Music into the corresponding row in the SQL table
- * @param tableName This is the name of the table in both Music and swl
- * @param musicRow This is the row in Music that is being copied into SQL
- */
- private void writeMusicRowToSQLDb(DBInterface dbi, String tableName, Row musicRow) {
- // First construct the map of columns and their values
- TableInfo ti = dbi.getTableInfo(tableName);
- Map<String, Object> map = new HashMap<String, Object>();
- List<Object> vallist = new ArrayList<Object>();
- String rowid = tableName;
- for (String col : ti.columns) {
- Object val = getValue(musicRow, col);
- map.put(col, val);
- if (ti.iskey(col)) {
- vallist.add(val);
- rowid += "_" + val.toString();
- }
- }
-
- logger.debug("Blocking rowid: "+rowid);
- in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE
-
- dbi.insertRowIntoSqlDb(tableName, map);
-
- logger.debug("Unblocking rowid: "+rowid);
- in_progress.remove(rowid); // Unblock propagation
+ List<Row> rows = dirtyRows.all();
+ if (rows.isEmpty()) {
+ // No rows, the row must have been deleted
+ deleteRowFromSqlDb(dbi,tableName, map);
+ } else {
+ for (Row row : rows) {
+ writeMusicRowToSQLDb(dbi,tableName, row);
+ }
+ }
+ }
+ }
+
+ private void deleteRowFromSqlDb(DBInterface dbi, String tableName, Map<String, Object> map) {
+ dbi.deleteRowFromSqlDb(tableName, map);
+ TableInfo ti = dbi.getTableInfo(tableName);
+ List<Object> vallist = new ArrayList<Object>();
+ for (int i = 0; i < ti.columns.size(); i++) {
+ if (ti.iskey.get(i)) {
+ String col = ti.columns.get(i);
+ Object val = map.get(col);
+ vallist.add(val);
+ }
+ }
+ cleanDirtyRow(ti, tableName, new JSONObject(vallist));
+ }
+ /**
+ * This functions copies the contents of a row in Music into the corresponding row in the SQL table
+ * @param tableName This is the name of the table in both Music and swl
+ * @param musicRow This is the row in Music that is being copied into SQL
+ */
+ private void writeMusicRowToSQLDb(DBInterface dbi, String tableName, Row musicRow) {
+ // First construct the map of columns and their values
+ TableInfo ti = dbi.getTableInfo(tableName);
+ Map<String, Object> map = new HashMap<String, Object>();
+ List<Object> vallist = new ArrayList<Object>();
+ String rowid = tableName;
+ for (String col : ti.columns) {
+ Object val = getValue(musicRow, col);
+ map.put(col, val);
+ if (ti.iskey(col)) {
+ vallist.add(val);
+ rowid += "_" + val.toString();
+ }
+ }
+
+ logger.debug("Blocking rowid: "+rowid);
+ in_progress.add(rowid); // Block propagation of the following INSERT/UPDATE
+
+ dbi.insertRowIntoSqlDb(tableName, map);
+
+ logger.debug("Unblocking rowid: "+rowid);
+ in_progress.remove(rowid); // Unblock propagation
// try {
// String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
@@ -695,8 +741,8 @@ public class MusicMixin implements MusicInterface {
// }
// }
- ti = dbi.getTableInfo(tableName);
- cleanDirtyRow(ti, tableName, new JSONObject(vallist));
+ ti = dbi.getTableInfo(tableName);
+ cleanDirtyRow(ti, tableName, new JSONObject(vallist));
// String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
// java.sql.ResultSet rs = executeSQLRead(selectQuery);
@@ -712,111 +758,111 @@ public class MusicMixin implements MusicInterface {
// e.printStackTrace();
// }
- //clean the music dirty bits table
+ //clean the music dirty bits table
// String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
// String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
// executeMusicWriteQuery(deleteQuery);
- }
- private Object getValue(Row musicRow, String colname) {
- ColumnDefinitions cdef = musicRow.getColumnDefinitions();
- DataType colType;
- try {
- colType= cdef.getType(colname);
- }
- catch(IllegalArgumentException e) {
- logger.warn("Colname is not part of table metadata: "+e);
- throw e;
- }
- String typeStr = colType.getName().toString().toUpperCase();
- switch (typeStr) {
- case "BIGINT":
- return musicRow.getLong(colname);
- case "BOOLEAN":
- return musicRow.getBool(colname);
- case "BLOB":
- return musicRow.getString(colname);
- case "DATE":
- return musicRow.getString(colname);
- case "DECIMAL":
- return musicRow.getDecimal(colname);
- case "DOUBLE":
- return musicRow.getDouble(colname);
- case "SMALLINT":
- case "INT":
- return musicRow.getInt(colname);
- case "TIMESTAMP":
- return musicRow.getTimestamp(colname);
- case "UUID":
- return musicRow.getUUID(colname);
- default:
- logger.error(EELFLoggerDelegate.errorLogger, "UNEXPECTED COLUMN TYPE: columname="+colname+", columntype="+typeStr);
- // fall thru
- case "VARCHAR":
- return musicRow.getString(colname);
- }
- }
-
- /**
- * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the
- * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates
- * it to the other replicas.
- *
- * @param tableName This is the table that has changed.
- * @param changedRow This is information about the row that has changed
- */
- @Override
- public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) {
- // Build the CQL command
- Object[] objects = getObjects(ti,tableName,changedRow);
- StringBuilder fields = new StringBuilder();
- StringBuilder values = new StringBuilder();
- String rowid = tableName;
- Object[] newrow = new Object[objects.length];
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- String pfx = "";
- int keyoffset=0;
- for (int i = 0; i < objects.length; i++) {
- if (!ti.hasKey() && i==0) {
- //We need to tack on cassandra's uid in place of a primary key
- fields.append(MDBC_PRIMARYKEY_NAME);
- values.append("?");
- newrow[i] = UUID.fromString((String) objects[i]);
- pQueryObject.addValue(newrow[i]);
- keyoffset=-1;
- pfx = ", ";
- continue;
- }
- fields.append(pfx).append(ti.columns.get(i+keyoffset));
- values.append(pfx).append("?");
- pfx = ", ";
- if (objects[i] instanceof byte[]) {
- // Cassandra doesn't seem to have a Codec to translate a byte[] to a ByteBuffer
- newrow[i] = ByteBuffer.wrap((byte[]) objects[i]);
- pQueryObject.addValue(newrow[i]);
- } else if (objects[i] instanceof Reader) {
- // Cassandra doesn't seem to have a Codec to translate a Reader to a ByteBuffer either...
- newrow[i] = ByteBuffer.wrap(readBytesFromReader((Reader) objects[i]));
- pQueryObject.addValue(newrow[i]);
- } else {
- newrow[i] = objects[i];
- pQueryObject.addValue(newrow[i]);
- }
- if (i+keyoffset>=0 && ti.iskey.get(i+keyoffset)) {
- rowid += "_" + newrow[i].toString();
- }
- }
-
- if (in_progress.contains(rowid)) {
- // This call to updateDirtyRowAndEntityTableInMusic() was called as a result of a Cassandra -> H2 update; ignore
- logger.debug(EELFLoggerDelegate.applicationLogger, "updateDirtyRowAndEntityTableInMusic: bypassing MUSIC update on "+rowid);
-
- } else {
- // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
- String cql = String.format("INSERT INTO %s.%s (%s) VALUES (%s);", music_ns, tableName, fields.toString(), values.toString());
-
- pQueryObject.appendQueryString(cql);
- String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow);
- updateMusicDB(tableName, primaryKey, pQueryObject);
+ }
+ private Object getValue(Row musicRow, String colname) {
+ ColumnDefinitions cdef = musicRow.getColumnDefinitions();
+ DataType colType;
+ try {
+ colType= cdef.getType(colname);
+ }
+ catch(IllegalArgumentException e) {
+ logger.warn("Colname is not part of table metadata: "+e);
+ throw e;
+ }
+ String typeStr = colType.getName().toString().toUpperCase();
+ switch (typeStr) {
+ case "BIGINT":
+ return musicRow.getLong(colname);
+ case "BOOLEAN":
+ return musicRow.getBool(colname);
+ case "BLOB":
+ return musicRow.getString(colname);
+ case "DATE":
+ return musicRow.getString(colname);
+ case "DECIMAL":
+ return musicRow.getDecimal(colname);
+ case "DOUBLE":
+ return musicRow.getDouble(colname);
+ case "SMALLINT":
+ case "INT":
+ return musicRow.getInt(colname);
+ case "TIMESTAMP":
+ return musicRow.getTimestamp(colname);
+ case "UUID":
+ return musicRow.getUUID(colname);
+ default:
+ logger.error(EELFLoggerDelegate.errorLogger, "UNEXPECTED COLUMN TYPE: columname="+colname+", columntype="+typeStr);
+ // fall thru
+ case "VARCHAR":
+ return musicRow.getString(colname);
+ }
+ }
+
+ /**
+ * This method is called whenever there is an INSERT or UPDATE to a local SQL table, wherein it updates the
+ * MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write. Music propagates
+ * it to the other replicas.
+ *
+ * @param tableName This is the table that has changed.
+ * @param changedRow This is information about the row that has changed
+ */
+ @Override
+ public void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow) {
+ // Build the CQL command
+ Object[] objects = getObjects(ti,tableName,changedRow);
+ StringBuilder fields = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+ String rowid = tableName;
+ Object[] newrow = new Object[objects.length];
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ String pfx = "";
+ int keyoffset=0;
+ for (int i = 0; i < objects.length; i++) {
+ if (!ti.hasKey() && i==0) {
+ //We need to tack on cassandra's uid in place of a primary key
+ fields.append(MDBC_PRIMARYKEY_NAME);
+ values.append("?");
+ newrow[i] = UUID.fromString((String) objects[i]);
+ pQueryObject.addValue(newrow[i]);
+ keyoffset=-1;
+ pfx = ", ";
+ continue;
+ }
+ fields.append(pfx).append(ti.columns.get(i+keyoffset));
+ values.append(pfx).append("?");
+ pfx = ", ";
+ if (objects[i] instanceof byte[]) {
+ // Cassandra doesn't seem to have a Codec to translate a byte[] to a ByteBuffer
+ newrow[i] = ByteBuffer.wrap((byte[]) objects[i]);
+ pQueryObject.addValue(newrow[i]);
+ } else if (objects[i] instanceof Reader) {
+ // Cassandra doesn't seem to have a Codec to translate a Reader to a ByteBuffer either...
+ newrow[i] = ByteBuffer.wrap(readBytesFromReader((Reader) objects[i]));
+ pQueryObject.addValue(newrow[i]);
+ } else {
+ newrow[i] = objects[i];
+ pQueryObject.addValue(newrow[i]);
+ }
+ if (i+keyoffset>=0 && ti.iskey.get(i+keyoffset)) {
+ rowid += "_" + newrow[i].toString();
+ }
+ }
+
+ if (in_progress.contains(rowid)) {
+ // This call to updateDirtyRowAndEntityTableInMusic() was called as a result of a Cassandra -> H2 update; ignore
+ logger.debug(EELFLoggerDelegate.applicationLogger, "updateDirtyRowAndEntityTableInMusic: bypassing MUSIC update on "+rowid);
+
+ } else {
+ // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
+ String cql = String.format("INSERT INTO %s.%s (%s) VALUES (%s);", music_ns, tableName, fields.toString(), values.toString());
+
+ pQueryObject.appendQueryString(cql);
+ String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow);
+ updateMusicDB(tableName, primaryKey, pQueryObject);
/*PreparedStatement ps = getPreparedStatementFromCache(cql);
BoundStatement bound = ps.bind(newrow);
@@ -825,208 +871,207 @@ public class MusicMixin implements MusicInterface {
synchronized (sess) {
sess.execute(bound);
}*/
- // Mark the dirty rows in music for all the replicas but us
- markDirtyRow(ti,tableName, changedRow);
- }
- }
-
-
-
- private byte[] readBytesFromReader(Reader rdr) {
- StringBuilder sb = new StringBuilder();
- try {
- int ch;
- while ((ch = rdr.read()) >= 0) {
- sb.append((char)ch);
- }
- } catch (IOException e) {
- logger.warn("readBytesFromReader: "+e);
- }
- return sb.toString().getBytes();
- }
-
- protected PreparedStatement getPreparedStatementFromCache(String cql) {
- // Note: have to hope that the Session never changes!
- if (!ps_cache.containsKey(cql)) {
- Session sess = getMusicSession();
- PreparedStatement ps = sess.prepare(cql);
- ps_cache.put(cql, ps);
- }
- return ps_cache.get(cql);
- }
-
- /**
- * This method gets a connection to Music
- * @return the Cassandra Session to use
- */
- protected Session getMusicSession() {
- // create cassandra session
- if (musicSession == null) {
- logger.info(EELFLoggerDelegate.applicationLogger, "Creating New Music Session");
- mCon = new MusicConnector(musicAddress);
- musicSession = mCon.getSession();
- }
- return musicSession;
- }
-
- /**
- * This method executes a write query in Music
- * @param cql the CQL to be sent to Cassandra
- */
- protected void executeMusicWriteQuery(String cql) {
- logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- ReturnType rt = MusicCore.eventualPut(pQueryObject);
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
- }
+ // Mark the dirty rows in music for all the replicas but us
+ markDirtyRow(ti,tableName, changedRow);
+ }
+ }
+
+
+
+ private byte[] readBytesFromReader(Reader rdr) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ int ch;
+ while ((ch = rdr.read()) >= 0) {
+ sb.append((char)ch);
+ }
+ } catch (IOException e) {
+ logger.warn("readBytesFromReader: "+e);
+ }
+ return sb.toString().getBytes();
+ }
+
+ protected PreparedStatement getPreparedStatementFromCache(String cql) {
+ // Note: have to hope that the Session never changes!
+ if (!ps_cache.containsKey(cql)) {
+ Session sess = getMusicSession();
+ PreparedStatement ps = sess.prepare(cql);
+ ps_cache.put(cql, ps);
+ }
+ return ps_cache.get(cql);
+ }
+
+ /**
+ * This method gets a connection to Music
+ * @return the Cassandra Session to use
+ */
+ protected Session getMusicSession() {
+ // create cassandra session
+ if (musicSession == null) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "Creating New Music Session");
+ mCon = new MusicConnector(musicAddress);
+ musicSession = mCon.getSession();
+ }
+ return musicSession;
+ }
+
+ /**
+ * This method executes a write query in Music
+ * @param cql the CQL to be sent to Cassandra
+ */
+ protected void executeMusicWriteQuery(String cql) {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ ReturnType rt = MusicCore.eventualPut(pQueryObject);
+ if(rt.getResult().getResult().toLowerCase().equals("failure")) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
+ }
/*Session sess = getMusicSession();
SimpleStatement s = new SimpleStatement(cql);
s.setReadTimeoutMillis(60000);
synchronized (sess) {
sess.execute(s);
}*/
- }
-
- /**
- * This method executes a read query in Music
- * @param cql the CQL to be sent to Cassandra
- * @return a ResultSet containing the rows returned from the query
- */
- protected ResultSet executeMusicRead(String cql) {
- logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- ResultSet results = null;
- try {
- results = MusicCore.get(pQueryObject);
- } catch (MusicServiceException e) {
-
- e.printStackTrace();
- }
- return results;
- /*Session sess = getMusicSession();
- synchronized (sess) {
- return sess.execute(cql);
- }*/
- }
-
- /**
- * Returns the default primary key name that this mixin uses
- */
- public String getMusicDefaultPrimaryKeyName() {
- return MDBC_PRIMARYKEY_NAME;
- }
-
- /**
- * Return the function for cassandra's primary key generation
- */
- public String generateUniqueKey() {
- return MDBCUtils.generateUniqueKey().toString();
- }
-
- @Override
- public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) {
- //\TODO this operation is super expensive to perform, both latency and BW
- // it is better to add additional where clauses, and have the primary key
- // to be composed of known columns of the table
- // Adding this primary indexes would be an additional burden to the developers, which spanner
- // also does, but otherwise performance is really bad
- // At least it should have a set of columns that are guaranteed to be unique
- StringBuilder cqlOperation = new StringBuilder();
- cqlOperation.append("SELECT * FROM ")
- .append(music_ns)
- .append(".")
- .append(table);
- ResultSet musicResults = executeMusicRead(cqlOperation.toString());
- Object[] dbRowObjects = getObjects(ti,table,dbRow);
- while (!musicResults.isExhausted()) {
- Row musicRow = musicResults.one();
- if (rowIs(ti, musicRow, dbRowObjects)) {
- return ((UUID)getValue(musicRow, MDBC_PRIMARYKEY_NAME)).toString();
- }
- }
- //should never reach here
- return null;
- }
-
- /**
- * Checks to see if this row is in list of database entries
- * @param ti
- * @param musicRow
- * @param dbRow
- * @return
- */
- private boolean rowIs(TableInfo ti, Row musicRow, Object[] dbRow) {
- //System.out.println("Comparing " + musicRow.toString());
- boolean sameRow=true;
- for (int i=0; i<ti.columns.size(); i++) {
- Object val = getValue(musicRow, ti.columns.get(i));
- if (!dbRow[i].equals(val)) {
- sameRow=false;
- break;
- }
- }
- return sameRow;
- }
-
- @Override
- public String getMusicKeyFromRow(TableInfo ti, String tableName, JSONObject row) {
- List<String> keyCols = ti.getKeyColumns();
- if(keyCols.isEmpty()){
- throw new IllegalArgumentException("Table doesn't have defined primary indexes ");
- }
- StringBuilder key = new StringBuilder();
- String pfx = "";
- for(String keyCol: keyCols) {
- key.append(pfx);
- key.append(row.get(keyCol));
- pfx = ",";
- }
- String keyStr = key.toString();
- return keyStr;
- }
-
- public void updateMusicDB(String tableName, String primaryKey, PreparedQueryObject pQObject) {
- ReturnType rt = MusicCore.eventualPut(pQObject);
- if(rt.getResult().getResult().toLowerCase().equals("failure")) {
- System.out.println("Failure while critical put..."+rt.getMessage());
- }
- }
-
- /**
- * Build a preparedQueryObject that appends a transaction to the mriTable
- * @param mriTable
- * @param uuid
- * @param table
- * @param redoUuid
- * @return
- */
- private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){
+ }
+
+ /**
+ * This method executes a read query in Music
+ * @param cql the CQL to be sent to Cassandra
+ * @return a ResultSet containing the rows returned from the query
+ */
+ protected ResultSet executeMusicRead(String cql) throws MDBCServiceException {
+ logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ ResultSet results = null;
+ try {
+ results = MusicCore.get(pQueryObject);
+ } catch (MusicServiceException e) {
+ logger.error("Error executing music get operation for query: ["+cql+"]");
+ throw new MDBCServiceException("Error executing get: "+e.getMessage());
+ }
+ return results;
+ }
+
+ /**
+ * Returns the default primary key name that this mixin uses
+ */
+ public String getMusicDefaultPrimaryKeyName() {
+ return MDBC_PRIMARYKEY_NAME;
+ }
+
+ /**
+ * Return the function for cassandra's primary key generation
+ */
+ @Override
+ public UUID generateUniqueKey() {
+ return MDBCUtils.generateUniqueKey();
+ }
+
+ @Override
+ public String getMusicKeyFromRowWithoutPrimaryIndexes(TableInfo ti, String table, JSONObject dbRow) {
+ //\TODO this operation is super expensive to perform, both latency and BW
+ // it is better to add additional where clauses, and have the primary key
+ // to be composed of known columns of the table
+ // Adding this primary indexes would be an additional burden to the developers, which spanner
+ // also does, but otherwise performance is really bad
+ // At least it should have a set of columns that are guaranteed to be unique
+ StringBuilder cqlOperation = new StringBuilder();
+ cqlOperation.append("SELECT * FROM ")
+ .append(music_ns)
+ .append(".")
+ .append(table);
+ ResultSet musicResults = null;
+ try {
+ musicResults = executeMusicRead(cqlOperation.toString());
+ } catch (MDBCServiceException e) {
+ return null;
+ }
+ Object[] dbRowObjects = getObjects(ti,table,dbRow);
+ while (!musicResults.isExhausted()) {
+ Row musicRow = musicResults.one();
+ if (rowIs(ti, musicRow, dbRowObjects)) {
+ return ((UUID)getValue(musicRow, MDBC_PRIMARYKEY_NAME)).toString();
+ }
+ }
+ //should never reach here
+ return null;
+ }
+
+ /**
+ * Checks to see if this row is in list of database entries
+ * @param ti
+ * @param musicRow
+ * @param dbRow
+ * @return
+ */
+ private boolean rowIs(TableInfo ti, Row musicRow, Object[] dbRow) {
+ //System.out.println("Comparing " + musicRow.toString());
+ boolean sameRow=true;
+ for (int i=0; i<ti.columns.size(); i++) {
+ Object val = getValue(musicRow, ti.columns.get(i));
+ if (!dbRow[i].equals(val)) {
+ sameRow=false;
+ break;
+ }
+ }
+ return sameRow;
+ }
+
+ @Override
+ public String getMusicKeyFromRow(TableInfo ti, String tableName, JSONObject row) {
+ List<String> keyCols = ti.getKeyColumns();
+ if(keyCols.isEmpty()){
+ throw new IllegalArgumentException("Table doesn't have defined primary indexes ");
+ }
+ StringBuilder key = new StringBuilder();
+ String pfx = "";
+ for(String keyCol: keyCols) {
+ key.append(pfx);
+ key.append(row.get(keyCol));
+ pfx = ",";
+ }
+ String keyStr = key.toString();
+ return keyStr;
+ }
+
+ public void updateMusicDB(String tableName, String primaryKey, PreparedQueryObject pQObject) {
+ ReturnType rt = MusicCore.eventualPut(pQObject);
+ if(rt.getResult().getResult().toLowerCase().equals("failure")) {
+ System.out.println("Failure while critical put..."+rt.getMessage());
+ }
+ }
+
+ /**
+ * Build a preparedQueryObject that appends a transaction to the mriTable
+ * @param mriTable
+ * @param uuid
+ * @param table
+ * @param redoUuid
+ * @return
+ */
+ private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){
PreparedQueryObject query = new PreparedQueryObject();
StringBuilder appendBuilder = new StringBuilder();
appendBuilder.append("UPDATE ")
- .append(music_ns)
- .append(".")
- .append(mriTable)
- .append(" SET txredolog = txredolog +[('")
- .append(table)
- .append("',")
- .append(redoUuid)
- .append(")] WHERE rangeid = ")
- .append(uuid)
- .append(";");
+ .append(music_ns)
+ .append(".")
+ .append(mriTable)
+ .append(" SET txredolog = txredolog +[('")
+ .append(table)
+ .append("',")
+ .append(redoUuid)
+ .append(")] WHERE rangeid = ")
+ .append(uuid)
+ .append(";");
query.appendQueryString(appendBuilder.toString());
return query;
}
- protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
- UUID mriIndex = partition.getMusicRangeInformationIndex();
- String lockId;
- lockId = MusicCore.createLockReference(fullyQualifiedKey);
- //\TODO Handle better failures to acquire locks
+ protected ReturnType acquireLock(String fullyQualifiedKey, String lockId) throws MDBCServiceException{
ReturnType lockReturn;
+ //\TODO Handle better failures to acquire locks
try {
lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
} catch (MusicLockingException e) {
@@ -1039,40 +1084,46 @@ public class MusicMixin implements MusicInterface {
logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey);
throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey);
}
- //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock
+ return lockReturn;
+ }
+
+ protected List<LockResult> waitForLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
+ List<LockResult> result = new ArrayList<>();
+ String lockId;
+ lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId);
if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
- try {
- MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId);
- CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle();
- CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
- this.musicRangeInformationTableName, mriIndex.toString());
- while(lockOwner.lockRef != lockId) {
- MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef);
- try {
- lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
- this.musicRangeInformationTableName, mriIndex.toString());
- } catch(NullPointerException e){
- //Ignore null pointer exception
- lockId = MusicCore.createLockReference(fullyQualifiedKey);
- break;
- }
+ //\TODO Improve the exponential backoff
+ int n = 1;
+ int low = 1;
+ int high = 1000;
+ Random r = new Random();
+ while(MusicCore.whoseTurnIsIt(fullyQualifiedKey)!=lockId){
+ try {
+ Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000)
+ + (r.nextInt(high - low) + low));
+ } catch (InterruptedException e) {
+ continue;
+ }
+ if(n++==20){
+ throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!") ;
}
- lockReturn = MusicCore.acquireLock(fullyQualifiedKey,lockId);
-
- } catch (MusicLockingException e) {
- throw new MDBCServiceException("Could not lock the corresponding lock");
- } catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Error in music, when locking key: "+fullyQualifiedKey);
- throw new MDBCServiceException("Error in music, when locking: "+fullyQualifiedKey);
- } catch (MusicQueryException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Error in executing query music, when locking key: "+fullyQualifiedKey);
- throw new MDBCServiceException("Error in executing query music, when locking: "+fullyQualifiedKey);
}
}
+ partition.setLockId(lockId);
+ result.add(new LockResult(partition.getMusicRangeInformationIndex(),lockId,true));
+ return result;
+ }
+
+ protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
+ UUID mriIndex = partition.getMusicRangeInformationIndex();
+ String lockId;
+ lockId = MusicCore.createLockReference(fullyQualifiedKey);
+ ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId);
+ //\TODO this is wrong, we should have a better way to obtain a lock forcefully, clean the queue and obtain the lock
if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
- throw new MDBCServiceException("Could not lock the corresponding lock");
+ return null;
}
- //TODO: Java newbie here, verify that this lockId is actually assigned to the global DatabasePartition in the StateManager instance
partition.setLockId(lockId);
return lockId;
}
@@ -1092,28 +1143,21 @@ public class MusicMixin implements MusicInterface {
* Writes the transaction information to metric's txDigest and musicRangeInformation table
* This officially commits the transaction globally
*/
- @Override
- public void commitLog(DatabasePartition partition,
- HashMap<Range,StagingTable> transactionDigest, String txId ,
- TxCommitProgress progressKeeper) throws MDBCServiceException {
- if (transactionDigest==null || transactionDigest.size()==0) {
- return;
- }
- logger.info("Commiting lock for " + partition.getMusicRangeInformationIndex() + ", txID=" + txId);
- UUID mriIndex = partition.getMusicRangeInformationIndex();
- if(mriIndex==null) {
- //\TODO Fetch MriIndex from the Range Information Table
- throw new MDBCServiceException("TIT Index retrieval not yet implemented");
- }
+ @Override
+ public void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+ UUID mriIndex = partition.getMusicRangeInformationIndex();
+ if(mriIndex==null) {
+ own(partition.getSnapshot(),partition);
+ }
String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex.toString();
- //0. See if reference to lock was already created
- String lockId = partition.getLockId();
- if(lockId == null || lockId.isEmpty()) {
- lockId = createAndAssignLock(fullyQualifiedMriKey,partition);
- }
-
- UUID commitId;
- //Generate a local commit id
+ //0. See if reference to lock was already created
+ String lockId = partition.getLockId();
+ if(lockId == null || lockId.isEmpty()) {
+ waitForLock(fullyQualifiedMriKey,partition);
+ }
+
+ UUID commitId;
+ //Generate a local commit id
if(progressKeeper.containsTx(txId)) {
commitId = progressKeeper.getCommitId(txId);
}
@@ -1123,7 +1167,7 @@ public class MusicMixin implements MusicInterface {
}
//Add creation type of transaction digest
- //1. Push new row to RRT and obtain its index
+ //1. Push new row to RRT and obtain its index
String serializedTransactionDigest;
try {
serializedTransactionDigest = MDBCUtils.toString(transactionDigest);
@@ -1133,11 +1177,11 @@ public class MusicMixin implements MusicInterface {
MusicTxDigestId digestId = new MusicTxDigestId(commitId);
addTxDigest(digestId, serializedTransactionDigest);
//2. Save RRT index to RQ
- if(progressKeeper!= null) {
- progressKeeper.setRecordId(txId,digestId);
- }
- //3. Append RRT index into the corresponding TIT row array
- appendToRedoLog(mriIndex,partition,digestId);
+ if(progressKeeper!= null) {
+ progressKeeper.setRecordId(txId,digestId);
+ }
+ //3. Append RRT index into the corresponding TIT row array
+ appendToRedoLog(mriIndex,partition,digestId);
}
/**
@@ -1146,91 +1190,104 @@ public class MusicMixin implements MusicInterface {
* @param rowValues
* @return
*/
- @SuppressWarnings("unused")
- private String getUid(String tableName, String string, Object[] rowValues) {
- //
- // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
- String cql = String.format("SELECT * FROM %s.%s;", music_ns, tableName);
- PreparedStatement ps = getPreparedStatementFromCache(cql);
- BoundStatement bound = ps.bind();
- bound.setReadTimeoutMillis(60000);
- Session sess = getMusicSession();
- ResultSet rs;
- synchronized (sess) {
- rs = sess.execute(bound);
- }
-
- //should never reach here
- logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key");
- return null;
- }
-
- @Override
- public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) {
- // \FIXME: we may need to add the primary key of the row if it was autogenerated by MUSIC
- List<String> cols = ti.columns;
- int size = cols.size();
- boolean hasDefault = false;
- if(row.has(getMusicDefaultPrimaryKeyName())) {
- size++;
- hasDefault = true;
- }
-
- Object[] objects = new Object[size];
- int idx = 0;
- if(hasDefault) {
- objects[idx++] = row.getString(getMusicDefaultPrimaryKeyName());
- }
- for(String col : ti.columns) {
- objects[idx]=row.get(col);
- }
- return objects;
- }
-
- @Override
- public List<UUID> getPartitionIndexes() {
- ArrayList<UUID> partitions = new ArrayList<UUID>();
- String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName);
- ResultSet rs = executeMusicRead(cql);
- for (Row r: rs) {
- partitions.add(r.getUUID("rangeid"));
- }
- return partitions;
- }
-
+ @SuppressWarnings("unused")
+ private String getUid(String tableName, String string, Object[] rowValues) {
+ //
+ // Update local MUSIC node. Note: in Cassandra you can insert again on an existing key..it becomes an update
+ String cql = String.format("SELECT * FROM %s.%s;", music_ns, tableName);
+ PreparedStatement ps = getPreparedStatementFromCache(cql);
+ BoundStatement bound = ps.bind();
+ bound.setReadTimeoutMillis(60000);
+ Session sess = getMusicSession();
+ ResultSet rs;
+ synchronized (sess) {
+ rs = sess.execute(bound);
+ }
+
+ //should never reach here
+ logger.error(EELFLoggerDelegate.errorLogger, "Could not find the row in the primary key");
+ return null;
+ }
+
+ public Object[] getObjects(TableInfo ti, String tableName, JSONObject row) {
+ // \FIXME: we may need to add the primary key of the row if it was autogenerated by MUSIC
+ List<String> cols = ti.columns;
+ int size = cols.size();
+ boolean hasDefault = false;
+ if(row.has(getMusicDefaultPrimaryKeyName())) {
+ size++;
+ hasDefault = true;
+ }
+
+ Object[] objects = new Object[size];
+ int idx = 0;
+ if(hasDefault) {
+ objects[idx++] = row.getString(getMusicDefaultPrimaryKeyName());
+ }
+ for(String col : ti.columns) {
+ objects[idx]=row.get(col);
+ }
+ return objects;
+ }
+
+ @Override
+ public List<UUID> getPartitionIndexes() throws MDBCServiceException {
+ ArrayList<UUID> partitions = new ArrayList<UUID>();
+ String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName);
+ ResultSet rs = executeMusicRead(cql);
+ for (Row r: rs) {
+ partitions.add(r.getUUID("rangeid"));
+ }
+ return partitions;
+ }
+
+ List<Range> getRanges(Row newRow){
+ List<Range> partitions = new ArrayList<>();
+ Set<String> tables = newRow.getSet("keys",String.class);
+ for (String table:tables){
+ partitions.add(new Range(table));
+ }
+ return partitions;
+ }
+
+ MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){
+ UUID partitionIndex = newRow.getUUID("rangeid");
+ List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
+ List<MusicTxDigestId> digestIds = new ArrayList<>();
+ for(TupleValue t: log){
+ //final String tableName = t.getString(0);
+ final UUID index = t.getUUID(1);
+ digestIds.add(new MusicTxDigestId(index));
+ }
+ List<Range> partitions = new ArrayList<>();
+ Set<String> tables = newRow.getSet("keys",String.class);
+ for (String table:tables){
+ partitions.add(new Range(table));
+ }
+ return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""),
+ digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"));
+ }
+
@Override
public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException {
- //TODO: verify that lock id is valid before calling the database operations function
+ //TODO: verify that lock id is valid before calling the database operations function
//UUID id = partition.getMusicRangeInformationIndex();
String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(partitionIndex);
- Row newRow;
- try {
- newRow = executeMusicUnlockedQuorumGet(pQueryObject);
- } catch (MDBCServiceException e) {
- logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
-
- List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
- List<MusicTxDigestId> digestIds = new ArrayList<>();
- for(TupleValue t: log){
- //final String tableName = t.getString(0);
- final UUID index = t.getUUID(1);
- digestIds.add(new MusicTxDigestId(index));
- }
- List<Range> partitions = new ArrayList<>();
- Set<String> tables = newRow.getSet("keys",String.class);
- for (String table:tables){
- partitions.add(new Range(table));
- }
- return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""),
- digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"));
- }
-
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(partitionIndex);
+ Row newRow;
+ try {
+ newRow = executeMusicUnlockedQuorumGet(pQueryObject);
+ } catch (MDBCServiceException e) {
+ logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+
+ return getMRIRowFromCassandraRow(newRow);
+ }
+
/**
* This function creates the TransactionInformation table. It contain information related
@@ -1254,7 +1311,7 @@ public class MusicMixin implements MusicInterface {
//TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
- this.music_ns, tableName, fields, priKey);
+ this.music_ns, tableName, fields, priKey);
try {
executeMusicWriteQuery(this.music_ns,tableName,cql);
} catch (MDBCServiceException e) {
@@ -1262,251 +1319,437 @@ public class MusicMixin implements MusicInterface {
throw(e);
}
}
-
-
+
+
@Override
public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException {
- DatabasePartition newPartition = info.getDBPartition();
+ DatabasePartition newPartition = info.getDBPartition();
+
String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMusicRangeInformationIndex().toString();
String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition);
- createEmptyMriRow(info.getMetricProcessId(),lockId,new ArrayList<Range>());
- throw new UnsupportedOperationException();
+ if(lockId == null || lockId.isEmpty()){
+ throw new MDBCServiceException("Error initializing music range information, error creating a lock for a new row") ;
+ }
+ createEmptyMriRow(newPartition.getMusicRangeInformationIndex(),info.getMetricProcessId(),lockId,newPartition.getSnapshot());
+ return newPartition;
}
-
+
/**
* Creates a new empty MRI row
* @param processId id of the process that is going to own initially this.
* @return uuid associated to the new row
*/
private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges)
- throws MDBCServiceException {
+ throws MDBCServiceException {
UUID id = MDBCUtils.generateUniqueKey();
return createEmptyMriRow(id,processId,lockId,ranges);
}
-
+
/**
* Creates a new empty MRI row
* @param processId id of the process that is going to own initially this.
* @return uuid associated to the new row
*/
private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges)
- throws MDBCServiceException{
- StringBuilder insert = new StringBuilder("INSERT INTO ")
- .append(this.music_ns)
- .append('.')
- .append(this.musicRangeInformationTableName)
- .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
- .append("(")
- .append(id)
- .append(",{");
- boolean first=true;
- for(Range r: ranges){
- if(first){ first=false; }
- else {
- insert.append(',');
- }
- insert.append("'").append(r.toString()).append("'");
- }
- insert.append("},'")
- .append((lockId==null)?"":lockId)
- .append("','")
- .append(processId)
- .append("',[]);");
- PreparedQueryObject query = new PreparedQueryObject();
- query.appendQueryString(insert.toString());
- try {
- executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to add new row to transaction information");
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
- return id;
+ throws MDBCServiceException{
+ StringBuilder insert = new StringBuilder("INSERT INTO ")
+ .append(this.music_ns)
+ .append('.')
+ .append(this.musicRangeInformationTableName)
+ .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
+ .append("(")
+ .append(id)
+ .append(",{");
+ boolean first=true;
+ for(Range r: ranges){
+ if(first){ first=false; }
+ else {
+ insert.append(',');
+ }
+ insert.append("'").append(r.toString()).append("'");
+ }
+ insert.append("},'")
+ .append((lockId==null)?"":lockId)
+ .append("','")
+ .append(processId)
+ .append("',[]);");
+ PreparedQueryObject query = new PreparedQueryObject();
+ query.appendQueryString(insert.toString());
+ try {
+ executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to add new row to transaction information");
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+ return id;
}
@Override
public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException {
- logger.info("Appending to redo log for partition " + partition.getMusicRangeInformationIndex() + " txId=" + newRecord.txId);
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId);
- ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null);
- if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
- logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
- throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
- }
- }
-
+ logger.info("Appending to redo log for partition " + partition.getMusicRangeInformationIndex() + " txId=" + newRecord.txId);
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId);
+ ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null);
+ if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
+ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
+ throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
+ }
+ }
+
public void createMusicTxDigest() throws MDBCServiceException {
- createMusicTxDigest(-1);
- }
-
-
- /**
- * This function creates the MusicTxDigest table. It contain information related to each transaction committed
- * * LeaseId: id associated with the lease, text
- * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
- * * TransactionDigest: text that contains all the changes in the transaction
- */
- private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
- String tableName = this.musicTxDigestTableName;
- if(musicTxDigestTableNumber >= 0) {
- tableName = tableName +
- "-" +
- Integer.toString(musicTxDigestTableNumber);
- }
- String priKey = "txid";
- StringBuilder fields = new StringBuilder();
- fields.append("txid uuid, ");
- fields.append("transactiondigest text ");//notice lack of ','
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
- try {
- executeMusicWriteQuery(this.music_ns,tableName,cql);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to create redo records table");
- throw(e);
- }
- }
-
-
- /**
- * Writes the transaction history to the txDigest
- */
+ createMusicTxDigest(-1);
+ }
+
+
+ /**
+ * This function creates the MusicTxDigest table. It contain information related to each transaction committed
+ * * LeaseId: id associated with the lease, text
+ * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+ * * TransactionDigest: text that contains all the changes in the transaction
+ */
+ private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
+ String tableName = this.musicTxDigestTableName;
+ if(musicTxDigestTableNumber >= 0) {
+ tableName = tableName +
+ "-" +
+ Integer.toString(musicTxDigestTableNumber);
+ }
+ String priKey = "txid";
+ StringBuilder fields = new StringBuilder();
+ fields.append("txid uuid, ");
+ fields.append("transactiondigest text ");//notice lack of ','
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
+ try {
+ executeMusicWriteQuery(this.music_ns,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create redo records table");
+ throw(e);
+ }
+ }
+
+
+ /**
+ * Writes the transaction history to the txDigest
+ */
@Override
public void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException {
- //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
- PreparedQueryObject query = new PreparedQueryObject();
- String cqlQuery = "INSERT INTO " +
- this.music_ns +
- '.' +
- this.musicTxDigestTableName +
- " (txid,transactiondigest) " +
- "VALUES (" +
- newId.txId + ",'" +
- transactionDigest +
- "');";
- query.appendQueryString(cqlQuery);
- //\TODO check if I am not shooting on my own foot
- try {
- MusicCore.nonKeyRelatedPut(query,"critical");
- } catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
- throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString());
- }
- }
-
+ //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
+ PreparedQueryObject query = new PreparedQueryObject();
+ String cqlQuery = "INSERT INTO " +
+ this.music_ns +
+ '.' +
+ this.musicTxDigestTableName +
+ " (txid,transactiondigest) " +
+ "VALUES (" +
+ newId.txId + ",'" +
+ transactionDigest +
+ "');";
+ query.appendQueryString(cqlQuery);
+ //\TODO check if I am not shooting on my own foot
+ try {
+ MusicCore.nonKeyRelatedPut(query,"critical");
+ } catch (MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
+ throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString());
+ }
+ }
+ @Override
+ public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
+ String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(id.txId);
+ Row newRow;
+ try {
+ newRow = executeMusicUnlockedQuorumGet(pQueryObject);
+ } catch (MDBCServiceException e) {
+ logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+ String digest = newRow.getString("transactiondigest");
+ HashMap<Range,StagingTable> changes;
+ try {
+ changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
+ } catch (IOException e) {
+ logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId);
+ throw new MDBCServiceException("Deserializng digest failed with ioexception");
+ } catch (ClassNotFoundException e) {
+ logger.error("Deserializng digest failed with an invalid class for id:"+id.txId);
+ throw new MDBCServiceException("Deserializng digest failed with an invalid class");
+ }
+ return changes;
+ }
+
+ /**
+ * This function is used to find all the related uuids associated with the required ranges
+ * @param ranges ranges to be find
+ * @return a map that associated each MRI row to the corresponding ranges
+ */
+ private Map<UUID,List<Range>> findRangeRows(List<Range> ranges) throws MDBCServiceException {
+ /* \TODO this function needs to be improved, by creating an additional index, or at least keeping a local cache
+ Additionally, we should at least used pagination and the token function, to avoid retrieving the whole table at
+ once, this can become problematic if we have too many connections in the overall METRIC system */
+ Map<UUID,List<Range>> result = new HashMap<>();
+ List<Range> rangesCopy = new LinkedList<>(ranges);
+ int counter=0;
+ StringBuilder cqlOperation = new StringBuilder();
+ cqlOperation.append("SELECT * FROM ")
+ .append(music_ns)
+ .append(".")
+ .append(musicRangeInformationTableName);
+ ResultSet musicResults = executeMusicRead(cqlOperation.toString());
+ while (!musicResults.isExhausted()) {
+ Row musicRow = musicResults.one();
+ UUID mriIndex = musicRow.getUUID("rangeid");
+ final List<Range> musicRanges = getRanges(musicRow);
+ for(Range retrievedRange : musicRanges) {
+ for (Iterator<Range> iterator = rangesCopy.iterator(); iterator.hasNext(); ) {
+ Range range = iterator.next();
+ if (retrievedRange.overlaps(range)) {
+ // Remove the current element from the iterator and the list.
+ if(!result.containsKey(mriIndex)){
+ result.put(mriIndex,new ArrayList<>());
+ }
+ List<Range> foundRanges = result.get(mriIndex);
+ foundRanges.add(range);
+ iterator.remove();
+ }
+ }
+ }
+ }
+ if(!rangesCopy.isEmpty()){
+ StringBuilder tables = new StringBuilder();
+ for(Range range: rangesCopy){
+ tables.append(range.toString()).append(',');
+ }
+ logger.error("Row in MRI doesn't exist for tables [ "+tables.toString()+"]");
+ throw new MDBCServiceException("MRI row doesn't exist for tables "+tables.toString());
+ }
+ return result;
+ }
+
+ private List<LockResult> lockRow(UUID rowId, List<Range> ranges, DatabasePartition partition)
+ throws MDBCServiceException {
+ List<LockResult> result = new ArrayList<>();
+ if(partition.getMusicRangeInformationIndex()==rowId){
+ result.add(new LockResult(rowId,partition.getLockId(),false));
+ return result;
+ }
+ //\TODO: this function needs to be improved, to track possible changes in the owner of a set of ranges
+ String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+rowId.toString();
+ //return List<Range> knownRanges, UUID mriIndex, String lockId
+ DatabasePartition newPartition = new DatabasePartition(ranges,rowId,null);
+ return waitForLock(fullyQualifiedMriKey,newPartition);
+ }
+
+ @Override
+ public OwnershipReturn own(List<Range> ranges, DatabasePartition partition) throws MDBCServiceException {
+ UUID newId = generateUniqueKey();
+ return appendRange(newId.toString(),ranges,partition);
+ }
+
+ /**
+ * This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained
+ * @param ranges ranges that should be contained in the partition
+ * @param partition currently own partition
+ * @return
+ */
+ public boolean isAppendRequired(List<Range> ranges, DatabasePartition partition){
+ for(Range r: ranges){
+ if(!partition.isContained(r)){
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private List<UUID> mergeMriRows(String newId, Map<UUID,LockResult> lock, DatabasePartition partition)
+ throws MDBCServiceException {
+ List<UUID> oldIds = new ArrayList<>();
+ List<Range> newRanges = new ArrayList<>();
+ for (Map.Entry<UUID,LockResult> entry : lock.entrySet()) {
+ oldIds.add(entry.getKey());
+ final MusicRangeInformationRow mriRow = getMusicRangeInformation(entry.getKey());
+ final DatabasePartition dbPartition = mriRow.getDBPartition();
+ newRanges.addAll(dbPartition.getSnapshot());
+ }
+ DatabasePartition newPartition = new DatabasePartition(newRanges,UUID.fromString(newId),null);
+ String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+newId;
+ final List<LockResult> lockResults = waitForLock(fullyQualifiedMriKey, newPartition);
+ if(lockResults.size()!=1||!lockResults.get(0).newLock){
+ logger.error("When merging rows, lock returned an invalid error");
+ throw new MDBCServiceException("When merging MRI rows, lock returned an invalid error");
+ }
+ final LockResult lockResult = lockResults.get(0);
+ partition.updateDatabasePartition(newPartition);
+ createEmptyMriRow(partition.getMusicRangeInformationIndex(),myId,lockResult.ownerId,partition.getSnapshot());
+ return oldIds;
+ }
@Override
- public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
- String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(id.txId);
- Row newRow;
- try {
- newRow = executeMusicUnlockedQuorumGet(pQueryObject);
- } catch (MDBCServiceException e) {
- logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
- String digest = newRow.getString("transactiondigest");
- HashMap<Range,StagingTable> changes;
- try {
- changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
- } catch (IOException e) {
- logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId);
- throw new MDBCServiceException("Deserializng digest failed with ioexception");
- } catch (ClassNotFoundException e) {
- logger.error("Deserializng digest failed with an invalid class for id:"+id.txId);
- throw new MDBCServiceException("Deserializng digest failed with an invalid class");
- }
- return changes;
+ public OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition)
+ throws MDBCServiceException {
+ if(!isAppendRequired(ranges,partition)){
+ return new OwnershipReturn(partition.getLockId(),UUID.fromString(rangeId),null);
+ }
+ Map<UUID,List<Range>> rows = findRangeRows(ranges);
+ HashMap<UUID,LockResult> rowLock=new HashMap<>();
+ boolean newLock = false;
+ //\TODO: perform this operations in parallel
+ for(Map.Entry<UUID,List<Range>> row : rows.entrySet()){
+ List<LockResult> locks;
+ try {
+ locks = lockRow(row.getKey(),row.getValue(), partition);
+ } catch (MDBCServiceException e) {
+ //TODO: Make a decision if retry or just fail?
+ logger.error("Error locking row");
+ throw e;
+ }
+ for(LockResult l : locks){
+ newLock = newLock || l.getNewLock();
+ rowLock.put(l.getIndex(),l);
+ }
+ }
+ String lockId;
+ List<UUID> oldIds = null;
+ if(rowLock.size()!=1){
+ oldIds = mergeMriRows(rangeId, rowLock, partition);
+ lockId = partition.getLockId();
+ }
+ else{
+ List<LockResult> list = new ArrayList<>(rowLock.values());
+ LockResult lockResult = list.get(0);
+ lockId = lockResult.getOwnerId();
+ }
+
+ return new OwnershipReturn(lockId,UUID.fromString(rangeId),oldIds);
}
@Override
- public void own(List<Range> ranges){
- throw new UnsupportedOperationException();
+ public void relinquish(String ownerId, String rangeId) throws MDBCServiceException{
+ String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+rangeId;
+ try {
+ MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey,ownerId);
+ } catch (MusicLockingException e) {
+ throw new MDBCServiceException(e.getMessage());
+ }
+ }
+
+ /**
+ * This function is used to rate the number of times we relinquish at the end of a transaction
+ * @return true if we should try to relinquish, else should avoid relinquishing in this iteration
+ */
+ private boolean canTryRelinquishing(){
+ return true;
}
@Override
- public void appendRange(String rangeId, List<Range> ranges){
- throw new UnsupportedOperationException();
+ public void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException {
+ if(!canTryRelinquishing() || !partition.isLocked()){
+ return;
+ }
+ CassaLockStore lsHandle;
+ try {
+ lsHandle = MusicCore.getLockingServiceHandle();
+ } catch (MusicLockingException e) {
+ logger.error("Error obtaining the locking service handle when checking if relinquish was required");
+ throw new MDBCServiceException("Error obtaining locking service"+e.getMessage());
+ }
+ long lockQueueSize;
+ try {
+ lockQueueSize = lsHandle.getLockQueueSize(music_ns, this.musicRangeInformationTableName, partition.getMusicRangeInformationIndex().toString());
+ } catch (MusicServiceException|MusicQueryException e) {
+ logger.error("Error obtaining the lock queue size");
+ throw new MDBCServiceException("Error obtaining lock queue size: "+e.getMessage());
+ }
+ if(lockQueueSize> 1){
+ //If there is any other node waiting, we just relinquish ownership
+ try {
+ relinquish(partition.getLockId(),partition.getMusicRangeInformationIndex().toString());
+ } catch (MDBCServiceException e) {
+ logger.error("Error relinquishing lock, will use timeout to solve");
+ }
+ partition.setLockId("");
+ }
}
+ /**
+ * This method executes a write query in Music
+ * @param cql the CQL to be sent to Cassandra
+ */
+ private static void executeMusicWriteQuery(String keyspace, String table, String cql)
+ throws MDBCServiceException {
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ ResultType rt = null;
+ try {
+ rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical");
+ } catch (MusicServiceException e) {
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ e.printStackTrace();
+ }
+ String result = rt.getResult();
+ if (result==null || result.toLowerCase().equals("failure")) {
+ throw new MDBCServiceException("Music eventual put failed");
+ }
+ }
+
+ private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey,
+ String lock)
+ throws MDBCServiceException{
+ ResultSet result;
+ try {
+ result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock);
+ } catch(MusicServiceException e){
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ e.printStackTrace();
+ throw new MDBCServiceException("Error executing critical get");
+ }
+ if(result.isExhausted()){
+ throw new MDBCServiceException("There is not a row that matches the id "+primaryKey);
+ }
+ return result.one();
+ }
+
+ private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
+ throws MDBCServiceException{
+ ResultSet result = MusicCore.quorumGet(cqlObject);
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ if(result.isExhausted()){
+ throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
+ }
+ return result.one();
+ }
+
+ private void executeMusicLockedPut(String namespace, String tableName,
+ String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId,
+ MusicCore.Condition conditionInfo) throws MDBCServiceException {
+ ReturnType rt ;
+ if(lockId==null) {
+ try {
+ rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo);
+ } catch (MusicLockingException e) {
+ logger.error("Music locked put failed");
+ throw new MDBCServiceException("Music locked put failed");
+ } catch (MusicServiceException e) {
+ logger.error("Music service fail: Music locked put failed");
+ throw new MDBCServiceException("Music service fail: Music locked put failed");
+ } catch (MusicQueryException e) {
+ logger.error("Music query fail: locked put failed");
+ throw new MDBCServiceException("Music query fail: Music locked put failed");
+ }
+ }
+ else {
+ rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo);
+ }
+ if (rt.getResult().getResult().toLowerCase().equals("failure")) {
+ throw new MDBCServiceException("Music locked put failed");
+ }
+ }
+
+
@Override
- public void relinquish(String ownerId, String rangeId){
- throw new UnsupportedOperationException();
- }
-
- /**
- * This method executes a write query in Music
- * @param cql the CQL to be sent to Cassandra
- */
- private static void executeMusicWriteQuery(String keyspace, String table, String cql)
- throws MDBCServiceException {
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- ResultType rt = null;
- try {
- rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical");
- } catch (MusicServiceException e) {
- //\TODO: handle better, at least transform into an MDBCServiceException
- e.printStackTrace();
- }
- String result = rt.getResult();
- if (result==null || result.toLowerCase().equals("failure")) {
- throw new MDBCServiceException("Music eventual put failed");
- }
- }
-
- private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey,
- String lock)
- throws MDBCServiceException{
- ResultSet result;
- try {
- result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock);
- } catch(MusicServiceException e){
- //\TODO: handle better, at least transform into an MDBCServiceException
- e.printStackTrace();
- throw new MDBCServiceException("Error executing critical get");
- }
- if(result.isExhausted()){
- throw new MDBCServiceException("There is not a row that matches the id "+primaryKey);
- }
- return result.one();
- }
-
- private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
- throws MDBCServiceException{
- ResultSet result = MusicCore.quorumGet(cqlObject);
- //\TODO: handle better, at least transform into an MDBCServiceException
- if(result.isExhausted()){
- throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
- }
- return result.one();
- }
-
- private void executeMusicLockedPut(String namespace, String tableName,
- String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId,
- MusicCore.Condition conditionInfo) throws MDBCServiceException {
- ReturnType rt ;
- if(lockId==null) {
- try {
- rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo);
- } catch (MusicLockingException e) {
- logger.error("Music locked put failed");
- throw new MDBCServiceException("Music locked put failed");
- } catch (MusicServiceException e) {
- logger.error("Music service fail: Music locked put failed");
- throw new MDBCServiceException("Music service fail: Music locked put failed");
- } catch (MusicQueryException e) {
- logger.error("Music query fail: locked put failed");
- throw new MDBCServiceException("Music query fail: Music locked put failed");
- }
- }
- else {
- rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo);
- }
- if (rt.getResult().getResult().toLowerCase().equals("failure")) {
- throw new MDBCServiceException("Music locked put failed");
- }
- }
+ public void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException{
+ throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented");
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 15384ad..9a8f543 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -38,6 +38,7 @@ import org.json.JSONObject;
import org.json.JSONTokener;
import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.tables.Operation;
@@ -49,6 +50,7 @@ import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.delete.Delete;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.update.Update;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
/**
* This class provides the methods that MDBC needs in order to mirror data to/from a
@@ -586,7 +588,7 @@ NEW.field refers to the new value
// the actual columns, otherwise performance when doing range queries are going
// to be even worse (see the else bracket down)
//
- String musicKey = mi.generateUniqueKey();
+ String musicKey = MDBCUtils.generateUniqueKey().toString();
/*} else {
//get key from data
musicKey = msm.getMusicKeyFromRowWithoutPrimaryIndexes(tbl,newRow);
@@ -647,7 +649,7 @@ NEW.field refers to the new value
JSONObject jo = new JSONObject();
if (!getTableInfo(tableName).hasKey()) {
- String musicKey = mi.generateUniqueKey();
+ String musicKey = MDBCUtils.generateUniqueKey().toString();
jo.put(mi.getMusicDefaultPrimaryKeyName(), musicKey);
}
@@ -690,7 +692,12 @@ NEW.field refers to the new value
//
return null;
}
-
+
+
+ public String applyDigest(Map<Range, StagingTable> digest){
+ throw new NotImplementedException();
+ }
+
@SuppressWarnings("unused")
@Deprecated
private ArrayList<String> getMusicKey(String sql) {
@@ -807,7 +814,6 @@ NEW.field refers to the new value
/**
* Parse the transaction digest into individual events
* @param transaction - base 64 encoded, serialized digest
- * @param dbi
*/
public void replayTransaction(HashMap<Range,StagingTable> transaction) throws SQLException {
boolean autocommit = jdbcConn.getAutoCommit();
@@ -840,7 +846,7 @@ NEW.field refers to the new value
/**
* Replays operation into database, usually from txDigest
- * @param stmt
+ * @param jdbcStmt
* @param r
* @param op
* @throws SQLException
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
index bc9a8fc..c14d5c9 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/query/QueryProcessor.java
@@ -21,22 +21,17 @@
package org.onap.music.mdbc.query;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
-import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.SqlSetOption;
import org.apache.calcite.sql.SqlUpdate;
import org.apache.calcite.sql.fun.SqlInOperator;
import org.apache.calcite.sql.parser.SqlParseException;
@@ -44,7 +39,6 @@ import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserImplFactory;
import org.apache.calcite.sql.parser.impl.SqlParserImpl;
import org.apache.calcite.sql.util.SqlBasicVisitor;
-import org.apache.calcite.sql.util.SqlShuttle;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.util.Util;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
index 7057172..8784a76 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -71,25 +71,34 @@ public class MusicTxDigest {
DBInterface dbi = ((MdbcConnection) stateManager.getConnection("daemon")).getDBInterface();
while (true) {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(daemonSleepTimeS));
//update
logger.info(String.format("[%s] Background MusicTxDigest daemon updating local db",
new Timestamp(System.currentTimeMillis())));
//1) get all other partitions from musicrangeinformation
- List<UUID> partitions = mi.getPartitionIndexes();
+ List<UUID> partitions = null;
+ try {
+ partitions = mi.getPartitionIndexes();
+ } catch (MDBCServiceException e) {
+ logger.error("Error obtainting partition indexes, trying again next iteration");
+ continue;
+ }
//2) for each partition I don't own
- DatabasePartition myPartition = stateManager.getRanges();
- for (UUID partition: partitions) {
- if (!partition.equals(myPartition.getMusicRangeInformationIndex())){
- try {
- replayDigestForPartition(mi, partition, dbi);
- } catch (MDBCServiceException e) {
- logger.error("Unable to update for partition : " + partition + ". " + e.getMessage());
- continue;
+ List<DatabasePartition> ranges = stateManager.getRanges();
+ if(ranges.size()!=0) {
+ DatabasePartition myPartition = ranges.get(0);
+ for (UUID partition : partitions) {
+ if (!partition.equals(myPartition.getMusicRangeInformationIndex())) {
+ try {
+ replayDigestForPartition(mi, partition, dbi);
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to update for partition : " + partition + ". " + e.getMessage());
+ continue;
+ }
}
}
}
- Thread.sleep(TimeUnit.SECONDS.toMillis(daemonSleepTimeS));
}
}
@@ -100,7 +109,7 @@ public class MusicTxDigest {
* @param dbi interface to the database that will replay the operations
* @throws MDBCServiceException
*/
- public void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException {
+ public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException {
List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
for (MusicTxDigestId txId: partitionsRedoLogTxIds) {
HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
index 0870be9..dc1bcce 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
@@ -24,6 +24,8 @@ import java.io.Serializable;
import org.json.JSONObject;
import org.json.JSONTokener;
+import static java.util.Objects.hash;
+
public final class Operation implements Serializable{
private static final long serialVersionUID = -1215301985078183104L;
@@ -53,6 +55,11 @@ public final class Operation implements Serializable{
}
@Override
+ public int hashCode(){
+ return hash(TYPE,NEW_VAL);
+ }
+
+ @Override
public boolean equals(Object o){
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tests/MAIN.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tests/MAIN.java
index 348d891..3de4b7a 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tests/MAIN.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tests/MAIN.java
@@ -25,10 +25,10 @@ import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
-import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.json.JSONTokener;
+import org.onap.music.logging.EELFLoggerDelegate;
/**
* Run all the tests against all the configurations specified in /tests.json.
@@ -86,7 +86,8 @@ public class MAIN {
}
}
public void run() {
- Logger logger = Logger.getLogger(this.getClass());
+
+ EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(this.getClass());
for (int ix = 0; ix < configs.length(); ix++) {
JSONObject config = configs.getJSONObject(ix);
int succ = 0, fail = 0;