aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server/src/main
diff options
context:
space:
mode:
authorArthur Martella <arthur.martella.1@att.com>2019-03-26 18:02:52 -0400
committerTschaen, Brendan <ctschaen@att.com>2019-03-27 12:54:33 -0400
commitdd2937cafd226068dbe518d0b706dc4e6fd164ee (patch)
treec3658ce8519b1b39ba48679c4350f29974be176e /mdbc-server/src/main
parenta40ee886c28bc28db1794792f1fb312b723d48fb (diff)
Improve commit log and benchmarks
Attempting to reconcile Enrique's change at https://gerrit.onap.org/r/#/c/82999/ with the current master. Change-Id: Id669c267e89d185a18d4dfa9a24852ddefcd83eb Issue-ID: MUSIC-369 Signed-off-by: Arthur Martella <arthur.martella.1@att.com>
Diffstat (limited to 'mdbc-server/src/main')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java25
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java2
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java1
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Range.java6
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java33
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/configurations/ClusterConfiguration.java88
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java3
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java69
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java11
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java3
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java41
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java513
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java38
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java33
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java176
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java144
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java12
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java12
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java120
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tools/ClusterSetup.java80
-rwxr-xr-xmdbc-server/src/main/resources/music.properties2
21 files changed, 774 insertions, 638 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
index 7377c4f..72b8899 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
@@ -53,7 +53,7 @@ import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.DagNode;
import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.query.QueryProcessor;
-import org.onap.music.mdbc.tables.MusicTxDigest;
+import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -115,6 +115,10 @@ public class MdbcConnection implements Connection {
logger.debug("Mdbc connection created with id: "+id);
}
+ public DBInterface getDatabaseInterface(){
+ return this.dbi;
+ }
+
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
logger.error(EELFLoggerDelegate.errorLogger, "proxyconn unwrap: " + iface.getName());
@@ -263,6 +267,11 @@ public class MdbcConnection implements Connection {
jdbcConn.close();
logger.debug("Connection was closed for id:" + id);
}
+ try {
+ mi.relinquish(partition.getLockId(),partition.getMRIIndex().toString());
+ } catch (MDBCServiceException e) {
+ throw new SQLException("Failure during relinquish of partition",e);
+ }
}
@Override
@@ -509,7 +518,7 @@ public class MdbcConnection implements Connection {
this.partition.updateDatabasePartition(tempPartition);
statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition);
}
- dbi.preStatementHook(sql);
+ dbi.preStatementHook(sql);
}
@@ -603,4 +612,16 @@ public class MdbcConnection implements Connection {
mi.relinquishIfRequired(partition);
}
+ public Connection getConnection(){
+ return jdbcConn;
+ }
+
+ public DatabasePartition getPartition() {
+ return partition;
+ }
+
+ public StagingTable getTransactionDigest(){
+ return transactionDigest;
+ }
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
index 1712c30..42b9710 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
@@ -20,7 +20,7 @@
package org.onap.music.mdbc;
import org.onap.music.mdbc.configurations.NodeConfiguration;
-import org.onap.music.mdbc.tables.MusicTxDigest;
+import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
import org.apache.calcite.avatica.remote.Driver.Serialization;
import org.apache.calcite.avatica.remote.LocalService;
import org.apache.calcite.avatica.server.HttpServer;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
index 9fb36ae..8f79840 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.configurations.NodeConfiguration;
-import org.onap.music.mdbc.tables.MusicTxDigest;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
index c498952..16e7170 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
@@ -30,9 +30,7 @@ import java.util.Objects;
* In the future we may decide to partition ranges differently
* @author Enrique Saurez
*/
-public class Range implements Serializable, Cloneable{
-
- private static final long serialVersionUID = 1610744496930800088L;
+public class Range implements Cloneable{
private String table;
@@ -61,7 +59,7 @@ public class Range implements Serializable, Cloneable{
}
@Override
- protected Range clone() {
+ public Range clone() {
Range newRange = null;
try{
newRange = (Range) super.clone();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index f0d9832..1105bda 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
@@ -36,7 +35,7 @@ import org.onap.music.mdbc.mixins.MusicInterface;
import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
import org.onap.music.mdbc.tables.MriReference;
-import org.onap.music.mdbc.tables.MusicTxDigest;
+import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
import org.onap.music.mdbc.tables.TxCommitProgress;
import java.io.IOException;
@@ -48,7 +47,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -95,6 +93,7 @@ public class StateManager {
/** map of transactions that have already been applied/updated in this sites SQL db */
private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
private OwnershipAndCheckpoint ownAndCheck;
+ private Thread txDaemon ;
/**
* For testing purposes only
@@ -122,19 +121,22 @@ public class StateManager {
musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
initMusic();
- initSqlDatabase();
-
+ initSqlDatabase();
+ initTxDaemonThread();
String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT);
- long timeoutMs = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t);
+ long timeout = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t);
alreadyApplied = new ConcurrentHashMap<>();
- ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeoutMs);
-
- rangesToWarmup = initWarmupRanges();
- logger.info("Warmup ranges for this site is " + rangesToWarmup);
+ ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
+ }
- MusicTxDigest txDaemon = new MusicTxDigest(this);
- txDaemon.startBackgroundDaemon(Integer.parseInt(
- info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT)));
+ protected void initTxDaemonThread(){
+ txDaemon = new Thread(
+ new MusicTxDigestDaemon(Integer.parseInt(
+ info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT)),
+ this));
+ txDaemon.setName("TxDaemon");
+ txDaemon.setDaemon(true);
+ txDaemon.start();
}
/**
@@ -163,6 +165,7 @@ public class StateManager {
.append(";");
Statement stmt = sqlConnection.createStatement();
stmt.execute(sql.toString());
+ sqlConnection.close();
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
ErrorTypes.GENERALSERVICEERROR);
@@ -243,7 +246,7 @@ public class StateManager {
returnArray = new ArrayList<>(eventualRanges);
}
else{
- returnArray= null;
+ returnArray= new ArrayList<>();
}
}
finally{
@@ -291,7 +294,7 @@ public class StateManager {
}
if(connectionRanges.containsKey(connectionId)){
//We relinquish all locks obtained by a given
- relinquish(connectionRanges.get(connectionId));
+ //relinquish(connectionRanges.get(connectionId));
connectionRanges.remove(connectionId);
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/ClusterConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/ClusterConfiguration.java
new file mode 100644
index 0000000..943fd46
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/ClusterConfiguration.java
@@ -0,0 +1,88 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.configurations;
+
+import com.google.gson.Gson;
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.main.MusicCore;
+import org.onap.music.mdbc.mixins.MusicMixin;
+
+public class ClusterConfiguration {
+ private String internalNamespace;
+ private int internalReplicationFactor;
+ private String musicNamespace;
+ private int musicReplicationFactor;
+ private String mriTableName;
+ private String mtxdTableName;
+ private String eventualMtxdTableName;
+ private String nodeInfoTableName;
+ private String rangeDependencyTableName;
+
+ private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(ClusterConfiguration.class);
+
+ public void initNamespaces() throws MDBCServiceException{
+ MusicMixin.createKeyspace(internalNamespace,internalReplicationFactor);
+ MusicMixin.createKeyspace(musicNamespace,musicReplicationFactor);
+ }
+
+ public void initTables() throws MDBCServiceException{
+ MusicMixin.createMusicRangeInformationTable(musicNamespace, mriTableName);
+ MusicMixin.createMusicTxDigest(mtxdTableName,musicNamespace, -1);
+ MusicMixin.createMusicEventualTxDigest(eventualMtxdTableName,musicNamespace, -1);
+ MusicMixin.createMusicNodeInfoTable(nodeInfoTableName,musicNamespace,-1);
+ MusicMixin.createMusicRangeDependencyTable(musicNamespace,rangeDependencyTableName);
+ }
+
+ private void initInternalTable() throws MDBCServiceException {
+ StringBuilder createKeysTableCql = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
+ .append(internalNamespace)
+ .append(".unsynced_keys (key text PRIMARY KEY);");
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(createKeysTableCql.toString());
+ try {
+ MusicCore.createTable(internalNamespace,"unsynced_keys", queryObject,"critical");
+ } catch (MusicServiceException e) {
+ logger.error("Error creating unsynced keys table" );
+ throw new MDBCServiceException("Error creating unsynced keys table", e);
+ }
+ }
+
+ public static ClusterConfiguration readJsonFromFile(String filepath) throws FileNotFoundException {
+ BufferedReader br;
+ try {
+ br = new BufferedReader(
+ new FileReader(filepath));
+ } catch (FileNotFoundException e) {
+ logger.error(EELFLoggerDelegate.errorLogger,"File was not found when reading json"+e);
+ throw e;
+ }
+ Gson gson = new Gson();
+ ClusterConfiguration config = gson.fromJson(br, ClusterConfiguration.class);
+ return config;
+ }
+
+
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
index 5349219..38309d5 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
@@ -50,6 +50,9 @@ public class NodeConfiguration {
}
protected List<Range> toRanges(String tables){
+ if(tables.isEmpty()){
+ return new ArrayList<>();
+ }
List<Range> newRange = new ArrayList<>();
String[] tablesArray=tables.split(",");
for(String table: tablesArray) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
index a9d179f..343a8b8 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
@@ -20,14 +20,12 @@
package org.onap.music.mdbc.configurations;
import com.datastax.driver.core.ResultSet;
-import java.util.stream.Collectors;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
-import org.onap.music.mdbc.RedoRow;
import org.onap.music.mdbc.mixins.MusicMixin;
-import org.onap.music.mdbc.tables.MusicTxDigest;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import com.google.gson.Gson;
import org.onap.music.datastore.PreparedQueryObject;
@@ -48,11 +46,8 @@ public class TablesConfiguration {
private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class);
private List<PartitionInformation> partitions;
- private String internalNamespace;
- private int internalReplicationFactor;
+ String tableToPartitionName;
private String musicNamespace;
- private int musicReplicationFactor;
- private String tableToPartitionName;
private String partitionInformationTableName;
private String redoHistoryTableName;
private String sqlDatabaseName;
@@ -67,8 +62,6 @@ public class TablesConfiguration {
*/
public List<NodeConfiguration> initializeAndCreateNodeConfigurations() throws MDBCServiceException {
logger.info("initializing the required spaces");
- createKeyspaces();
- initInternalNamespace();
List<NodeConfiguration> nodeConfigs = new ArrayList<>();
if(partitions == null){
@@ -77,10 +70,7 @@ public class TablesConfiguration {
}
for(PartitionInformation partitionInfo : partitions){
String mriTableName = partitionInfo.mriTableName;
- checkIfMriIsEmpty(mriTableName);
- //0) Create the corresponding Music Range Information table
- MusicMixin.createMusicRangeInformationTable(musicNamespace,mriTableName);
-
+ checkIfMriDoesNotExists(mriTableName,partitionInfo);
String partitionId;
if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){
//1) Create a row in the partition info table
@@ -108,24 +98,7 @@ public class TablesConfiguration {
return nodeConfigs;
}
- private void createKeyspaces() throws MDBCServiceException {
- MusicMixin.createKeyspace(internalNamespace,internalReplicationFactor);
- MusicMixin.createKeyspace(musicNamespace,musicReplicationFactor);
-
- }
-
- private void checkIfMriIsEmpty(String mriTableName) throws MDBCServiceException {
- //First check if table exists
- StringBuilder checkTableExistsString = new StringBuilder("SELECT table_name FROM system_schema.tables WHERE keyspace_name='")
- .append(musicNamespace)
- .append("';");
- PreparedQueryObject checkTableExists = new PreparedQueryObject();
- checkTableExists.appendQueryString(checkTableExistsString.toString());
- final ResultSet resultSet = MusicCore.quorumGet(checkTableExists);
- if(resultSet.isExhausted()){
- //Table doesn't exist
- return;
- }
+ private void checkIfMriDoesNotExists(String mriTableName, PartitionInformation partition) throws MDBCServiceException {
//If exists, check if empty
StringBuilder checkRowsInTableString = new StringBuilder("SELECT * FROM ")
.append(musicNamespace)
@@ -134,27 +107,19 @@ public class TablesConfiguration {
.append("';");
PreparedQueryObject checkRowsInTable = new PreparedQueryObject();
checkRowsInTable.appendQueryString(checkRowsInTableString.toString());
- final ResultSet resultSet2 = MusicCore.quorumGet(checkTableExists);
- if(!resultSet2.isExhausted()) {
- throw new MDBCServiceException("When initializing the configuration of the system, the MRI should not exits "
- + "be empty");
+ final ResultSet resultSet = MusicCore.quorumGet(checkRowsInTable);
+ while(resultSet!=null && !resultSet.isExhausted()){
+ final MusicRangeInformationRow mriRowFromCassandraRow = MusicMixin.getMRIRowFromCassandraRow(resultSet.one());
+ List<Range> ranges = mriRowFromCassandraRow.getDBPartition().getSnapshot();
+ for(Range range: partition.getTables()) {
+ if (Range.overlaps(ranges,range.getTable())){
+ throw new MDBCServiceException("MRI row already exists");
+ }
+ }
}
}
- private void initInternalNamespace() throws MDBCServiceException {
- StringBuilder createKeysTableCql = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
- .append(internalNamespace)
- .append(".unsynced_keys (key text PRIMARY KEY);");
- PreparedQueryObject queryObject = new PreparedQueryObject();
- queryObject.appendQueryString(createKeysTableCql.toString());
- try {
- MusicCore.createTable(internalNamespace,"unsynced_keys", queryObject,"critical");
- } catch (MusicServiceException e) {
- logger.error("Error creating unsynced keys table" );
- throw new MDBCServiceException("Error creating unsynced keys table", e);
- }
- }
public static TablesConfiguration readJsonFromFile(String filepath) throws FileNotFoundException {
BufferedReader br;
@@ -174,7 +139,6 @@ public class TablesConfiguration {
private List<Range> tables;
private String owner;
private String mriTableName;
- private String mtxdTableName;
private String partitionId;
public List<Range> getTables() {
@@ -209,12 +173,5 @@ public class TablesConfiguration {
this.partitionId = partitionId;
}
- public String getMtxdTableName(){
- return mtxdTableName;
- }
-
- public void setMtxdTableName(String mtxdTableName) {
- this.mtxdTableName = mtxdTableName;
- }
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
index fd8651a..60c97d1 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
@@ -51,8 +51,8 @@ public class MdbcTestClient {
}
Connection connection;
try {
- String metricURL = "http://localhost:300000/test";
- if(args[0] != null) {
+ String metricURL = "http://localhost:30000/test";
+ if (args.length>0 && args[0] != null) {
metricURL = args[0];
}
connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + metricURL+ ";serialization=protobuf");
@@ -60,7 +60,6 @@ public class MdbcTestClient {
e.printStackTrace();
return;
}
-
try {
connection.setAutoCommit(false);
} catch (SQLException e) {
@@ -68,7 +67,6 @@ public class MdbcTestClient {
return;
}
-
final String sql = "CREATE TABLE IF NOT EXISTS Persons (\n" +
" PersonID int,\n" +
" LastName varchar(255),\n" +
@@ -84,7 +82,6 @@ public class MdbcTestClient {
e.printStackTrace();
return;
}
-
boolean execute = true;
// try {
// execute = stmt.execute(sql);
@@ -123,9 +120,9 @@ public class MdbcTestClient {
}
try {
- //execute = insertStmt.execute(insertSQL);
+ execute = insertStmt.execute(insertSQL);
execute = insertStmt.execute(insertSQL1);
- //execute = insertStmt.execute(insertSQL2);
+ execute = insertStmt.execute(insertSQL2);
//execute = insertStmt.execute(insertSQL3);
//execute = insertStmt.execute(insertSQL4);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
index a514dc6..a594918 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
@@ -19,6 +19,7 @@
*/
package org.onap.music.mdbc.mixins;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
@@ -130,4 +131,6 @@ public interface DBInterface {
void enableForeignKeyChecks() throws SQLException;
void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException;
+
+ Connection getSQLConnection();
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 00f6d00..8b91b28 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -21,11 +21,8 @@ package org.onap.music.mdbc.mixins;
import com.datastax.driver.core.ResultSet;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+
import org.json.JSONObject;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.MusicLockingException;
@@ -230,12 +227,12 @@ public interface MusicInterface {
/**
* This function is used to append an index to the redo log in a MRI row
- * @param partition information related to ownership of partitions, used to verify ownership
+ * @param MRIIndex index of the row to which the record is going to be added (obtained from the Partition)
+ * @param lockId reference to lock associated to the row in the MRI table MRIIndex.
* @param newRecord index of the new record to be appended to the redo log
* @throws MDBCServiceException
*/
- void appendToRedoLog( DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException;
-
+ void appendToRedoLog(UUID MRIIndex, String lockId, MusicTxDigestId newRecord)throws MDBCServiceException;
/**
* This functions adds the tx digest to
* @param newId id used as index in the MTD table
@@ -261,7 +258,7 @@ public interface MusicInterface {
* @throws MDBCServiceException
*/
- public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException;
+ LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException;
/**
* Function used to retrieve a given transaction digest and deserialize it
* @param id of the transaction digest to be retrieved
@@ -277,17 +274,6 @@ public interface MusicInterface {
*/
void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException;
- /**
- * This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all
- * those ranges.
- * @param rangeId new id to be used in the new row
- * @param ranges ranges to be owned by the end of the function called
- * @param partition current ownership status
- * @return
- * @throws MDBCServiceException
- */
- //OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) throws MDBCServiceException;
-
/**
* This functions relinquishes a range
* @param ownerId id of the current ownerh
@@ -318,10 +304,17 @@ public interface MusicInterface {
List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
- public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
- public LockResult requestLock(LockRequest request) throws MDBCServiceException;
- public void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException;
- public OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
+
+ void deleteMriRow(MusicRangeInformationRow row) throws MDBCServiceException;
+
+ void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
+
+ LockResult requestLock(LockRequest request) throws MDBCServiceException;
+
+ void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException;
+
+ OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException;
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index c6cc512..e548f1a 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -37,8 +37,9 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.BiFunction;
+import java.util.concurrent.*;
+
+import com.datastax.driver.core.*;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
import org.onap.music.datastore.Condition;
@@ -67,14 +68,6 @@ import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.RangeDependency;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.TxCommitProgress;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.ColumnDefinitions;
-import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TupleValue;
/**
* This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence
@@ -106,16 +99,19 @@ public class MusicMixin implements MusicInterface {
public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
/** The property name to use to provide the replication factor for Cassandra. */
public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
+ /** The property name to use to provide a flag indicating if compression is required */
+ public static final String KEY_COMPRESSION = "mdbc_compression";
/** Namespace for the tables in MUSIC (Cassandra) */
public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
/** The default property value to use for the Cassandra IP address. */
public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
/** The default property value to use for the Cassandra replication factor. */
- public static final int DEFAULT_MUSIC_RFACTOR = 1;
+ public static final int DEFAULT_MUSIC_RFACTOR = 3;
/** The default primary string column, if none is provided. */
public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
/** Type of the primary key, if none is defined by the user */
public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
+ public static final boolean DEFAULT_COMPRESSION = true;
//\TODO Add logic to change the names when required and create the tables when necessary
@@ -178,9 +174,12 @@ public class MusicMixin implements MusicInterface {
//typemap.put(Types.DATE, "TIMESTAMP");
}
+
protected final String music_ns;
protected final String myId;
protected final String[] allReplicaIds;
+ protected ExecutorService commitExecutorThreads;
+
private final String musicAddress;
private final int music_rfactor;
private MusicConnector mCon = null;
@@ -189,7 +188,7 @@ public class MusicMixin implements MusicInterface {
private Map<String, PreparedStatement> ps_cache = new HashMap<>();
private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
private StateManager stateManager;
-
+ private boolean useCompression;
public MusicMixin() {
@@ -206,14 +205,16 @@ public class MusicMixin implements MusicInterface {
// Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress);
-
- String s = info.getProperty(KEY_MUSIC_RFACTOR);
- this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s);
+ MusicDataStore dsHandle = null;
+ try {
+ dsHandle = MusicDataStoreHandle.getDSHandle();
+ } catch (MusicServiceException e) {
+ e.printStackTrace();
+ }
this.myId = info.getProperty(KEY_MY_ID, getMyHostId());
logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId);
-
this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(",");
logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId));
@@ -222,7 +223,14 @@ public class MusicMixin implements MusicInterface {
this.stateManager = stateManager;
+ String c = info.getProperty(KEY_COMPRESSION);
+ this.useCompression = (c == null) ? DEFAULT_COMPRESSION: Boolean.parseBoolean(c);
+
+ String s = info.getProperty(KEY_MUSIC_RFACTOR);
+ this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s);
+
initializeMetricTables();
+ commitExecutorThreads = Executors.newFixedThreadPool(4);
}
public String getMusicTxDigestTableName(){
@@ -244,8 +252,16 @@ public class MusicMixin implements MusicInterface {
public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException {
Map<String,Object> replicationInfo = new HashMap<>();
- replicationInfo.put("'class'", "'SimpleStrategy'");
- replicationInfo.put("'replication_factor'", replicationFactor);
+ replicationInfo.put("'class'", "'NetworkTopologyStrategy'");
+ if(replicationFactor==3){
+ replicationInfo.put("'dc1'", 1);
+ replicationInfo.put("'dc2'", 1);
+ replicationInfo.put("'dc3'", 1);
+ }
+ else {
+ replicationInfo.put("'class'", "'SimpleStrategy'");
+ replicationInfo.put("'replication_factor'", replicationFactor);
+ }
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString(
@@ -317,7 +333,7 @@ public class MusicMixin implements MusicInterface {
createMusicEventualTxDigest();
createMusicNodeInfoTable();
createMusicRangeInformationTable(this.music_ns,this.musicRangeInformationTableName);
- createMusicRangeDependencyTable();
+ createMusicRangeDependencyTable(this.music_ns,this.musicRangeDependencyTableName);
}
catch(MDBCServiceException e){
logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC");
@@ -1207,67 +1223,6 @@ public class MusicMixin implements MusicInterface {
return previous.keySet().equals(current.keySet());
}
- protected List<Range> waitForLock(LockRequest request, DatabasePartition partition,
- Map<UUID, LockResult> rowLock) throws MDBCServiceException {
- List<Range> newRanges = new ArrayList<>();
- if(partition.getMRIIndex()!=request.getId()){
- throw new MDBCServiceException("Invalid argument for wait for lock, range id in request and partition should match");
- }
- String fullyQualifiedKey= music_ns+"."+ request.getTable()+"."+request.getId();
- String lockId = MusicCore.createLockReference(fullyQualifiedKey);
- ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId);
- if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
- //\TODO Improve the exponential backoff
- List<Range> pendingToLock = request.getToLockRanges();
- Map<UUID, String> currentLockRef = new HashMap<>();
- int n = 1;
- int low = 1;
- int high = 1000;
- Random r = new Random();
- Map<Range, RangeMriRow> rangeRows = findRangeRows(pendingToLock);
- NavigableMap<UUID, List<Range>> rowsToLock = getPendingRows(rangeRows);
- NavigableMap<UUID, List<Range>> prevRows = new TreeMap<>();
- while (!pendingToLock.isEmpty() && isDifferent(prevRows,rowsToLock) ) {
- pendingToLock.clear();
- try {
- Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000)
- + (r.nextInt(high - low) + low));
- } catch (InterruptedException e) {
- continue;
- }
- n++;
- if (n == 20) {
- throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!");
- }
- //\TODO do this in parallel
- //\TODO there is a race condition here, from the time we get the find range rows, to the time we lock the row,
- //\TODO this race condition can only be solved if require to obtain lock to all related rows in MRI
- //\TODO before fully owning the range
- //\TODO The rows need to be lock in increasing order of timestamp
- //there could be a new row created
- // Note: This loop needs to be perfomed in sorted order of timebased UUID
- for (Map.Entry<UUID, List<Range>> pending : rowsToLock.entrySet()) {
- List<Range> rs = lockRow(request, pending, currentLockRef, fullyQualifiedKey, lockId, pendingToLock, rowLock);
- newRanges.addAll(rs);
- }
- if (n++ == 20) {
- throw new MDBCServiceException(
- "Lock was impossible to obtain, waited for 20 exponential backoffs!");
- }
- rangeRows = findRangeRows(pendingToLock);
- prevRows = rowsToLock;
- rowsToLock = getPendingRows(rangeRows);
- }
- }
- else {
- partition.setLockId(lockId);
- rowLock.put(partition.getMRIIndex(),new LockResult(partition.getMRIIndex(), lockId, true, partition.getSnapshot()));
- }
- return newRanges;
- }
-
-
-
protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
UUID mriIndex = partition.getMRIIndex();
String lockId;
@@ -1315,13 +1270,14 @@ public class MusicMixin implements MusicInterface {
}
}
- protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId);
- ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, MriIndex.toString(), appendQuery, lockId, null);
- if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
- logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
- throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
+ public void createAndAddTxDigest(final StagingTable transactionDigest, UUID digestId)
+ throws MDBCServiceException {
+ ByteBuffer serializedTransactionDigest;
+ serializedTransactionDigest = transactionDigest.getSerializedStagingAndClean();
+ if(useCompression){
+ serializedTransactionDigest = StagingTable.Compress(serializedTransactionDigest);
}
+ addTxDigest(digestId, serializedTransactionDigest);
}
/**
@@ -1331,9 +1287,7 @@ public class MusicMixin implements MusicInterface {
@Override
public void commitLog(DatabasePartition partition,List<Range> eventualRanges, StagingTable transactionDigest,
String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException {
- // first deal with commit for eventually consistent tables
- filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
-
+
if(partition==null){
logger.warn("Trying tcommit log with null partition");
return;
@@ -1343,7 +1297,9 @@ public class MusicMixin implements MusicInterface {
logger.warn("Trying to commit log with empty ranges");
return;
}
-
+
+ // first deal with commit for eventually consistent tables
+ filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
UUID mriIndex = partition.getMRIIndex();
String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
@@ -1353,54 +1309,70 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException("Not able to commit, as you are no longer the lock-holder for this partition");
}
- UUID commitId;
- //Generate a local commit id
- if(progressKeeper.containsTx(txId)) {
- commitId = progressKeeper.getCommitId(txId);
- }
- else{
- logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress ");
- throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress ");
- }
//Add creation type of transaction digest
-
- //1. Push new row to RRT and obtain its index
if(transactionDigest == null || transactionDigest.isEmpty()) {
return;
}
-
- ByteBuffer serializedTransactionDigest;
- if(!transactionDigest.isEmpty()) {
- serializedTransactionDigest = transactionDigest.getSerializedStagingAndClean();
- MusicTxDigestId digestId = new MusicTxDigestId(mriIndex, -1);
- addTxDigest(digestId, serializedTransactionDigest);
- //2. Save RRT index to RQ
- if (progressKeeper != null) {
- progressKeeper.setRecordId(txId, digestId);
+
+ final MusicTxDigestId digestId = new MusicTxDigestId(MDBCUtils.generateUniqueKey(), -1);
+ Callable<Boolean> insertDigestCallable =()-> {
+ try {
+ createAndAddTxDigest(transactionDigest,digestId.transactionId);
+ return true;
+ } catch (MDBCServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error creating and pushing tx digest to music",e);
+ return false;
}
- //3. Append RRT index into the corresponding TIT row array
- appendToRedoLog(partition, digestId);
- List<Range> ranges = partition.getSnapshot();
- for(Range r : ranges) {
- Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
- if(!alreadyApplied.containsKey(r)){
- throw new MDBCServiceException("already applied data structure was not updated correctly and range "
+ };
+ Callable<Boolean> appendCallable=()-> {
+ try {
+ appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicTxDigestTableName,
+ musicRangeInformationTableName);
+ return true;
+ } catch (MDBCServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error creating and pushing tx digest to music",e);
+ return false;
+ }
+ };
+
+ Future<Boolean> appendResultFuture = commitExecutorThreads.submit(appendCallable);
+ Future<Boolean> digestFuture = commitExecutorThreads.submit(insertDigestCallable);
+ try {
+ //Boolean appendResult = appendResultFuture.get();
+ Boolean digestResult = digestFuture.get();
+ if(/*!appendResult ||*/ !digestResult){
+ logger.error(EELFLoggerDelegate.errorLogger, "Error appending to log or adding tx digest");
+ throw new MDBCServiceException("Error appending to log or adding tx digest");
+ }
+ } catch (InterruptedException|ExecutionException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Error executing futures for creating and pushing tx " +
+ "digest to music",e);
+ throw new MDBCServiceException("Failure when retrieving futures for execution of digestion creation and append", e);
+ }
+
+ if (progressKeeper != null) {
+ progressKeeper.setRecordId(txId, digestId);
+ }
+ List<Range> ranges = partition.getSnapshot();
+ for(Range r : ranges) {
+ Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
+ if(!alreadyApplied.containsKey(r)){
+ throw new MDBCServiceException("already applied data structure was not updated correctly and range "
+r+" is not contained");
- }
- Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r);
- MriReference key = rowAndIndex.getKey();
- if(!mriIndex.equals(key.index)){
- throw new MDBCServiceException("already applied data structure was not updated correctly and range "+
- r+" is not pointing to row: "+mriIndex.toString());
- }
- alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1));
}
+ Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r);
+ MriReference key = rowAndIndex.getKey();
+ if(!mriIndex.equals(key.index)){
+ throw new MDBCServiceException("already applied data structure was not updated correctly and range "+
+ r+" is not pointing to row: "+mriIndex.toString());
+ }
+ alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1));
}
}
private void filterAndAddEventualTxDigest(List<Range> eventualRanges,
- StagingTable transactionDigest, String txId,
- TxCommitProgress progressKeeper) throws MDBCServiceException {
+ StagingTable transactionDigest, String txId,
+ TxCommitProgress progressKeeper) throws MDBCServiceException {
if(eventualRanges == null || eventualRanges.isEmpty()) {
return;
@@ -1410,30 +1382,19 @@ public class MusicMixin implements MusicInterface {
throw new MDBCServiceException();
}
- UUID commitId = getCommitId(txId, progressKeeper);
+ if(!transactionDigest.isEmpty()) {
+ ByteBuffer serialized = transactionDigest.getSerializedEventuallyStagingAndClean();
- ByteBuffer serialized = transactionDigest.getSerializedEventuallyStagingAndClean();
+ if (serialized!=null && useCompression) {
+ serialized = StagingTable.Compress(serialized);
+ }
- if(serialized != null ) {
- MusicTxDigestId digestId = new MusicTxDigestId(commitId,-1);
- addEventualTxDigest(digestId, serialized);
+ if (serialized != null) {
+ MusicTxDigestId digestId = new MusicTxDigestId(MDBCUtils.generateUniqueKey(), -1);
+ addEventualTxDigest(digestId, serialized);
+ }
}
-
- }
-
- private UUID getCommitId(String txId, TxCommitProgress progressKeeper)
- throws MDBCServiceException {
- UUID commitId;
- //Generate a local commit id
- if(progressKeeper.containsTx(txId)) {
- commitId = progressKeeper.getCommitId(txId);
- }
- else{
- logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress ");
- throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress ");
- }
- return commitId;
}
/**
@@ -1503,7 +1464,7 @@ public class MusicMixin implements MusicInterface {
return partitions;
}
- public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){
+ static public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){
UUID partitionIndex = newRow.getUUID("rangeid");
List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
List<MusicTxDigestId> digestIds = new ArrayList<>();
@@ -1608,9 +1569,13 @@ public class MusicMixin implements MusicInterface {
DatabasePartition newPartition = info.getDBPartition();
String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMRIIndex().toString();
- String lockId = createAndAssignLock(fullyQualifiedMriKey, newPartition);
+ String lockId;
+ int counter=0;
+ do {
+ lockId = createAndAssignLock(fullyQualifiedMriKey, newPartition);
//TODO: fix this retry logic
- if(lockId == null || lockId.isEmpty()){
+ } while ((lockId ==null||lockId.isEmpty())&&(counter++<3));
+ if (lockId == null || lockId.isEmpty()) {
throw new MDBCServiceException("Error initializing music range information, error creating a lock for a new row" +
"for key "+fullyQualifiedMriKey) ;
}
@@ -1632,7 +1597,7 @@ public class MusicMixin implements MusicInterface {
.append(rangeAndDependencies.getRange().getTable())
.append(",{");
boolean first=true;
- for(Range r: rangeAndDependencies.dependentRanges()){
+ for (Range r: rangeAndDependencies.dependentRanges()) {
if(first){ first=false; }
else {
insert.append(',');
@@ -1658,7 +1623,7 @@ public class MusicMixin implements MusicInterface {
.append(id)
.append(",{");
boolean first=true;
- for(Range r: rangesCopy){
+ for (Range r: rangesCopy) {
if(first){ first=false; }
else {
insert.append(',');
@@ -1706,7 +1671,7 @@ public class MusicMixin implements MusicInterface {
.append(id)
.append(",{");
boolean first=true;
- for(Range r: ranges){
+ for (Range r: ranges) {
if(first){ first=false; }
else {
insert.append(',');
@@ -1731,24 +1696,32 @@ public class MusicMixin implements MusicInterface {
}
@Override
- public void appendToRedoLog(DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException {
- logger.info("Appending to redo log for partition " + partition.getMRIIndex() + " txId=" + newRecord.txId);
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, partition.getMRIIndex(),
- musicTxDigestTableName, newRecord.txId);
- ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, partition.getMRIIndex().toString(),
- appendQuery, partition.getLockId(), null);
- if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
+ public void appendToRedoLog(UUID MRIIndex, String lockId, MusicTxDigestId newRecord) throws MDBCServiceException {
+ logger.debug("Appending to redo log for partition " + MRIIndex + " txId=" + newRecord.transactionId);
+ appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicTxDigestTableName,
+ musicRangeInformationTableName);
+ }
+
+ public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId,
+ String musicTxDigestTableName, String musicRangeInformationTableName)
+ throws MDBCServiceException{
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex,
+ musicTxDigestTableName, transactionId);
+ ReturnType returnType = MusicCore.criticalPut(musicNamespace, musicRangeInformationTableName, MRIIndex.toString(),
+ appendQuery, lockId, null);
+ //returnType.getExecutionInfo()
+ if (returnType.getResult().compareTo(ResultType.SUCCESS) != 0) {
logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
}
}
public void createMusicTxDigest() throws MDBCServiceException {
- createMusicTxDigest(-1);
+ createMusicTxDigest(this.musicTxDigestTableName,this.music_ns,-1);
}
public void createMusicEventualTxDigest() throws MDBCServiceException {
- createMusicEventualTxDigest(-1);
+ createMusicEventualTxDigest(musicEventualTxDigestTableName,music_ns,-1);
}
@@ -1758,9 +1731,9 @@ public class MusicMixin implements MusicInterface {
* * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
* * TransactionDigest: text that contains all the changes in the transaction
*/
- private void createMusicEventualTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
- String tableName = this.musicEventualTxDigestTableName;
- if(musicTxDigestTableNumber >= 0) {
+ public static void createMusicEventualTxDigest(String musicEventualTxDigestTableName, String musicNamespace, int musicTxDigestTableNumber) throws MDBCServiceException {
+ String tableName = musicEventualTxDigestTableName;
+ if (musicTxDigestTableNumber >= 0) {
tableName = tableName +
"-" +
Integer.toString(musicTxDigestTableNumber);
@@ -1769,12 +1742,13 @@ public class MusicMixin implements MusicInterface {
StringBuilder fields = new StringBuilder();
fields.append("txid uuid, ");
fields.append("transactiondigest blob, ");
+ fields.append("compressed boolean, ");
fields.append("txTimeId TIMEUUID ");//notice lack of ','
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
try {
- executeMusicWriteQuery(this.music_ns,tableName,cql);
+ executeMusicWriteQuery(musicNamespace,tableName,cql);
} catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to create redo records table");
+ logger.error("Initialization error: Failure to create eventual tx digest table");
throw(e);
}
}
@@ -1786,9 +1760,9 @@ public class MusicMixin implements MusicInterface {
* * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
* * TransactionDigest: text that contains all the changes in the transaction
*/
- private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
- String tableName = this.musicTxDigestTableName;
- if(musicTxDigestTableNumber >= 0) {
+ public static void createMusicTxDigest(String musicTxDigestTableName, String musicNamespace, int musicTxDigestTableNumber) throws MDBCServiceException {
+ String tableName = musicTxDigestTableName;
+ if (musicTxDigestTableNumber >= 0) {
tableName = tableName +
"-" +
Integer.toString(musicTxDigestTableNumber);
@@ -1796,26 +1770,29 @@ public class MusicMixin implements MusicInterface {
String priKey = "txid";
StringBuilder fields = new StringBuilder();
fields.append("txid uuid, ");
+ fields.append("compressed boolean, ");
fields.append("transactiondigest blob ");//notice lack of ','
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace,
+ tableName, fields, priKey);
try {
- executeMusicWriteQuery(this.music_ns,tableName,cql);
+ executeMusicWriteQuery(musicNamespace,tableName,cql);
} catch (MDBCServiceException e) {
logger.error("Initialization error: Failure to create redo records table");
throw(e);
}
}
- private void createMusicRangeDependencyTable() throws MDBCServiceException {
- String tableName = this.musicRangeDependencyTableName;
+ public static void createMusicRangeDependencyTable(String musicNamespace,String musicRangeDependencyTableName)
+ throws MDBCServiceException {
+ String tableName = musicRangeDependencyTableName;
String priKey = "range";
StringBuilder fields = new StringBuilder();
fields.append("range text, ");
fields.append("dependencies set<text> ");//notice lack of ','
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName,
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName,
fields, priKey);
try {
- executeMusicWriteQuery(this.music_ns,tableName,cql);
+ executeMusicWriteQuery(musicNamespace,tableName,cql);
} catch (MDBCServiceException e) {
logger.error("Initialization error: Failure to create redo records table");
throw(e);
@@ -1828,18 +1805,23 @@ public class MusicMixin implements MusicInterface {
@Override
public void addTxDigest(MusicTxDigestId newId, ByteBuffer transactionDigest) throws MDBCServiceException {
//\TODO: Save Prepared query to history
+ addTxDigest(newId.transactionId,transactionDigest);
+ }
+
+ private void addTxDigest(UUID digestId, ByteBuffer transactionDigest) throws MDBCServiceException{
PreparedQueryObject query = new PreparedQueryObject();
- String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest) VALUES (?,?);",this.music_ns,
+ String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed ) VALUES (?,?,?);",this.music_ns,
this.musicTxDigestTableName);
query.appendQueryString(cql);
- query.addValue(newId.txId);
+ query.addValue(digestId);
query.addValue(transactionDigest);
+ query.addValue(useCompression);
//\TODO check if I am not shooting on my own foot
try {
MusicCore.nonKeyRelatedPut(query,"critical");
} catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
- throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e);
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for digest id "+digestId.toString()+ "with error "+e.getErrorMessage());
+ throw new MDBCServiceException("Transaction Digest serialization for digest id "+digestId.toString(), e);
}
}
@@ -1854,11 +1836,11 @@ public class MusicMixin implements MusicInterface {
this.music_ns +
'.' +
this.musicEventualTxDigestTableName +
- " (txid,transactiondigest,txTimeId) " +
+ " (txid,transactiondigest,compressed,txTimeId) " +
"VALUES (" +
- newId.txId + ",'" +
- transactionDigest +
- "'," +
+ newId.transactionId+ ",'" +
+ transactionDigest + "'," +
+ useCompression + ","+
// "toTimestamp(now())" +
"now()" +
");";
@@ -1867,8 +1849,8 @@ public class MusicMixin implements MusicInterface {
try {
MusicCore.nonKeyRelatedPut(query,"critical");
} catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
- throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e);
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.transactionId.toString()+ "with error "+e.getErrorMessage());
+ throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.transactionId.toString(), e);
}
}
@@ -1877,17 +1859,21 @@ public class MusicMixin implements MusicInterface {
String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
PreparedQueryObject pQueryObject = new PreparedQueryObject();
pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(id.txId);
+ pQueryObject.addValue(id.transactionId);
Row newRow;
try {
newRow = executeMusicUnlockedQuorumGet(pQueryObject);
} catch (MDBCServiceException e) {
- logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId);
+ logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.transactionId);
throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
}
ByteBuffer digest = newRow.getBytes("transactiondigest");
+ Boolean compressed = newRow.getBool("compressed");
StagingTable changes;
try {
+ if(compressed){
+ digest = StagingTable.Decompress(digest);
+ }
changes = new StagingTable(digest);
} catch (MDBCServiceException e) {
logger.error("Deserializng digest failed with an exception:"+e.getErrorMessage());
@@ -1921,10 +1907,14 @@ public class MusicMixin implements MusicInterface {
while (!rs.isExhausted()) {
Row row = rs.one();
ByteBuffer digest = row.getBytes("transactiondigest");
+ Boolean compressed = row.getBool("compressed");
//String txTimeId = row.getString("txtimeid"); //???
UUID txTimeId = row.getUUID("txtimeid");
try {
+ if(compressed){
+ digest=StagingTable.Decompress(digest);
+ }
changes = new StagingTable(digest);
} catch (MDBCServiceException e) {
logger.error("Deserializng digest failed: "+e.getErrorMessage());
@@ -1935,9 +1925,6 @@ public class MusicMixin implements MusicInterface {
return ecDigestInformation;
}
-
-
-
ResultSet getAllMriCassandraRows() throws MDBCServiceException {
StringBuilder cqlOperation = new StringBuilder();
cqlOperation.append("SELECT * FROM ")
@@ -1959,34 +1946,6 @@ public class MusicMixin implements MusicInterface {
return rows;
}
- private RangeMriRow findRangeRow(Range range) throws MDBCServiceException {
- RangeMriRow row = null;
- final ResultSet musicResults = getAllMriCassandraRows();
- while (!musicResults.isExhausted()) {
- Row musicRow = musicResults.one();
- final MusicRangeInformationRow mriRow = getMRIRowFromCassandraRow(musicRow);
- final List<Range> musicRanges = getRanges(musicRow);
- //\TODO optimize this for loop to avoid redudant access
- for(Range retrievedRange : musicRanges) {
- if (retrievedRange.overlaps(range)) {
- if(row==null){
- row = new RangeMriRow(range);
- row.setCurrentRow(mriRow);
- }
- else if(row.getCurrentRow().getTimestamp() < mriRow.getTimestamp()){
- row.addOldRow(row.getCurrentRow());
- row.setCurrentRow(mriRow);
- }
- }
- }
- }
- if(row==null){
- logger.error("Row in MRI doesn't exist for Range "+range.toString());
- throw new MDBCServiceException("Row in MRI doesn't exist for Range "+range.toString());
- }
- return row;
- }
-
/**
* This function is used to find all the related uuids associated with the required ranges
* @param ranges ranges to be find
@@ -2037,18 +1996,6 @@ public class MusicMixin implements MusicInterface {
return result;
}
- private List<Range> lockRow(LockRequest request, DatabasePartition partition,Map<UUID, LockResult> rowLock)
- throws MDBCServiceException {
- if(partition.getMRIIndex().equals(request.getId()) && partition.isLocked()){
- return new ArrayList<>();
- }
- //\TODO: this function needs to be improved, to track possible changes in the owner of a set of ranges
- String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+request.getId().toString();
- //return List<Range> knownRanges, UUID mriIndex, String lockId
- DatabasePartition newPartition = new DatabasePartition(request.getToLockRanges(),request.getId(),null);
- return waitForLock(request,newPartition,rowLock);
- }
-
private void unlockKeyInMusic(String table, String key, String lockref) {
String fullyQualifiedKey= music_ns+"."+ table+"."+lockref;
MusicCore.destroyLockRef(fullyQualifiedKey,lockref);
@@ -2182,8 +2129,6 @@ public class MusicMixin implements MusicInterface {
return new OwnershipReturn(ownershipId, ownRow.getOwnerId(), ownRow.getIndex(),ranges,extendedDag);
}
-
-
/**
* This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained
* @param ranges ranges that should be contained in the partition
@@ -2199,78 +2144,7 @@ public class MusicMixin implements MusicInterface {
return false;
}
- private Map<UUID,String> mergeMriRows(String newId, Map<UUID,LockResult> lock, DatabasePartition partition)
- throws MDBCServiceException {
- Map<UUID,String> oldIds = new HashMap<>();
- List<Range> newRanges = new ArrayList<>();
- for (Map.Entry<UUID,LockResult> entry : lock.entrySet()) {
- oldIds.put(entry.getKey(),entry.getValue().getOwnerId());
- //\TODO check if we need to do a locked get? Is that even required?
- final MusicRangeInformationRow mriRow = getMusicRangeInformation(entry.getKey());
- final DatabasePartition dbPartition = mriRow.getDBPartition();
- newRanges.addAll(dbPartition.getSnapshot());
- }
- DatabasePartition newPartition = new DatabasePartition(newRanges,UUID.fromString(newId),null);
- String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+newId;
- UUID newUUID = UUID.fromString(newId);
- LockRequest newRequest = new LockRequest(musicRangeInformationTableName,newUUID,newRanges);
- waitForLock(newRequest, newPartition,lock);
- if(!lock.containsKey(newUUID)||!lock.get(newUUID).isNewLock()){
- logger.error("When merging rows, lock returned an invalid error");
- throw new MDBCServiceException("When merging MRI rows, lock returned an invalid error");
- }
- final LockResult lockResult = lock.get(newUUID);
- partition.updateDatabasePartition(newPartition);
- logger.info("Creating MRI " +partition.getMRIIndex()+ " for ranges " + partition.getSnapshot());
- createEmptyMriRow(this.music_ns,this.musicRangeInformationTableName,partition.getMRIIndex(),myId,
- lockResult.getOwnerId(),partition.getSnapshot(),true);
- return oldIds;
- }
-
- private void obtainAllLocks(NavigableMap<UUID, List<Range>> rowsToLock,DatabasePartition partition,
- List<Range> newRanges,Map<UUID, LockResult> rowLock) throws MDBCServiceException {
- //\TODO: perform this operations in parallel
- for(Map.Entry<UUID,List<Range>> row : rowsToLock.entrySet()){
- List<Range> additionalRanges;
- try {
- LockRequest newRequest = new LockRequest(musicRangeInformationTableName,row.getKey(),row.getValue());
- additionalRanges =lockRow(newRequest, partition, rowLock);
- } catch (MDBCServiceException e) {
- //TODO: Make a decision if retry or just fail?
- logger.error("Error locking row",e);
- throw e;
- }
- newRanges.addAll(additionalRanges);
- }
- }
-
-/* @Override
- public OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition)
- throws MDBCServiceException {
- if(!isAppendRequired(ranges,partition)){
- return new OwnershipReturn(partition.getLockId(),UUID.fromString(rangeId),null,null);
- }
- Map<Range, RangeMriRow> rows = findRangeRows(ranges);
- final NavigableMap<UUID, List<Range>> rowsToLock = getPendingRows(rows);
- HashMap<UUID, LockResult> rowLock = new HashMap<>();
- List<Range> newRanges = new ArrayList<>();
-
- obtainAllLocks(rowsToLock,partition,newRanges,rowLock);
- String lockId;
- Map<UUID,String> oldIds = null;
- if(rowLock.size()!=1){
- oldIds = mergeMriRows(rangeId, rowLock, partition);
- lockId = partition.getLockId();
- }
- else{
- List<LockResult> list = new ArrayList<>(rowLock.values());
- LockResult lockResult = list.get(0);
- lockId = lockResult.getOwnerId();
- }
-
- return new OwnershipReturn(lockId,UUID.fromString(rangeId),oldIds,newRanges);
- }*/
@Override
public void relinquish(String ownerId, String rangeId) throws MDBCServiceException{
@@ -2290,7 +2164,9 @@ public class MusicMixin implements MusicInterface {
* @return true if we should try to relinquish, else should avoid relinquishing in this iteration
*/
private boolean canTryRelinquishing(){
- return true;
+ //\TODO: Fix this!!!! REALLY IMPORTANT TO BE FIX
+ // This should actually have some mechanism to relinquish ownership
+ return false;
}
@Override
@@ -2446,7 +2322,7 @@ public class MusicMixin implements MusicInterface {
}
public void createMusicNodeInfoTable() throws MDBCServiceException {
- createMusicNodeInfoTable(-1);
+ createMusicNodeInfoTable(musicNodeInfoTableName,music_ns,-1);
}
/**
@@ -2460,8 +2336,8 @@ public class MusicMixin implements MusicInterface {
* * TxTimeID, TIMEUUID.
* * LastTxDigestID, uuid. (not needed as of now!!)
*/
- private void createMusicNodeInfoTable(int nodeInfoTableNumber) throws MDBCServiceException {
- String tableName = this.musicNodeInfoTableName;
+ public static void createMusicNodeInfoTable(String musicNodeInfoTableName, String musicNamespace, int nodeInfoTableNumber) throws MDBCServiceException {
+ String tableName = musicNodeInfoTableName;
if(nodeInfoTableNumber >= 0) {
tableName = tableName +
"-" +
@@ -2478,13 +2354,13 @@ public class MusicMixin implements MusicInterface {
String cql = String.format(
"CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
- this.music_ns,
+ musicNamespace,
tableName,
fields,
priKey);
try {
- executeMusicWriteQuery(this.music_ns,tableName,cql);
+ executeMusicWriteQuery(musicNamespace,tableName,cql);
} catch (MDBCServiceException e) {
logger.error("Initialization error: Failure to create node information table");
throw(e);
@@ -2514,4 +2390,19 @@ public class MusicMixin implements MusicInterface {
}
+ @Override
+ public void deleteMriRow(MusicRangeInformationRow row) throws MDBCServiceException{
+ String cql = String.format("DELETE FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(row.getPartitionIndex());
+ ReturnType rt ;
+ try {
+ rt = MusicCore.atomicPut(music_ns, musicRangeDependencyTableName, row.getPartitionIndex().toString(),
+ pQueryObject, null);
+ } catch (MusicLockingException|MusicQueryException|MusicServiceException e) {
+ logger.error("Failure when deleting mri row");
+ new MDBCServiceException("Error deleting mri row",e);
+ }
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 0afacea..af6b859 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -74,13 +74,15 @@ public class MySQLMixin implements DBInterface {
public static final String TRANS_TBL = "MDBC_TRANSLOG";
private static final String CREATE_TBL_SQL =
"CREATE TABLE IF NOT EXISTS "+TRANS_TBL+
- " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), CONNECTION_ID INT,PRIMARY KEY (IX))";
+ " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " +
+ "CONNECTION_ID INT, PRIMARY KEY (IX));";
private final MusicInterface mi;
private final int connId;
private final String dbName;
private final Connection jdbcConn;
private final Map<String, TableInfo> tables;
+ private PreparedStatement deleteStagingStatement;
private boolean server_tbl_created = false;
public MySQLMixin() {
@@ -89,14 +91,20 @@ public class MySQLMixin implements DBInterface {
this.dbName = null;
this.jdbcConn = null;
this.tables = null;
+ this.deleteStagingStatement = null;
}
- public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) {
+ public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException {
this.mi = mi;
this.connId = generateConnID(conn);
this.dbName = getDBName(conn);
this.jdbcConn = conn;
this.tables = new HashMap<String, TableInfo>();
}
+
+ private PreparedStatement getStagingDeletePreparedStatement() throws SQLException {
+ return jdbcConn.prepareStatement("DELETE FROM "+TRANS_TBL+" WHERE (IX BETWEEN ? AND ? ) AND " +
+ "CONNECTION_ID = ?;");
+ }
// This is used to generate a unique connId for this connection to the DB.
private int generateConnID(Connection conn) {
int rv = (int) System.currentTimeMillis(); // random-ish
@@ -297,6 +305,7 @@ mysql> describe tables;
Statement stmt = jdbcConn.createStatement();
stmt.execute(CREATE_TBL_SQL);
stmt.close();
+ this.deleteStagingStatement = getStagingDeletePreparedStatement();
logger.info(EELFLoggerDelegate.applicationLogger,"createSQLTriggers: Server side dirty table created.");
server_tbl_created = true;
} catch (SQLException e) {
@@ -594,11 +603,15 @@ NEW.field refers to the new value
// copy from DB.MDBC_TRANSLOG where connid == myconnid
// then delete from MDBC_TRANSLOG
String sql2 = "SELECT IX, TABLENAME, OP, ROWDATA,KEYDATA FROM "+TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId;
+ Integer biggestIx = Integer.MIN_VALUE;
+ Integer smallestIx = Integer.MAX_VALUE;
try {
ResultSet rs = executeSQLRead(sql2);
Set<Integer> rows = new TreeSet<Integer>();
while (rs.next()) {
int ix = rs.getInt("IX");
+ biggestIx = Integer.max(biggestIx,ix);
+ smallestIx = Integer.min(smallestIx,ix);
String op = rs.getString("OP");
OperationType opType = toOpEnum(op);
String tbl = rs.getString("TABLENAME");
@@ -609,17 +622,13 @@ NEW.field refers to the new value
rows.add(ix);
}
rs.getStatement().close();
+ // batch delete operations
if (rows.size() > 0) {
- //TODO: DO batch deletion
- sql2 = "DELETE FROM "+TRANS_TBL+" WHERE IX = ?";
- PreparedStatement ps = jdbcConn.prepareStatement(sql2);
- logger.debug("Executing: "+sql2);
- logger.debug(" For ix = "+rows);
- for (int ix : rows) {
- ps.setInt(1, ix);
- ps.execute();
- }
- ps.close();
+ this.deleteStagingStatement.setInt(1,smallestIx);
+ this.deleteStagingStatement.setInt(2,biggestIx);
+ this.deleteStagingStatement.setInt(3,this.connId);
+ logger.debug("Staging delete: Executing with vals ["+smallestIx+","+biggestIx+","+this.connId+"]");
+ this.deleteStagingStatement.execute();
}
} catch (SQLException e) {
logger.warn("Exception in postStatementHook: "+e);
@@ -1030,4 +1039,9 @@ NEW.field refers to the new value
String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId;
jdbcStmt.executeQuery(sql);
}
+
+ @Override
+ public Connection getSQLConnection(){
+ return jdbcConn;
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
index 057b550..e76e1b1 100644
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
@@ -214,12 +214,7 @@ public class OwnershipAndCheckpoint{
checkpointLock.unlock();
break;
} else {
- final StagingTable txDigest = mi.getTxDigest(pair.getKey());
- applyTxDigest(rangesToWarmup,di, txDigest);
- for (Range r : pair.getValue()) {
- MusicRangeInformationRow row = node.getRow();
- alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
- }
+ applyDigestAndUpdateDataStructures(mi, di, rangesToWarmup, node, pair);
}
pair = node.nextNotAppliedTransaction(rangeSet);
enableForeignKeys(di);
@@ -233,6 +228,24 @@ public class OwnershipAndCheckpoint{
}
}
+ private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, List<Range> ranges, DagNode node,
+ Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException {
+ final StagingTable txDigest;
+ try {
+ txDigest = mi.getTxDigest(pair.getKey());
+ } catch (MDBCServiceException e) {
+ logger.warn("Transaction digest was not found, this could be caused by a failure of the previous owner"
+ +"And would normally only happen as the last ID of the corresponding redo log. Please check that this is the"
+ +" case for txID "+pair.getKey().transactionId.toString());
+ return;
+ }
+ applyTxDigest(ranges,di, txDigest);
+ for (Range r : pair.getValue()) {
+ MusicRangeInformationRow row = node.getRow();
+ alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
+ }
+ }
+
private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, List<Range> ranges, UUID ownOpId)
throws MDBCServiceException {
Set<Range> rangeSet = new HashSet<Range>(ranges);
@@ -242,12 +255,7 @@ public class OwnershipAndCheckpoint{
if(node!=null) {
Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
while (pair != null) {
- final StagingTable txDigest = mi.getTxDigest(pair.getKey());
- applyTxDigest(ranges, db, txDigest);
- for (Range r : pair.getValue()) {
- MusicRangeInformationRow row = node.getRow();
- alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
- }
+ applyDigestAndUpdateDataStructures(mi, db, ranges, node, pair);
pair = node.nextNotAppliedTransaction(rangeSet);
if (timeout(ownOpId)) {
enableForeignKeys(db);
@@ -393,6 +401,7 @@ public class OwnershipAndCheckpoint{
return rowsPerLatestRange;
}
+
public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() {
return this.alreadyApplied;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
deleted file mode 100644
index 5b3872a..0000000
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * ============LICENSE_START====================================================
- * org.onap.music.mdbc
- * =============================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END======================================================
- */
-package org.onap.music.mdbc.tables;
-
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.logging.EELFLoggerDelegate;
-import org.onap.music.mdbc.DatabasePartition;
-import org.onap.music.mdbc.MdbcConnection;
-import org.onap.music.mdbc.Range;
-import org.onap.music.mdbc.StateManager;
-import org.onap.music.mdbc.mixins.DBInterface;
-import org.onap.music.mdbc.mixins.MusicInterface;
-
-public class MusicTxDigest {
- private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicTxDigest.class);
-
- //private MdbcServerLogic mdbcServer;
- //private NodeConfiguration config;
- private StateManager stateManager;
-
- public MusicTxDigest(StateManager stateManager) {
- this.stateManager = stateManager;
- }
-
- /**
- * Runs the body of the background daemon
- * @param daemonSleepTimeS time, in seconds, between updates
- * @throws InterruptedException
- */
- public void backgroundDaemon(int daemonSleepTimeS) throws InterruptedException {
- MusicInterface mi = stateManager.getMusicInterface();
- DBInterface dbi = ((MdbcConnection) stateManager.getConnection("daemon")).getDBInterface();
-
- while (true) {
- Thread.sleep(TimeUnit.SECONDS.toMillis(daemonSleepTimeS));
- //update
- logger.info(String.format("[%s] Background MusicTxDigest daemon updating local db",
- new Timestamp(System.currentTimeMillis())));
-
- //1) get all other partitions from musicrangeinformation
- List<UUID> partitions = null;
- try {
- partitions = mi.getPartitionIndexes();
- } catch (MDBCServiceException e) {
- logger.error("Error obtainting partition indexes, trying again next iteration");
- continue;
- }
- //2) for each partition I don't own
- final Set<Range> warmupRanges = stateManager.getRangesToWarmup();
- final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
- if (currentPartitions.size() != 0) {
- for (DatabasePartition part : currentPartitions) {
- List<Range> partitionRanges = part.getSnapshot();
- warmupRanges.removeAll(partitionRanges);
- }
- try {
- stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
- } catch (MDBCServiceException e) {
- logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
- continue;
- }
- }
-
- //Step 3: ReplayDigest() for E.C conditions
- try {
- replayDigest(mi,dbi, stateManager.getEventualRanges());
- } catch (MDBCServiceException e) {
- logger.error("Unable to perform Eventual Consistency operations" + e.getMessage());
- continue;
- }
-
- }
- }
-
- /**
- * Replay the digest for eventual consistency.
- * @param mi music interface
- * @param dbi interface to the database that will replay the operations
- * @param ranges only these ranges will be applied from the digests
- * @throws MDBCServiceException
- */
- public void replayDigest(MusicInterface mi, DBInterface dbi, List<Range> ranges) throws MDBCServiceException {
- StagingTable transaction;
- String nodeName = stateManager.getMdbcServerName();
-
- LinkedHashMap<UUID,StagingTable> ecDigestInformation = mi.getEveTxDigest(nodeName);
- Set<UUID> keys = ecDigestInformation.keySet();
- for(UUID txTimeID:keys){
- transaction = ecDigestInformation.get(txTimeID);
- try {
- dbi.replayTransaction(transaction, ranges); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
- } catch (SQLException e) {
- logger.error("EC:Rolling back the entire digest replay.");
- return;
- }
- logger.info("EC: Successfully replayed transaction ");
-
- try {
- mi.updateNodeInfoTableWithTxTimeIDKey(txTimeID, nodeName);
- } catch (MDBCServiceException e) {
- logger.error("EC:Rolling back the entire digest replay.");
- }
- }
-;
- }
-
-
- /**
- * Replay the digest for a given partition
- * @param mi music interface
- * @param partitionId the partition to be replayed
- * @param dbi interface to the database that will replay the operations
- * @throws MDBCServiceException
- */
- public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException {
- final MusicRangeInformationRow row = mi.getMusicRangeInformation(partitionId);
- List<MusicTxDigestId> partitionsRedoLogTxIds = row.getRedoLog();
- for (MusicTxDigestId txId: partitionsRedoLogTxIds) {
- StagingTable transaction = mi.getTxDigest(txId);
- try {
- //\TODO do this two operations in parallel
- dbi.replayTransaction(transaction, row.getDBPartition().getSnapshot());
- mi.replayTransaction(transaction);
- } catch (SQLException e) {
- logger.error("Rolling back the entire digest replay. " + partitionId);
- return;
- }
- logger.info("Successfully replayed transaction " + txId);
- }
- //todo, keep track of where I am in pointer
- }
-
- /**
- * Start the background daemon defined by this object
- * Spawns a new thread and runs "backgroundDaemon"
- * @param daemonSleepTimeS time, in seconds, between updates run by daemon
- */
- public void startBackgroundDaemon(int daemonSleepTimeS) {
- class MusicTxBackgroundDaemon implements Runnable {
- public void run() {
- while (true) {
- try {
- logger.info("MusicTxDigest background daemon started");
- backgroundDaemon(daemonSleepTimeS);
- } catch (InterruptedException e) {
- logger.error("MusicTxDigest background daemon stopped " + e.getMessage());
- }
- }
- }
- }
- Thread t = new Thread(new MusicTxBackgroundDaemon());
- t.start();
-
- }
-}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
new file mode 100644
index 0000000..4f3a3bf
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
@@ -0,0 +1,144 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.tables;
+
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.DatabasePartition;
+import org.onap.music.mdbc.MdbcConnection;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.StateManager;
+import org.onap.music.mdbc.mixins.DBInterface;
+import org.onap.music.mdbc.mixins.MusicInterface;
+
+public class MusicTxDigestDaemon implements Runnable {
+
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicTxDigestDaemon.class);
+
+ private StateManager stateManager;
+ private int daemonSleepTimeS;
+
+ public MusicTxDigestDaemon(int daemonSleepTimeS, StateManager stateManager) {
+ this.stateManager = stateManager;
+ this.daemonSleepTimeS = daemonSleepTimeS;
+ }
+
+ /**
+ * Replay the digest for eventual consistency.
+ *
+ * @param mi music interface
+ * @param dbi interface to the database that will replay the operations
+ * @param ranges only these ranges will be applied from the digests
+ */
+ public void replayDigest(MusicInterface mi, DBInterface dbi, List<Range> ranges) throws MDBCServiceException {
+ StagingTable transaction;
+ String nodeName = stateManager.getMdbcServerName();
+
+ LinkedHashMap<UUID, StagingTable> ecDigestInformation = mi.getEveTxDigest(nodeName);
+ Set<UUID> keys = ecDigestInformation.keySet();
+ for (UUID txTimeID : keys) {
+ transaction = ecDigestInformation.get(txTimeID);
+ try {
+ dbi.replayTransaction(transaction,
+ ranges); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
+ } catch (SQLException e) {
+ logger.error("EC:Rolling back the entire digest replay.");
+ return;
+ }
+ logger.info("EC: Successfully replayed transaction ");
+
+ try {
+ mi.updateNodeInfoTableWithTxTimeIDKey(txTimeID, nodeName);
+ } catch (MDBCServiceException e) {
+ logger.error("EC:Rolling back the entire digest replay.");
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ logger.info("MusicTxDigest background daemon started");
+ if (stateManager == null) {
+ logger.error("State manager is null in background daemon");
+ return;
+ }
+ MusicInterface mi = stateManager.getMusicInterface();
+
+ if (mi == null) {
+ logger.error("Music interface or DB interface is null in background daemon");
+ return;
+ }
+ while (true) {
+ try {
+ MdbcConnection conn = (MdbcConnection) stateManager.getConnection("daemon");
+ if (conn == null) {
+ logger.error("Connection created is null in background daemon");
+ return;
+ }
+ DBInterface dbi = (conn).getDBInterface();
+ //update
+ logger.info(String.format("[%s] Background MusicTxDigest daemon updating local db",
+ new Timestamp(System.currentTimeMillis())));
+
+ //1) get all other partitions from musicrangeinformation
+ List<UUID> partitions = null;
+ try {
+ partitions = mi.getPartitionIndexes();
+ } catch (MDBCServiceException e) {
+ logger.error("Error obtainting partition indexes, trying again next iteration");
+ continue;
+ }
+ //2) for each partition I don't own
+ final Set<Range> warmupRanges = stateManager.getRangesToWarmup();
+ final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
+ List<Range> missingRanges = new ArrayList<>();
+ if (currentPartitions.size() != 0) {
+ for (DatabasePartition part : currentPartitions) {
+ List<Range> partitionRanges = part.getSnapshot();
+ warmupRanges.removeAll(partitionRanges);
+ }
+ try {
+ stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
+ continue;
+ }
+ }
+
+ //Step 3: ReplayDigest() for E.C conditions
+ try {
+ replayDigest(mi, dbi, stateManager.getEventualRanges());
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to perform Eventual Consistency operations" + e.getMessage());
+ continue;
+ }
+ conn.close();
+ Thread.sleep(TimeUnit.SECONDS.toMillis(this.daemonSleepTimeS));
+ } catch (InterruptedException | SQLException e) {
+ logger.error("MusicTxDigest background daemon stopped " + e.getMessage(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
index 1c37db0..8fa49a9 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
@@ -22,16 +22,16 @@ package org.onap.music.mdbc.tables;
import java.util.UUID;
public final class MusicTxDigestId {
- public final UUID txId;
+ public final UUID transactionId;
public final int index;
- public MusicTxDigestId(UUID primaryKey, int index) {
- this.txId= primaryKey;
+ public MusicTxDigestId(UUID digestId, int index) {
+ this.transactionId= digestId;
this.index=index;
}
public boolean isEmpty() {
- return (this.txId==null);
+ return (this.transactionId==null);
}
@Override
@@ -40,11 +40,11 @@ public final class MusicTxDigestId {
if(o == null) return false;
if(!(o instanceof MusicTxDigestId)) return false;
MusicTxDigestId other = (MusicTxDigestId) o;
- return other.txId.equals(this.txId);
+ return other.transactionId.equals(this.transactionId);
}
@Override
public int hashCode(){
- return txId.hashCode();
+ return transactionId.hashCode();
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
index a9ab25f..eda6191 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
@@ -43,6 +43,18 @@ public final class Operation implements Serializable{
KEY = key;
}
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ Operation clone = null;
+ try {
+ clone = (Operation) super.clone();
+ }
+ catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+ return clone;
+ }
+
public String getTable(){
return TABLE;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
index 03c7259..dbed9e4 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
@@ -21,11 +21,14 @@ package org.onap.music.mdbc.tables;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+import javax.validation.constraints.Null;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.Range;
@@ -36,7 +39,7 @@ import org.onap.music.mdbc.proto.ProtoDigest.Digest.Row.OpType;
public class StagingTable {
- private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StagingTable.class);
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StagingTable.class);
private ArrayList<Operation> operations;
boolean builderInitialized;
Builder digestBuilder;
@@ -47,6 +50,40 @@ public class StagingTable {
this(new HashSet<>());
logger.debug("Creating staging table with no parameters, most likely this is wrong, unless you are testing");
}
+
+ public StagingTable(StagingTable other) throws CloneNotSupportedException {
+ if(other==null){
+ throw new NullPointerException("Invalid constructor parameter passed, it is null");
+ }
+ //TODO this is a highly inefficient deep copy, please don't use in prod
+ operations=null;
+ if(other.operations!=null) {
+ Iterator<Operation> iterator = other.operations.iterator();
+ operations = new ArrayList<>();
+ while (iterator.hasNext()) {
+ operations.add((Operation) iterator.next().clone());
+ }
+ }
+ builderInitialized=other.builderInitialized;
+ digestBuilder=null;
+ if(other.digestBuilder!=null) {
+ CompleteDigest build = other.digestBuilder.build();
+ digestBuilder = build.toBuilder();
+ }
+ eventuallyBuilder=null;
+ if(other.eventuallyBuilder!=null) {
+ CompleteDigest build2 = other.digestBuilder.build();
+ eventuallyBuilder = build2.toBuilder();
+ }
+ eventuallyConsistentRanges=null;
+ if(other.eventuallyConsistentRanges!=null) {
+ eventuallyConsistentRanges = new HashSet<>();
+ Iterator<Range> rangeIter = other.eventuallyConsistentRanges.iterator();
+ while (rangeIter.hasNext()) {
+ eventuallyConsistentRanges.add(rangeIter.next().clone());
+ }
+ }
+ }
public StagingTable(Set<Range> eventuallyConsistentRanges) {
//operations = new ArrayList<Operation>();
@@ -74,6 +111,67 @@ public class StagingTable {
}
}
+ public static ByteBuffer Compress(ByteBuffer serializedStaging) throws MDBCServiceException {
+ if(serializedStaging.hasArray()) {
+ //\TODO: Use JAVA 11 to simplify this process using ByteBuffer natively
+ Deflater compressor = new Deflater();
+ final byte[] inputArray = serializedStaging.array();
+ compressor.setInput(inputArray);
+ compressor.finish();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(serializedStaging.array().length);
+ byte[] buf = new byte[1024];
+ try {
+ while (!compressor.finished()) {
+ int i = compressor.deflate(buf);
+ bos.write(buf, 0, i);
+ }
+ } finally {
+ compressor.end();
+ try {
+ bos.close();
+ } catch (IOException e) {
+ throw new MDBCServiceException("Error closing ByetArrayOutputStream:",e);
+ }
+ }
+ byte[] output = bos.toByteArray();
+ logger.debug("Staging table compressed from: "+inputArray.length+" to "+output.length);
+ return ByteBuffer.wrap(output);
+ }
+ else{
+ throw new MDBCServiceException("Byte buffer was not created correctly, it should wrap an array");
+ }
+ }
+
+ public static ByteBuffer Decompress(ByteBuffer compressedStaging) throws MDBCServiceException {
+ if(compressedStaging.hasArray()) {
+ //\TODO: Use JAVA 11 to simplify this process using ByteBuffer natively
+ Inflater decompressor = new Inflater();
+ byte[] inputArray = compressedStaging.array();
+ decompressor.setInput(inputArray);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(inputArray.length);
+ byte[] buffer = new byte[1024];
+ while (!decompressor.finished()) {
+ int decompressSize = 0;
+ try {
+ decompressSize = decompressor.inflate(buffer);
+ } catch (DataFormatException e) {
+ throw new MDBCServiceException("error decompressing input data",e);
+ }
+ outputStream.write(buffer, 0, decompressSize);
+ }
+ try {
+ outputStream.close();
+ } catch (IOException e) {
+ throw new MDBCServiceException("Error closing output byte stream",e);
+ }
+ byte[] output = outputStream.toByteArray();
+ return ByteBuffer.wrap(output);
+ }
+ else{
+ throw new MDBCServiceException("Byte buffer was not created correctly, it should wrap an array");
+ }
+ }
+
synchronized public boolean isBuilderInitialized(){
return isBuilderInitialized();
}
@@ -108,7 +206,7 @@ public class StagingTable {
}
logger.warn("Get operation list with this type of initialization is not suggested for the"
+ "staging table");
- ArrayList newOperations = new ArrayList();
+ ArrayList<Operation> newOperations = new ArrayList<>();
for(Row row : digestBuilder.getRowsList()){
final OpType type = row.getType();
OperationType newType = (type==OpType.INSERT)?OperationType.INSERT:(type==OpType.DELETE)?
@@ -123,9 +221,10 @@ public class StagingTable {
throw new MDBCServiceException("This type of staging table is unmutable, please use the constructor"
+ "with no parameters");
}
- ByteString serialized = digestBuilder.build().toByteString();
+ byte[] bytes = digestBuilder.build().toByteArray();
+ ByteBuffer serialized = ByteBuffer.wrap(bytes);
digestBuilder.clear();
- return serialized.asReadOnlyByteBuffer();
+ return serialized;
}
synchronized public ByteBuffer getSerializedEventuallyStagingAndClean() throws MDBCServiceException {
@@ -136,9 +235,10 @@ public class StagingTable {
if(eventuallyBuilder == null || eventuallyBuilder.getRowsCount()==0){
return null;
}
- ByteString serialized = eventuallyBuilder.build().toByteString();
+ byte[] bytes = eventuallyBuilder.build().toByteArray();
+ ByteBuffer serialized = ByteBuffer.wrap(bytes);
eventuallyBuilder.clear();
- return serialized.asReadOnlyByteBuffer();
+ return serialized;
}
synchronized public boolean isEmpty() {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/ClusterSetup.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/ClusterSetup.java
new file mode 100644
index 0000000..d560915
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/ClusterSetup.java
@@ -0,0 +1,80 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.tools;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import java.io.FileNotFoundException;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.main.MusicUtil;
+import org.onap.music.mdbc.configurations.ClusterConfiguration;
+
+public class ClusterSetup {
+ public static final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(ClusterSetup.class);
+
+ @Parameter(names = { "-c", "--configuration" }, required = true,
+ description = "This is the input file that is going to have the configuration to setup the cluster")
+ private String configurationFile;
+ @Parameter(names = { "-h", "-help", "--help" }, help = true,
+ description = "Print the help message")
+ private boolean help = false;
+
+ private ClusterConfiguration inputConfig;
+
+ public ClusterSetup(){}
+
+
+ public void readInput(){
+ LOG.info("Reading inputs");
+ try {
+ inputConfig = ClusterConfiguration.readJsonFromFile(configurationFile);
+ } catch (FileNotFoundException e) {
+ LOG.error("Input file is invalid or not found");
+ System.exit(1);
+ }
+ }
+
+ public void createAll() throws MDBCServiceException {
+ inputConfig.initNamespaces();
+ inputConfig.initTables();
+ }
+
+ public static void main(String[] args) {
+ LOG.info("Starting cassandra cluster initializer");
+ LOG.info("Using music file configuration:"+ MusicUtil.getMusicPropertiesFilePath());
+ ClusterSetup configs = new ClusterSetup();
+ @SuppressWarnings("deprecation")
+ JCommander jc = new JCommander(configs, args);
+ if (configs.help) {
+ jc.usage();
+ System.exit(1);
+ return;
+ }
+ configs.readInput();
+ try {
+ configs.createAll();
+ } catch (MDBCServiceException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ System.exit(0);
+ }
+}
diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties
index 0b34ff9..1aaf7fd 100755
--- a/mdbc-server/src/main/resources/music.properties
+++ b/mdbc-server/src/main/resources/music.properties
@@ -5,4 +5,4 @@ cassandra.user =\
cassandra.password =\
cassandra
music_namespace =\
- mdbc_namespace \ No newline at end of file
+ mdbc_namespace