diff options
Diffstat (limited to 'mdbc-server/src')
26 files changed, 818 insertions, 682 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java index 7377c4f..72b8899 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java @@ -53,7 +53,7 @@ import org.onap.music.mdbc.ownership.Dag; import org.onap.music.mdbc.ownership.DagNode; import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.query.QueryProcessor; -import org.onap.music.mdbc.tables.MusicTxDigest; +import org.onap.music.mdbc.tables.MusicTxDigestDaemon; import org.onap.music.mdbc.tables.MusicRangeInformationRow; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; @@ -115,6 +115,10 @@ public class MdbcConnection implements Connection { logger.debug("Mdbc connection created with id: "+id); } + public DBInterface getDatabaseInterface(){ + return this.dbi; + } + @Override public <T> T unwrap(Class<T> iface) throws SQLException { logger.error(EELFLoggerDelegate.errorLogger, "proxyconn unwrap: " + iface.getName()); @@ -263,6 +267,11 @@ public class MdbcConnection implements Connection { jdbcConn.close(); logger.debug("Connection was closed for id:" + id); } + try { + mi.relinquish(partition.getLockId(),partition.getMRIIndex().toString()); + } catch (MDBCServiceException e) { + throw new SQLException("Failure during relinquish of partition",e); + } } @Override @@ -509,7 +518,7 @@ public class MdbcConnection implements Connection { this.partition.updateDatabasePartition(tempPartition); statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition); } - dbi.preStatementHook(sql); + dbi.preStatementHook(sql); } @@ -603,4 +612,16 @@ public class MdbcConnection implements Connection { mi.relinquishIfRequired(partition); } + public Connection getConnection(){ + return jdbcConn; + } + + public DatabasePartition getPartition() { + return partition; + } + + public StagingTable getTransactionDigest(){ + return transactionDigest; + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java index 1712c30..42b9710 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java @@ -20,7 +20,7 @@ package org.onap.music.mdbc; import org.onap.music.mdbc.configurations.NodeConfiguration; -import org.onap.music.mdbc.tables.MusicTxDigest; +import org.onap.music.mdbc.tables.MusicTxDigestDaemon; import org.apache.calcite.avatica.remote.Driver.Serialization; import org.apache.calcite.avatica.remote.LocalService; import org.apache.calcite.avatica.server.HttpServer; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java index 9fb36ae..8f79840 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.configurations.NodeConfiguration; -import org.onap.music.mdbc.tables.MusicTxDigest; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java index c498952..16e7170 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Range.java @@ -30,9 +30,7 @@ import java.util.Objects; * In the future we may decide to partition ranges differently * @author Enrique Saurez */ -public class Range implements Serializable, Cloneable{ - - private static final long serialVersionUID = 1610744496930800088L; +public class Range implements Cloneable{ private String table; @@ -61,7 +59,7 @@ public class Range implements Serializable, Cloneable{ } @Override - protected Range clone() { + public Range clone() { Range newRange = null; try{ newRange = (Range) super.clone(); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index f0d9832..1105bda 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; @@ -36,7 +35,7 @@ import org.onap.music.mdbc.mixins.MusicInterface; import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn; import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint; import org.onap.music.mdbc.tables.MriReference; -import org.onap.music.mdbc.tables.MusicTxDigest; +import org.onap.music.mdbc.tables.MusicTxDigestDaemon; import org.onap.music.mdbc.tables.TxCommitProgress; import java.io.IOException; @@ -48,7 +47,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Properties; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -95,6 +93,7 @@ public class StateManager { /** map of transactions that have already been applied/updated in this sites SQL db */ private Map<Range, Pair<MriReference, Integer>> alreadyApplied; private OwnershipAndCheckpoint ownAndCheck; + private Thread txDaemon ; /** * For testing purposes only @@ -122,19 +121,22 @@ public class StateManager { musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT); initMusic(); - initSqlDatabase(); - + initSqlDatabase(); + initTxDaemonThread(); String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT); - long timeoutMs = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t); + long timeout = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t); alreadyApplied = new ConcurrentHashMap<>(); - ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeoutMs); - - rangesToWarmup = initWarmupRanges(); - logger.info("Warmup ranges for this site is " + rangesToWarmup); + ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout); + } - MusicTxDigest txDaemon = new MusicTxDigest(this); - txDaemon.startBackgroundDaemon(Integer.parseInt( - info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT))); + protected void initTxDaemonThread(){ + txDaemon = new Thread( + new MusicTxDigestDaemon(Integer.parseInt( + info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT)), + this)); + txDaemon.setName("TxDaemon"); + txDaemon.setDaemon(true); + txDaemon.start(); } /** @@ -163,6 +165,7 @@ public class StateManager { .append(";"); Statement stmt = sqlConnection.createStatement(); stmt.execute(sql.toString()); + sqlConnection.close(); } catch (SQLException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR); @@ -243,7 +246,7 @@ public class StateManager { returnArray = new ArrayList<>(eventualRanges); } else{ - returnArray= null; + returnArray= new ArrayList<>(); } } finally{ @@ -291,7 +294,7 @@ public class StateManager { } if(connectionRanges.containsKey(connectionId)){ //We relinquish all locks obtained by a given - relinquish(connectionRanges.get(connectionId)); + //relinquish(connectionRanges.get(connectionId)); connectionRanges.remove(connectionId); } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/ClusterConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/ClusterConfiguration.java new file mode 100644 index 0000000..943fd46 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/ClusterConfiguration.java @@ -0,0 +1,88 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.configurations; + +import com.google.gson.Gson; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.FileReader; +import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.main.MusicCore; +import org.onap.music.mdbc.mixins.MusicMixin; + +public class ClusterConfiguration { + private String internalNamespace; + private int internalReplicationFactor; + private String musicNamespace; + private int musicReplicationFactor; + private String mriTableName; + private String mtxdTableName; + private String eventualMtxdTableName; + private String nodeInfoTableName; + private String rangeDependencyTableName; + + private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(ClusterConfiguration.class); + + public void initNamespaces() throws MDBCServiceException{ + MusicMixin.createKeyspace(internalNamespace,internalReplicationFactor); + MusicMixin.createKeyspace(musicNamespace,musicReplicationFactor); + } + + public void initTables() throws MDBCServiceException{ + MusicMixin.createMusicRangeInformationTable(musicNamespace, mriTableName); + MusicMixin.createMusicTxDigest(mtxdTableName,musicNamespace, -1); + MusicMixin.createMusicEventualTxDigest(eventualMtxdTableName,musicNamespace, -1); + MusicMixin.createMusicNodeInfoTable(nodeInfoTableName,musicNamespace,-1); + MusicMixin.createMusicRangeDependencyTable(musicNamespace,rangeDependencyTableName); + } + + private void initInternalTable() throws MDBCServiceException { + StringBuilder createKeysTableCql = new StringBuilder("CREATE TABLE IF NOT EXISTS ") + .append(internalNamespace) + .append(".unsynced_keys (key text PRIMARY KEY);"); + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString(createKeysTableCql.toString()); + try { + MusicCore.createTable(internalNamespace,"unsynced_keys", queryObject,"critical"); + } catch (MusicServiceException e) { + logger.error("Error creating unsynced keys table" ); + throw new MDBCServiceException("Error creating unsynced keys table", e); + } + } + + public static ClusterConfiguration readJsonFromFile(String filepath) throws FileNotFoundException { + BufferedReader br; + try { + br = new BufferedReader( + new FileReader(filepath)); + } catch (FileNotFoundException e) { + logger.error(EELFLoggerDelegate.errorLogger,"File was not found when reading json"+e); + throw e; + } + Gson gson = new Gson(); + ClusterConfiguration config = gson.fromJson(br, ClusterConfiguration.class); + return config; + } + + +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java index 5349219..38309d5 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java @@ -50,6 +50,9 @@ public class NodeConfiguration { } protected List<Range> toRanges(String tables){ + if(tables.isEmpty()){ + return new ArrayList<>(); + } List<Range> newRange = new ArrayList<>(); String[] tablesArray=tables.split(","); for(String table: tablesArray) { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java index a9d179f..343a8b8 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java @@ -20,14 +20,12 @@ package org.onap.music.mdbc.configurations; import com.datastax.driver.core.ResultSet; -import java.util.stream.Collectors; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; -import org.onap.music.mdbc.RedoRow; import org.onap.music.mdbc.mixins.MusicMixin; -import org.onap.music.mdbc.tables.MusicTxDigest; +import org.onap.music.mdbc.tables.MusicRangeInformationRow; import com.google.gson.Gson; import org.onap.music.datastore.PreparedQueryObject; @@ -48,11 +46,8 @@ public class TablesConfiguration { private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class); private List<PartitionInformation> partitions; - private String internalNamespace; - private int internalReplicationFactor; + String tableToPartitionName; private String musicNamespace; - private int musicReplicationFactor; - private String tableToPartitionName; private String partitionInformationTableName; private String redoHistoryTableName; private String sqlDatabaseName; @@ -67,8 +62,6 @@ public class TablesConfiguration { */ public List<NodeConfiguration> initializeAndCreateNodeConfigurations() throws MDBCServiceException { logger.info("initializing the required spaces"); - createKeyspaces(); - initInternalNamespace(); List<NodeConfiguration> nodeConfigs = new ArrayList<>(); if(partitions == null){ @@ -77,10 +70,7 @@ public class TablesConfiguration { } for(PartitionInformation partitionInfo : partitions){ String mriTableName = partitionInfo.mriTableName; - checkIfMriIsEmpty(mriTableName); - //0) Create the corresponding Music Range Information table - MusicMixin.createMusicRangeInformationTable(musicNamespace,mriTableName); - + checkIfMriDoesNotExists(mriTableName,partitionInfo); String partitionId; if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){ //1) Create a row in the partition info table @@ -108,24 +98,7 @@ public class TablesConfiguration { return nodeConfigs; } - private void createKeyspaces() throws MDBCServiceException { - MusicMixin.createKeyspace(internalNamespace,internalReplicationFactor); - MusicMixin.createKeyspace(musicNamespace,musicReplicationFactor); - - } - - private void checkIfMriIsEmpty(String mriTableName) throws MDBCServiceException { - //First check if table exists - StringBuilder checkTableExistsString = new StringBuilder("SELECT table_name FROM system_schema.tables WHERE keyspace_name='") - .append(musicNamespace) - .append("';"); - PreparedQueryObject checkTableExists = new PreparedQueryObject(); - checkTableExists.appendQueryString(checkTableExistsString.toString()); - final ResultSet resultSet = MusicCore.quorumGet(checkTableExists); - if(resultSet.isExhausted()){ - //Table doesn't exist - return; - } + private void checkIfMriDoesNotExists(String mriTableName, PartitionInformation partition) throws MDBCServiceException { //If exists, check if empty StringBuilder checkRowsInTableString = new StringBuilder("SELECT * FROM ") .append(musicNamespace) @@ -134,27 +107,19 @@ public class TablesConfiguration { .append("';"); PreparedQueryObject checkRowsInTable = new PreparedQueryObject(); checkRowsInTable.appendQueryString(checkRowsInTableString.toString()); - final ResultSet resultSet2 = MusicCore.quorumGet(checkTableExists); - if(!resultSet2.isExhausted()) { - throw new MDBCServiceException("When initializing the configuration of the system, the MRI should not exits " - + "be empty"); + final ResultSet resultSet = MusicCore.quorumGet(checkRowsInTable); + while(resultSet!=null && !resultSet.isExhausted()){ + final MusicRangeInformationRow mriRowFromCassandraRow = MusicMixin.getMRIRowFromCassandraRow(resultSet.one()); + List<Range> ranges = mriRowFromCassandraRow.getDBPartition().getSnapshot(); + for(Range range: partition.getTables()) { + if (Range.overlaps(ranges,range.getTable())){ + throw new MDBCServiceException("MRI row already exists"); + } + } } } - private void initInternalNamespace() throws MDBCServiceException { - StringBuilder createKeysTableCql = new StringBuilder("CREATE TABLE IF NOT EXISTS ") - .append(internalNamespace) - .append(".unsynced_keys (key text PRIMARY KEY);"); - PreparedQueryObject queryObject = new PreparedQueryObject(); - queryObject.appendQueryString(createKeysTableCql.toString()); - try { - MusicCore.createTable(internalNamespace,"unsynced_keys", queryObject,"critical"); - } catch (MusicServiceException e) { - logger.error("Error creating unsynced keys table" ); - throw new MDBCServiceException("Error creating unsynced keys table", e); - } - } public static TablesConfiguration readJsonFromFile(String filepath) throws FileNotFoundException { BufferedReader br; @@ -174,7 +139,6 @@ public class TablesConfiguration { private List<Range> tables; private String owner; private String mriTableName; - private String mtxdTableName; private String partitionId; public List<Range> getTables() { @@ -209,12 +173,5 @@ public class TablesConfiguration { this.partitionId = partitionId; } - public String getMtxdTableName(){ - return mtxdTableName; - } - - public void setMtxdTableName(String mtxdTableName) { - this.mtxdTableName = mtxdTableName; - } } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java index 8ca8517..60c97d1 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java @@ -51,8 +51,8 @@ public class MdbcTestClient { }
Connection connection;
try {
- String metricURL = "http://localhost:300000/test";
- if(args[0] != null) {
+ String metricURL = "http://localhost:30000/test";
+ if (args.length>0 && args[0] != null) {
metricURL = args[0];
}
connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + metricURL+ ";serialization=protobuf");
@@ -60,7 +60,6 @@ public class MdbcTestClient { e.printStackTrace();
return;
}
-
try {
connection.setAutoCommit(false);
} catch (SQLException e) {
@@ -68,7 +67,6 @@ public class MdbcTestClient { return;
}
-
final String sql = "CREATE TABLE IF NOT EXISTS Persons (\n" +
" PersonID int,\n" +
" LastName varchar(255),\n" +
@@ -84,7 +82,6 @@ public class MdbcTestClient { e.printStackTrace();
return;
}
-
boolean execute = true;
// try {
// execute = stmt.execute(sql);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java index a514dc6..a594918 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java @@ -19,6 +19,7 @@ */ package org.onap.music.mdbc.mixins; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; @@ -130,4 +131,6 @@ public interface DBInterface { void enableForeignKeyChecks() throws SQLException; void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException; + + Connection getSQLConnection(); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 00f6d00..8b91b28 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -21,11 +21,8 @@ package org.onap.music.mdbc.mixins; import com.datastax.driver.core.ResultSet; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; + import org.json.JSONObject; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicLockingException; @@ -230,12 +227,12 @@ public interface MusicInterface { /** * This function is used to append an index to the redo log in a MRI row - * @param partition information related to ownership of partitions, used to verify ownership + * @param MRIIndex index of the row to which the record is going to be added (obtained from the Partition) + * @param lockId reference to lock associated to the row in the MRI table MRIIndex. * @param newRecord index of the new record to be appended to the redo log * @throws MDBCServiceException */ - void appendToRedoLog( DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException; - + void appendToRedoLog(UUID MRIIndex, String lockId, MusicTxDigestId newRecord)throws MDBCServiceException; /** * This functions adds the tx digest to * @param newId id used as index in the MTD table @@ -261,7 +258,7 @@ public interface MusicInterface { * @throws MDBCServiceException */ - public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException; + LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException; /** * Function used to retrieve a given transaction digest and deserialize it * @param id of the transaction digest to be retrieved @@ -277,17 +274,6 @@ public interface MusicInterface { */ void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException; - /** - * This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all - * those ranges. - * @param rangeId new id to be used in the new row - * @param ranges ranges to be owned by the end of the function called - * @param partition current ownership status - * @return - * @throws MDBCServiceException - */ - //OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) throws MDBCServiceException; - /** * This functions relinquishes a range * @param ownerId id of the current ownerh @@ -318,10 +304,17 @@ public interface MusicInterface { List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException; - public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException; - public LockResult requestLock(LockRequest request) throws MDBCServiceException; - public void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException; - public OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, + + void deleteMriRow(MusicRangeInformationRow row) throws MDBCServiceException; + + void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException; + + LockResult requestLock(LockRequest request) throws MDBCServiceException; + + void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException; + + OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges, Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException; + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index c6cc512..e548f1a 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -37,8 +37,9 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.BiFunction; +import java.util.concurrent.*; + +import com.datastax.driver.core.*; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; import org.onap.music.datastore.Condition; @@ -67,14 +68,6 @@ import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.RangeDependency; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.TxCommitProgress; -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.ColumnDefinitions; -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.TupleValue; /** * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence @@ -106,16 +99,19 @@ public class MusicMixin implements MusicInterface { public static final String KEY_MUSIC_RFACTOR = "music_rfactor"; /** The property name to use to provide the replication factor for Cassandra. */ public static final String KEY_MUSIC_NAMESPACE = "music_namespace"; + /** The property name to use to provide a flag indicating if compression is required */ + public static final String KEY_COMPRESSION = "mdbc_compression"; /** Namespace for the tables in MUSIC (Cassandra) */ public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; /** The default property value to use for the Cassandra IP address. */ public static final String DEFAULT_MUSIC_ADDRESS = "localhost"; /** The default property value to use for the Cassandra replication factor. */ - public static final int DEFAULT_MUSIC_RFACTOR = 1; + public static final int DEFAULT_MUSIC_RFACTOR = 3; /** The default primary string column, if none is provided. */ public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid"; /** Type of the primary key, if none is defined by the user */ public static final String MDBC_PRIMARYKEY_TYPE = "uuid"; + public static final boolean DEFAULT_COMPRESSION = true; //\TODO Add logic to change the names when required and create the tables when necessary @@ -178,9 +174,12 @@ public class MusicMixin implements MusicInterface { //typemap.put(Types.DATE, "TIMESTAMP"); } + protected final String music_ns; protected final String myId; protected final String[] allReplicaIds; + protected ExecutorService commitExecutorThreads; + private final String musicAddress; private final int music_rfactor; private MusicConnector mCon = null; @@ -189,7 +188,7 @@ public class MusicMixin implements MusicInterface { private Map<String, PreparedStatement> ps_cache = new HashMap<>(); private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>()); private StateManager stateManager; - + private boolean useCompression; public MusicMixin() { @@ -206,14 +205,16 @@ public class MusicMixin implements MusicInterface { // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred) this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS); logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: musicAddress="+musicAddress); - - String s = info.getProperty(KEY_MUSIC_RFACTOR); - this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s); + MusicDataStore dsHandle = null; + try { + dsHandle = MusicDataStoreHandle.getDSHandle(); + } catch (MusicServiceException e) { + e.printStackTrace(); + } this.myId = info.getProperty(KEY_MY_ID, getMyHostId()); logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: myId="+myId); - this.allReplicaIds = info.getProperty(KEY_REPLICAS, getAllHostIds()).split(","); logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: allReplicaIds="+info.getProperty(KEY_REPLICAS, this.myId)); @@ -222,7 +223,14 @@ public class MusicMixin implements MusicInterface { this.stateManager = stateManager; + String c = info.getProperty(KEY_COMPRESSION); + this.useCompression = (c == null) ? DEFAULT_COMPRESSION: Boolean.parseBoolean(c); + + String s = info.getProperty(KEY_MUSIC_RFACTOR); + this.music_rfactor = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s); + initializeMetricTables(); + commitExecutorThreads = Executors.newFixedThreadPool(4); } public String getMusicTxDigestTableName(){ @@ -244,8 +252,16 @@ public class MusicMixin implements MusicInterface { public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException { Map<String,Object> replicationInfo = new HashMap<>(); - replicationInfo.put("'class'", "'SimpleStrategy'"); - replicationInfo.put("'replication_factor'", replicationFactor); + replicationInfo.put("'class'", "'NetworkTopologyStrategy'"); + if(replicationFactor==3){ + replicationInfo.put("'dc1'", 1); + replicationInfo.put("'dc2'", 1); + replicationInfo.put("'dc3'", 1); + } + else { + replicationInfo.put("'class'", "'SimpleStrategy'"); + replicationInfo.put("'replication_factor'", replicationFactor); + } PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString( @@ -317,7 +333,7 @@ public class MusicMixin implements MusicInterface { createMusicEventualTxDigest(); createMusicNodeInfoTable(); createMusicRangeInformationTable(this.music_ns,this.musicRangeInformationTableName); - createMusicRangeDependencyTable(); + createMusicRangeDependencyTable(this.music_ns,this.musicRangeDependencyTableName); } catch(MDBCServiceException e){ logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC"); @@ -1207,67 +1223,6 @@ public class MusicMixin implements MusicInterface { return previous.keySet().equals(current.keySet()); } - protected List<Range> waitForLock(LockRequest request, DatabasePartition partition, - Map<UUID, LockResult> rowLock) throws MDBCServiceException { - List<Range> newRanges = new ArrayList<>(); - if(partition.getMRIIndex()!=request.getId()){ - throw new MDBCServiceException("Invalid argument for wait for lock, range id in request and partition should match"); - } - String fullyQualifiedKey= music_ns+"."+ request.getTable()+"."+request.getId(); - String lockId = MusicCore.createLockReference(fullyQualifiedKey); - ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId); - if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) { - //\TODO Improve the exponential backoff - List<Range> pendingToLock = request.getToLockRanges(); - Map<UUID, String> currentLockRef = new HashMap<>(); - int n = 1; - int low = 1; - int high = 1000; - Random r = new Random(); - Map<Range, RangeMriRow> rangeRows = findRangeRows(pendingToLock); - NavigableMap<UUID, List<Range>> rowsToLock = getPendingRows(rangeRows); - NavigableMap<UUID, List<Range>> prevRows = new TreeMap<>(); - while (!pendingToLock.isEmpty() && isDifferent(prevRows,rowsToLock) ) { - pendingToLock.clear(); - try { - Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000) - + (r.nextInt(high - low) + low)); - } catch (InterruptedException e) { - continue; - } - n++; - if (n == 20) { - throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!"); - } - //\TODO do this in parallel - //\TODO there is a race condition here, from the time we get the find range rows, to the time we lock the row, - //\TODO this race condition can only be solved if require to obtain lock to all related rows in MRI - //\TODO before fully owning the range - //\TODO The rows need to be lock in increasing order of timestamp - //there could be a new row created - // Note: This loop needs to be perfomed in sorted order of timebased UUID - for (Map.Entry<UUID, List<Range>> pending : rowsToLock.entrySet()) { - List<Range> rs = lockRow(request, pending, currentLockRef, fullyQualifiedKey, lockId, pendingToLock, rowLock); - newRanges.addAll(rs); - } - if (n++ == 20) { - throw new MDBCServiceException( - "Lock was impossible to obtain, waited for 20 exponential backoffs!"); - } - rangeRows = findRangeRows(pendingToLock); - prevRows = rowsToLock; - rowsToLock = getPendingRows(rangeRows); - } - } - else { - partition.setLockId(lockId); - rowLock.put(partition.getMRIIndex(),new LockResult(partition.getMRIIndex(), lockId, true, partition.getSnapshot())); - } - return newRanges; - } - - - protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { UUID mriIndex = partition.getMRIIndex(); String lockId; @@ -1315,13 +1270,14 @@ public class MusicMixin implements MusicInterface { } } - protected void appendIndexToMri(String lockId, UUID commitId, UUID MriIndex) throws MDBCServiceException{ - PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MriIndex, musicTxDigestTableName, commitId); - ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, MriIndex.toString(), appendQuery, lockId, null); - if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ - logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); - throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); + public void createAndAddTxDigest(final StagingTable transactionDigest, UUID digestId) + throws MDBCServiceException { + ByteBuffer serializedTransactionDigest; + serializedTransactionDigest = transactionDigest.getSerializedStagingAndClean(); + if(useCompression){ + serializedTransactionDigest = StagingTable.Compress(serializedTransactionDigest); } + addTxDigest(digestId, serializedTransactionDigest); } /** @@ -1331,9 +1287,7 @@ public class MusicMixin implements MusicInterface { @Override public void commitLog(DatabasePartition partition,List<Range> eventualRanges, StagingTable transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException { - // first deal with commit for eventually consistent tables - filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); - + if(partition==null){ logger.warn("Trying tcommit log with null partition"); return; @@ -1343,7 +1297,9 @@ public class MusicMixin implements MusicInterface { logger.warn("Trying to commit log with empty ranges"); return; } - + + // first deal with commit for eventually consistent tables + filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper); UUID mriIndex = partition.getMRIIndex(); String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex; @@ -1353,54 +1309,70 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException("Not able to commit, as you are no longer the lock-holder for this partition"); } - UUID commitId; - //Generate a local commit id - if(progressKeeper.containsTx(txId)) { - commitId = progressKeeper.getCommitId(txId); - } - else{ - logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress "); - throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress "); - } //Add creation type of transaction digest - - //1. Push new row to RRT and obtain its index if(transactionDigest == null || transactionDigest.isEmpty()) { return; } - - ByteBuffer serializedTransactionDigest; - if(!transactionDigest.isEmpty()) { - serializedTransactionDigest = transactionDigest.getSerializedStagingAndClean(); - MusicTxDigestId digestId = new MusicTxDigestId(mriIndex, -1); - addTxDigest(digestId, serializedTransactionDigest); - //2. Save RRT index to RQ - if (progressKeeper != null) { - progressKeeper.setRecordId(txId, digestId); + + final MusicTxDigestId digestId = new MusicTxDigestId(MDBCUtils.generateUniqueKey(), -1); + Callable<Boolean> insertDigestCallable =()-> { + try { + createAndAddTxDigest(transactionDigest,digestId.transactionId); + return true; + } catch (MDBCServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Error creating and pushing tx digest to music",e); + return false; } - //3. Append RRT index into the corresponding TIT row array - appendToRedoLog(partition, digestId); - List<Range> ranges = partition.getSnapshot(); - for(Range r : ranges) { - Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); - if(!alreadyApplied.containsKey(r)){ - throw new MDBCServiceException("already applied data structure was not updated correctly and range " + }; + Callable<Boolean> appendCallable=()-> { + try { + appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicTxDigestTableName, + musicRangeInformationTableName); + return true; + } catch (MDBCServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Error creating and pushing tx digest to music",e); + return false; + } + }; + + Future<Boolean> appendResultFuture = commitExecutorThreads.submit(appendCallable); + Future<Boolean> digestFuture = commitExecutorThreads.submit(insertDigestCallable); + try { + //Boolean appendResult = appendResultFuture.get(); + Boolean digestResult = digestFuture.get(); + if(/*!appendResult ||*/ !digestResult){ + logger.error(EELFLoggerDelegate.errorLogger, "Error appending to log or adding tx digest"); + throw new MDBCServiceException("Error appending to log or adding tx digest"); + } + } catch (InterruptedException|ExecutionException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Error executing futures for creating and pushing tx " + + "digest to music",e); + throw new MDBCServiceException("Failure when retrieving futures for execution of digestion creation and append", e); + } + + if (progressKeeper != null) { + progressKeeper.setRecordId(txId, digestId); + } + List<Range> ranges = partition.getSnapshot(); + for(Range r : ranges) { + Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied(); + if(!alreadyApplied.containsKey(r)){ + throw new MDBCServiceException("already applied data structure was not updated correctly and range " +r+" is not contained"); - } - Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r); - MriReference key = rowAndIndex.getKey(); - if(!mriIndex.equals(key.index)){ - throw new MDBCServiceException("already applied data structure was not updated correctly and range "+ - r+" is not pointing to row: "+mriIndex.toString()); - } - alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1)); } + Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r); + MriReference key = rowAndIndex.getKey(); + if(!mriIndex.equals(key.index)){ + throw new MDBCServiceException("already applied data structure was not updated correctly and range "+ + r+" is not pointing to row: "+mriIndex.toString()); + } + alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1)); } } private void filterAndAddEventualTxDigest(List<Range> eventualRanges, - StagingTable transactionDigest, String txId, - TxCommitProgress progressKeeper) throws MDBCServiceException { + StagingTable transactionDigest, String txId, + TxCommitProgress progressKeeper) throws MDBCServiceException { if(eventualRanges == null || eventualRanges.isEmpty()) { return; @@ -1410,30 +1382,19 @@ public class MusicMixin implements MusicInterface { throw new MDBCServiceException(); } - UUID commitId = getCommitId(txId, progressKeeper); + if(!transactionDigest.isEmpty()) { + ByteBuffer serialized = transactionDigest.getSerializedEventuallyStagingAndClean(); - ByteBuffer serialized = transactionDigest.getSerializedEventuallyStagingAndClean(); + if (serialized!=null && useCompression) { + serialized = StagingTable.Compress(serialized); + } - if(serialized != null ) { - MusicTxDigestId digestId = new MusicTxDigestId(commitId,-1); - addEventualTxDigest(digestId, serialized); + if (serialized != null) { + MusicTxDigestId digestId = new MusicTxDigestId(MDBCUtils.generateUniqueKey(), -1); + addEventualTxDigest(digestId, serialized); + } } - - } - - private UUID getCommitId(String txId, TxCommitProgress progressKeeper) - throws MDBCServiceException { - UUID commitId; - //Generate a local commit id - if(progressKeeper.containsTx(txId)) { - commitId = progressKeeper.getCommitId(txId); - } - else{ - logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress "); - throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress "); - } - return commitId; } /** @@ -1503,7 +1464,7 @@ public class MusicMixin implements MusicInterface { return partitions; } - public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){ + static public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){ UUID partitionIndex = newRow.getUUID("rangeid"); List<TupleValue> log = newRow.getList("txredolog",TupleValue.class); List<MusicTxDigestId> digestIds = new ArrayList<>(); @@ -1608,9 +1569,13 @@ public class MusicMixin implements MusicInterface { DatabasePartition newPartition = info.getDBPartition(); String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMRIIndex().toString(); - String lockId = createAndAssignLock(fullyQualifiedMriKey, newPartition); + String lockId; + int counter=0; + do { + lockId = createAndAssignLock(fullyQualifiedMriKey, newPartition); //TODO: fix this retry logic - if(lockId == null || lockId.isEmpty()){ + } while ((lockId ==null||lockId.isEmpty())&&(counter++<3)); + if (lockId == null || lockId.isEmpty()) { throw new MDBCServiceException("Error initializing music range information, error creating a lock for a new row" + "for key "+fullyQualifiedMriKey) ; } @@ -1632,7 +1597,7 @@ public class MusicMixin implements MusicInterface { .append(rangeAndDependencies.getRange().getTable()) .append(",{"); boolean first=true; - for(Range r: rangeAndDependencies.dependentRanges()){ + for (Range r: rangeAndDependencies.dependentRanges()) { if(first){ first=false; } else { insert.append(','); @@ -1658,7 +1623,7 @@ public class MusicMixin implements MusicInterface { .append(id) .append(",{"); boolean first=true; - for(Range r: rangesCopy){ + for (Range r: rangesCopy) { if(first){ first=false; } else { insert.append(','); @@ -1706,7 +1671,7 @@ public class MusicMixin implements MusicInterface { .append(id) .append(",{"); boolean first=true; - for(Range r: ranges){ + for (Range r: ranges) { if(first){ first=false; } else { insert.append(','); @@ -1731,24 +1696,32 @@ public class MusicMixin implements MusicInterface { } @Override - public void appendToRedoLog(DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException { - logger.info("Appending to redo log for partition " + partition.getMRIIndex() + " txId=" + newRecord.txId); - PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, partition.getMRIIndex(), - musicTxDigestTableName, newRecord.txId); - ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, partition.getMRIIndex().toString(), - appendQuery, partition.getLockId(), null); - if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ + public void appendToRedoLog(UUID MRIIndex, String lockId, MusicTxDigestId newRecord) throws MDBCServiceException { + logger.debug("Appending to redo log for partition " + MRIIndex + " txId=" + newRecord.transactionId); + appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicTxDigestTableName, + musicRangeInformationTableName); + } + + public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId, + String musicTxDigestTableName, String musicRangeInformationTableName) + throws MDBCServiceException{ + PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex, + musicTxDigestTableName, transactionId); + ReturnType returnType = MusicCore.criticalPut(musicNamespace, musicRangeInformationTableName, MRIIndex.toString(), + appendQuery, lockId, null); + //returnType.getExecutionInfo() + if (returnType.getResult().compareTo(ResultType.SUCCESS) != 0) { logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); } } public void createMusicTxDigest() throws MDBCServiceException { - createMusicTxDigest(-1); + createMusicTxDigest(this.musicTxDigestTableName,this.music_ns,-1); } public void createMusicEventualTxDigest() throws MDBCServiceException { - createMusicEventualTxDigest(-1); + createMusicEventualTxDigest(musicEventualTxDigestTableName,music_ns,-1); } @@ -1758,9 +1731,9 @@ public class MusicMixin implements MusicInterface { * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later * * TransactionDigest: text that contains all the changes in the transaction */ - private void createMusicEventualTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { - String tableName = this.musicEventualTxDigestTableName; - if(musicTxDigestTableNumber >= 0) { + public static void createMusicEventualTxDigest(String musicEventualTxDigestTableName, String musicNamespace, int musicTxDigestTableNumber) throws MDBCServiceException { + String tableName = musicEventualTxDigestTableName; + if (musicTxDigestTableNumber >= 0) { tableName = tableName + "-" + Integer.toString(musicTxDigestTableNumber); @@ -1769,12 +1742,13 @@ public class MusicMixin implements MusicInterface { StringBuilder fields = new StringBuilder(); fields.append("txid uuid, "); fields.append("transactiondigest blob, "); + fields.append("compressed boolean, "); fields.append("txTimeId TIMEUUID ");//notice lack of ',' - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey); + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); try { - executeMusicWriteQuery(this.music_ns,tableName,cql); + executeMusicWriteQuery(musicNamespace,tableName,cql); } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create redo records table"); + logger.error("Initialization error: Failure to create eventual tx digest table"); throw(e); } } @@ -1786,9 +1760,9 @@ public class MusicMixin implements MusicInterface { * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later * * TransactionDigest: text that contains all the changes in the transaction */ - private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { - String tableName = this.musicTxDigestTableName; - if(musicTxDigestTableNumber >= 0) { + public static void createMusicTxDigest(String musicTxDigestTableName, String musicNamespace, int musicTxDigestTableNumber) throws MDBCServiceException { + String tableName = musicTxDigestTableName; + if (musicTxDigestTableNumber >= 0) { tableName = tableName + "-" + Integer.toString(musicTxDigestTableNumber); @@ -1796,26 +1770,29 @@ public class MusicMixin implements MusicInterface { String priKey = "txid"; StringBuilder fields = new StringBuilder(); fields.append("txid uuid, "); + fields.append("compressed boolean, "); fields.append("transactiondigest blob ");//notice lack of ',' - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey); + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, + tableName, fields, priKey); try { - executeMusicWriteQuery(this.music_ns,tableName,cql); + executeMusicWriteQuery(musicNamespace,tableName,cql); } catch (MDBCServiceException e) { logger.error("Initialization error: Failure to create redo records table"); throw(e); } } - private void createMusicRangeDependencyTable() throws MDBCServiceException { - String tableName = this.musicRangeDependencyTableName; + public static void createMusicRangeDependencyTable(String musicNamespace,String musicRangeDependencyTableName) + throws MDBCServiceException { + String tableName = musicRangeDependencyTableName; String priKey = "range"; StringBuilder fields = new StringBuilder(); fields.append("range text, "); fields.append("dependencies set<text> ");//notice lack of ',' - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); try { - executeMusicWriteQuery(this.music_ns,tableName,cql); + executeMusicWriteQuery(musicNamespace,tableName,cql); } catch (MDBCServiceException e) { logger.error("Initialization error: Failure to create redo records table"); throw(e); @@ -1828,18 +1805,23 @@ public class MusicMixin implements MusicInterface { @Override public void addTxDigest(MusicTxDigestId newId, ByteBuffer transactionDigest) throws MDBCServiceException { //\TODO: Save Prepared query to history + addTxDigest(newId.transactionId,transactionDigest); + } + + private void addTxDigest(UUID digestId, ByteBuffer transactionDigest) throws MDBCServiceException{ PreparedQueryObject query = new PreparedQueryObject(); - String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest) VALUES (?,?);",this.music_ns, + String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed ) VALUES (?,?,?);",this.music_ns, this.musicTxDigestTableName); query.appendQueryString(cql); - query.addValue(newId.txId); + query.addValue(digestId); query.addValue(transactionDigest); + query.addValue(useCompression); //\TODO check if I am not shooting on my own foot try { MusicCore.nonKeyRelatedPut(query,"critical"); } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage()); - throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e); + logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for digest id "+digestId.toString()+ "with error "+e.getErrorMessage()); + throw new MDBCServiceException("Transaction Digest serialization for digest id "+digestId.toString(), e); } } @@ -1854,11 +1836,11 @@ public class MusicMixin implements MusicInterface { this.music_ns + '.' + this.musicEventualTxDigestTableName + - " (txid,transactiondigest,txTimeId) " + + " (txid,transactiondigest,compressed,txTimeId) " + "VALUES (" + - newId.txId + ",'" + - transactionDigest + - "'," + + newId.transactionId+ ",'" + + transactionDigest + "'," + + useCompression + ","+ // "toTimestamp(now())" + "now()" + ");"; @@ -1867,8 +1849,8 @@ public class MusicMixin implements MusicInterface { try { MusicCore.nonKeyRelatedPut(query,"critical"); } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage()); - throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e); + logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.transactionId.toString()+ "with error "+e.getErrorMessage()); + throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.transactionId.toString(), e); } } @@ -1877,17 +1859,21 @@ public class MusicMixin implements MusicInterface { String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName); PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); - pQueryObject.addValue(id.txId); + pQueryObject.addValue(id.transactionId); Row newRow; try { newRow = executeMusicUnlockedQuorumGet(pQueryObject); } catch (MDBCServiceException e) { - logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId); + logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.transactionId); throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e); } ByteBuffer digest = newRow.getBytes("transactiondigest"); + Boolean compressed = newRow.getBool("compressed"); StagingTable changes; try { + if(compressed){ + digest = StagingTable.Decompress(digest); + } changes = new StagingTable(digest); } catch (MDBCServiceException e) { logger.error("Deserializng digest failed with an exception:"+e.getErrorMessage()); @@ -1921,10 +1907,14 @@ public class MusicMixin implements MusicInterface { while (!rs.isExhausted()) { Row row = rs.one(); ByteBuffer digest = row.getBytes("transactiondigest"); + Boolean compressed = row.getBool("compressed"); //String txTimeId = row.getString("txtimeid"); //??? UUID txTimeId = row.getUUID("txtimeid"); try { + if(compressed){ + digest=StagingTable.Decompress(digest); + } changes = new StagingTable(digest); } catch (MDBCServiceException e) { logger.error("Deserializng digest failed: "+e.getErrorMessage()); @@ -1935,9 +1925,6 @@ public class MusicMixin implements MusicInterface { return ecDigestInformation; } - - - ResultSet getAllMriCassandraRows() throws MDBCServiceException { StringBuilder cqlOperation = new StringBuilder(); cqlOperation.append("SELECT * FROM ") @@ -1959,34 +1946,6 @@ public class MusicMixin implements MusicInterface { return rows; } - private RangeMriRow findRangeRow(Range range) throws MDBCServiceException { - RangeMriRow row = null; - final ResultSet musicResults = getAllMriCassandraRows(); - while (!musicResults.isExhausted()) { - Row musicRow = musicResults.one(); - final MusicRangeInformationRow mriRow = getMRIRowFromCassandraRow(musicRow); - final List<Range> musicRanges = getRanges(musicRow); - //\TODO optimize this for loop to avoid redudant access - for(Range retrievedRange : musicRanges) { - if (retrievedRange.overlaps(range)) { - if(row==null){ - row = new RangeMriRow(range); - row.setCurrentRow(mriRow); - } - else if(row.getCurrentRow().getTimestamp() < mriRow.getTimestamp()){ - row.addOldRow(row.getCurrentRow()); - row.setCurrentRow(mriRow); - } - } - } - } - if(row==null){ - logger.error("Row in MRI doesn't exist for Range "+range.toString()); - throw new MDBCServiceException("Row in MRI doesn't exist for Range "+range.toString()); - } - return row; - } - /** * This function is used to find all the related uuids associated with the required ranges * @param ranges ranges to be find @@ -2037,18 +1996,6 @@ public class MusicMixin implements MusicInterface { return result; } - private List<Range> lockRow(LockRequest request, DatabasePartition partition,Map<UUID, LockResult> rowLock) - throws MDBCServiceException { - if(partition.getMRIIndex().equals(request.getId()) && partition.isLocked()){ - return new ArrayList<>(); - } - //\TODO: this function needs to be improved, to track possible changes in the owner of a set of ranges - String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+request.getId().toString(); - //return List<Range> knownRanges, UUID mriIndex, String lockId - DatabasePartition newPartition = new DatabasePartition(request.getToLockRanges(),request.getId(),null); - return waitForLock(request,newPartition,rowLock); - } - private void unlockKeyInMusic(String table, String key, String lockref) { String fullyQualifiedKey= music_ns+"."+ table+"."+lockref; MusicCore.destroyLockRef(fullyQualifiedKey,lockref); @@ -2182,8 +2129,6 @@ public class MusicMixin implements MusicInterface { return new OwnershipReturn(ownershipId, ownRow.getOwnerId(), ownRow.getIndex(),ranges,extendedDag); } - - /** * This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained * @param ranges ranges that should be contained in the partition @@ -2199,78 +2144,7 @@ public class MusicMixin implements MusicInterface { return false; } - private Map<UUID,String> mergeMriRows(String newId, Map<UUID,LockResult> lock, DatabasePartition partition) - throws MDBCServiceException { - Map<UUID,String> oldIds = new HashMap<>(); - List<Range> newRanges = new ArrayList<>(); - for (Map.Entry<UUID,LockResult> entry : lock.entrySet()) { - oldIds.put(entry.getKey(),entry.getValue().getOwnerId()); - //\TODO check if we need to do a locked get? Is that even required? - final MusicRangeInformationRow mriRow = getMusicRangeInformation(entry.getKey()); - final DatabasePartition dbPartition = mriRow.getDBPartition(); - newRanges.addAll(dbPartition.getSnapshot()); - } - DatabasePartition newPartition = new DatabasePartition(newRanges,UUID.fromString(newId),null); - String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+newId; - UUID newUUID = UUID.fromString(newId); - LockRequest newRequest = new LockRequest(musicRangeInformationTableName,newUUID,newRanges); - waitForLock(newRequest, newPartition,lock); - if(!lock.containsKey(newUUID)||!lock.get(newUUID).isNewLock()){ - logger.error("When merging rows, lock returned an invalid error"); - throw new MDBCServiceException("When merging MRI rows, lock returned an invalid error"); - } - final LockResult lockResult = lock.get(newUUID); - partition.updateDatabasePartition(newPartition); - logger.info("Creating MRI " +partition.getMRIIndex()+ " for ranges " + partition.getSnapshot()); - createEmptyMriRow(this.music_ns,this.musicRangeInformationTableName,partition.getMRIIndex(),myId, - lockResult.getOwnerId(),partition.getSnapshot(),true); - return oldIds; - } - - private void obtainAllLocks(NavigableMap<UUID, List<Range>> rowsToLock,DatabasePartition partition, - List<Range> newRanges,Map<UUID, LockResult> rowLock) throws MDBCServiceException { - //\TODO: perform this operations in parallel - for(Map.Entry<UUID,List<Range>> row : rowsToLock.entrySet()){ - List<Range> additionalRanges; - try { - LockRequest newRequest = new LockRequest(musicRangeInformationTableName,row.getKey(),row.getValue()); - additionalRanges =lockRow(newRequest, partition, rowLock); - } catch (MDBCServiceException e) { - //TODO: Make a decision if retry or just fail? - logger.error("Error locking row",e); - throw e; - } - newRanges.addAll(additionalRanges); - } - } - -/* @Override - public OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) - throws MDBCServiceException { - if(!isAppendRequired(ranges,partition)){ - return new OwnershipReturn(partition.getLockId(),UUID.fromString(rangeId),null,null); - } - Map<Range, RangeMriRow> rows = findRangeRows(ranges); - final NavigableMap<UUID, List<Range>> rowsToLock = getPendingRows(rows); - HashMap<UUID, LockResult> rowLock = new HashMap<>(); - List<Range> newRanges = new ArrayList<>(); - - obtainAllLocks(rowsToLock,partition,newRanges,rowLock); - String lockId; - Map<UUID,String> oldIds = null; - if(rowLock.size()!=1){ - oldIds = mergeMriRows(rangeId, rowLock, partition); - lockId = partition.getLockId(); - } - else{ - List<LockResult> list = new ArrayList<>(rowLock.values()); - LockResult lockResult = list.get(0); - lockId = lockResult.getOwnerId(); - } - - return new OwnershipReturn(lockId,UUID.fromString(rangeId),oldIds,newRanges); - }*/ @Override public void relinquish(String ownerId, String rangeId) throws MDBCServiceException{ @@ -2290,7 +2164,9 @@ public class MusicMixin implements MusicInterface { * @return true if we should try to relinquish, else should avoid relinquishing in this iteration */ private boolean canTryRelinquishing(){ - return true; + //\TODO: Fix this!!!! REALLY IMPORTANT TO BE FIX + // This should actually have some mechanism to relinquish ownership + return false; } @Override @@ -2446,7 +2322,7 @@ public class MusicMixin implements MusicInterface { } public void createMusicNodeInfoTable() throws MDBCServiceException { - createMusicNodeInfoTable(-1); + createMusicNodeInfoTable(musicNodeInfoTableName,music_ns,-1); } /** @@ -2460,8 +2336,8 @@ public class MusicMixin implements MusicInterface { * * TxTimeID, TIMEUUID. * * LastTxDigestID, uuid. (not needed as of now!!) */ - private void createMusicNodeInfoTable(int nodeInfoTableNumber) throws MDBCServiceException { - String tableName = this.musicNodeInfoTableName; + public static void createMusicNodeInfoTable(String musicNodeInfoTableName, String musicNamespace, int nodeInfoTableNumber) throws MDBCServiceException { + String tableName = musicNodeInfoTableName; if(nodeInfoTableNumber >= 0) { tableName = tableName + "-" + @@ -2478,13 +2354,13 @@ public class MusicMixin implements MusicInterface { String cql = String.format( "CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", - this.music_ns, + musicNamespace, tableName, fields, priKey); try { - executeMusicWriteQuery(this.music_ns,tableName,cql); + executeMusicWriteQuery(musicNamespace,tableName,cql); } catch (MDBCServiceException e) { logger.error("Initialization error: Failure to create node information table"); throw(e); @@ -2514,4 +2390,19 @@ public class MusicMixin implements MusicInterface { } + @Override + public void deleteMriRow(MusicRangeInformationRow row) throws MDBCServiceException{ + String cql = String.format("DELETE FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(row.getPartitionIndex()); + ReturnType rt ; + try { + rt = MusicCore.atomicPut(music_ns, musicRangeDependencyTableName, row.getPartitionIndex().toString(), + pQueryObject, null); + } catch (MusicLockingException|MusicQueryException|MusicServiceException e) { + logger.error("Failure when deleting mri row"); + new MDBCServiceException("Error deleting mri row",e); + } + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index aecee24..820817b 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -74,13 +74,15 @@ public class MySQLMixin implements DBInterface { public static final String TRANS_TBL = "MDBC_TRANSLOG"; private static final String CREATE_TBL_SQL = "CREATE TABLE IF NOT EXISTS "+TRANS_TBL+ - " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), CONNECTION_ID INT,PRIMARY KEY (IX))"; + " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " + + "CONNECTION_ID INT, PRIMARY KEY (IX));"; private final MusicInterface mi; private final int connId; private final String dbName; private final Connection jdbcConn; private final Map<String, TableInfo> tables; + private PreparedStatement deleteStagingStatement; private boolean server_tbl_created = false; public MySQLMixin() { @@ -89,14 +91,20 @@ public class MySQLMixin implements DBInterface { this.dbName = null; this.jdbcConn = null; this.tables = null; + this.deleteStagingStatement = null; } - public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) { + public MySQLMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException { this.mi = mi; this.connId = generateConnID(conn); this.dbName = getDBName(conn); this.jdbcConn = conn; this.tables = new HashMap<String, TableInfo>(); } + + private PreparedStatement getStagingDeletePreparedStatement() throws SQLException { + return jdbcConn.prepareStatement("DELETE FROM "+TRANS_TBL+" WHERE (IX BETWEEN ? AND ? ) AND " + + "CONNECTION_ID = ?;"); + } // This is used to generate a unique connId for this connection to the DB. private int generateConnID(Connection conn) { int rv = (int) System.currentTimeMillis(); // random-ish @@ -297,6 +305,7 @@ mysql> describe tables; Statement stmt = jdbcConn.createStatement(); stmt.execute(CREATE_TBL_SQL); stmt.close(); + this.deleteStagingStatement = getStagingDeletePreparedStatement(); logger.info(EELFLoggerDelegate.applicationLogger,"createSQLTriggers: Server side dirty table created."); server_tbl_created = true; } catch (SQLException e) { @@ -594,11 +603,15 @@ NEW.field refers to the new value // copy from DB.MDBC_TRANSLOG where connid == myconnid // then delete from MDBC_TRANSLOG String sql2 = "SELECT IX, TABLENAME, OP, ROWDATA,KEYDATA FROM "+TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId; + Integer biggestIx = Integer.MIN_VALUE; + Integer smallestIx = Integer.MAX_VALUE; try { ResultSet rs = executeSQLRead(sql2); Set<Integer> rows = new TreeSet<Integer>(); while (rs.next()) { int ix = rs.getInt("IX"); + biggestIx = Integer.max(biggestIx,ix); + smallestIx = Integer.min(smallestIx,ix); String op = rs.getString("OP"); OperationType opType = toOpEnum(op); String tbl = rs.getString("TABLENAME"); @@ -609,17 +622,13 @@ NEW.field refers to the new value rows.add(ix); } rs.getStatement().close(); + // batch delete operations if (rows.size() > 0) { - //TODO: DO batch deletion - sql2 = "DELETE FROM "+TRANS_TBL+" WHERE IX = ?"; - PreparedStatement ps = jdbcConn.prepareStatement(sql2); - logger.debug("Executing: "+sql2); - logger.debug(" For ix = "+rows); - for (int ix : rows) { - ps.setInt(1, ix); - ps.execute(); - } - ps.close(); + this.deleteStagingStatement.setInt(1,smallestIx); + this.deleteStagingStatement.setInt(2,biggestIx); + this.deleteStagingStatement.setInt(3,this.connId); + logger.debug("Staging delete: Executing with vals ["+smallestIx+","+biggestIx+","+this.connId+"]"); + this.deleteStagingStatement.execute(); } } catch (SQLException e) { logger.warn("Exception in postStatementHook: "+e); @@ -1047,4 +1056,9 @@ NEW.field refers to the new value String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId; jdbcStmt.executeQuery(sql); } + + @Override + public Connection getSQLConnection(){ + return jdbcConn; + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java index 057b550..e76e1b1 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java @@ -214,12 +214,7 @@ public class OwnershipAndCheckpoint{ checkpointLock.unlock(); break; } else { - final StagingTable txDigest = mi.getTxDigest(pair.getKey()); - applyTxDigest(rangesToWarmup,di, txDigest); - for (Range r : pair.getValue()) { - MusicRangeInformationRow row = node.getRow(); - alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); - } + applyDigestAndUpdateDataStructures(mi, di, rangesToWarmup, node, pair); } pair = node.nextNotAppliedTransaction(rangeSet); enableForeignKeys(di); @@ -233,6 +228,24 @@ public class OwnershipAndCheckpoint{ } } + private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, List<Range> ranges, DagNode node, + Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException { + final StagingTable txDigest; + try { + txDigest = mi.getTxDigest(pair.getKey()); + } catch (MDBCServiceException e) { + logger.warn("Transaction digest was not found, this could be caused by a failure of the previous owner" + +"And would normally only happen as the last ID of the corresponding redo log. Please check that this is the" + +" case for txID "+pair.getKey().transactionId.toString()); + return; + } + applyTxDigest(ranges,di, txDigest); + for (Range r : pair.getValue()) { + MusicRangeInformationRow row = node.getRow(); + alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); + } + } + private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, List<Range> ranges, UUID ownOpId) throws MDBCServiceException { Set<Range> rangeSet = new HashSet<Range>(ranges); @@ -242,12 +255,7 @@ public class OwnershipAndCheckpoint{ if(node!=null) { Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet); while (pair != null) { - final StagingTable txDigest = mi.getTxDigest(pair.getKey()); - applyTxDigest(ranges, db, txDigest); - for (Range r : pair.getValue()) { - MusicRangeInformationRow row = node.getRow(); - alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index)); - } + applyDigestAndUpdateDataStructures(mi, db, ranges, node, pair); pair = node.nextNotAppliedTransaction(rangeSet); if (timeout(ownOpId)) { enableForeignKeys(db); @@ -393,6 +401,7 @@ public class OwnershipAndCheckpoint{ return rowsPerLatestRange; } + public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() { return this.alreadyApplied; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java deleted file mode 100644 index 5b3872a..0000000 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * ============LICENSE_START==================================================== - * org.onap.music.mdbc - * ============================================================================= - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END====================================================== - */ -package org.onap.music.mdbc.tables; - -import java.sql.SQLException; -import java.sql.Timestamp; -import java.util.*; -import java.util.concurrent.TimeUnit; -import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.logging.EELFLoggerDelegate; -import org.onap.music.mdbc.DatabasePartition; -import org.onap.music.mdbc.MdbcConnection; -import org.onap.music.mdbc.Range; -import org.onap.music.mdbc.StateManager; -import org.onap.music.mdbc.mixins.DBInterface; -import org.onap.music.mdbc.mixins.MusicInterface; - -public class MusicTxDigest { - private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicTxDigest.class); - - //private MdbcServerLogic mdbcServer; - //private NodeConfiguration config; - private StateManager stateManager; - - public MusicTxDigest(StateManager stateManager) { - this.stateManager = stateManager; - } - - /** - * Runs the body of the background daemon - * @param daemonSleepTimeS time, in seconds, between updates - * @throws InterruptedException - */ - public void backgroundDaemon(int daemonSleepTimeS) throws InterruptedException { - MusicInterface mi = stateManager.getMusicInterface(); - DBInterface dbi = ((MdbcConnection) stateManager.getConnection("daemon")).getDBInterface(); - - while (true) { - Thread.sleep(TimeUnit.SECONDS.toMillis(daemonSleepTimeS)); - //update - logger.info(String.format("[%s] Background MusicTxDigest daemon updating local db", - new Timestamp(System.currentTimeMillis()))); - - //1) get all other partitions from musicrangeinformation - List<UUID> partitions = null; - try { - partitions = mi.getPartitionIndexes(); - } catch (MDBCServiceException e) { - logger.error("Error obtainting partition indexes, trying again next iteration"); - continue; - } - //2) for each partition I don't own - final Set<Range> warmupRanges = stateManager.getRangesToWarmup(); - final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); - if (currentPartitions.size() != 0) { - for (DatabasePartition part : currentPartitions) { - List<Range> partitionRanges = part.getSnapshot(); - warmupRanges.removeAll(partitionRanges); - } - try { - stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); - } catch (MDBCServiceException e) { - logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); - continue; - } - } - - //Step 3: ReplayDigest() for E.C conditions - try { - replayDigest(mi,dbi, stateManager.getEventualRanges()); - } catch (MDBCServiceException e) { - logger.error("Unable to perform Eventual Consistency operations" + e.getMessage()); - continue; - } - - } - } - - /** - * Replay the digest for eventual consistency. - * @param mi music interface - * @param dbi interface to the database that will replay the operations - * @param ranges only these ranges will be applied from the digests - * @throws MDBCServiceException - */ - public void replayDigest(MusicInterface mi, DBInterface dbi, List<Range> ranges) throws MDBCServiceException { - StagingTable transaction; - String nodeName = stateManager.getMdbcServerName(); - - LinkedHashMap<UUID,StagingTable> ecDigestInformation = mi.getEveTxDigest(nodeName); - Set<UUID> keys = ecDigestInformation.keySet(); - for(UUID txTimeID:keys){ - transaction = ecDigestInformation.get(txTimeID); - try { - dbi.replayTransaction(transaction, ranges); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??) - } catch (SQLException e) { - logger.error("EC:Rolling back the entire digest replay."); - return; - } - logger.info("EC: Successfully replayed transaction "); - - try { - mi.updateNodeInfoTableWithTxTimeIDKey(txTimeID, nodeName); - } catch (MDBCServiceException e) { - logger.error("EC:Rolling back the entire digest replay."); - } - } -; - } - - - /** - * Replay the digest for a given partition - * @param mi music interface - * @param partitionId the partition to be replayed - * @param dbi interface to the database that will replay the operations - * @throws MDBCServiceException - */ - public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException { - final MusicRangeInformationRow row = mi.getMusicRangeInformation(partitionId); - List<MusicTxDigestId> partitionsRedoLogTxIds = row.getRedoLog(); - for (MusicTxDigestId txId: partitionsRedoLogTxIds) { - StagingTable transaction = mi.getTxDigest(txId); - try { - //\TODO do this two operations in parallel - dbi.replayTransaction(transaction, row.getDBPartition().getSnapshot()); - mi.replayTransaction(transaction); - } catch (SQLException e) { - logger.error("Rolling back the entire digest replay. " + partitionId); - return; - } - logger.info("Successfully replayed transaction " + txId); - } - //todo, keep track of where I am in pointer - } - - /** - * Start the background daemon defined by this object - * Spawns a new thread and runs "backgroundDaemon" - * @param daemonSleepTimeS time, in seconds, between updates run by daemon - */ - public void startBackgroundDaemon(int daemonSleepTimeS) { - class MusicTxBackgroundDaemon implements Runnable { - public void run() { - while (true) { - try { - logger.info("MusicTxDigest background daemon started"); - backgroundDaemon(daemonSleepTimeS); - } catch (InterruptedException e) { - logger.error("MusicTxDigest background daemon stopped " + e.getMessage()); - } - } - } - } - Thread t = new Thread(new MusicTxBackgroundDaemon()); - t.start(); - - } -} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java new file mode 100644 index 0000000..4f3a3bf --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java @@ -0,0 +1,144 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.tables; + +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.*; +import java.util.concurrent.TimeUnit; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.MdbcConnection; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.StateManager; +import org.onap.music.mdbc.mixins.DBInterface; +import org.onap.music.mdbc.mixins.MusicInterface; + +public class MusicTxDigestDaemon implements Runnable { + + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicTxDigestDaemon.class); + + private StateManager stateManager; + private int daemonSleepTimeS; + + public MusicTxDigestDaemon(int daemonSleepTimeS, StateManager stateManager) { + this.stateManager = stateManager; + this.daemonSleepTimeS = daemonSleepTimeS; + } + + /** + * Replay the digest for eventual consistency. + * + * @param mi music interface + * @param dbi interface to the database that will replay the operations + * @param ranges only these ranges will be applied from the digests + */ + public void replayDigest(MusicInterface mi, DBInterface dbi, List<Range> ranges) throws MDBCServiceException { + StagingTable transaction; + String nodeName = stateManager.getMdbcServerName(); + + LinkedHashMap<UUID, StagingTable> ecDigestInformation = mi.getEveTxDigest(nodeName); + Set<UUID> keys = ecDigestInformation.keySet(); + for (UUID txTimeID : keys) { + transaction = ecDigestInformation.get(txTimeID); + try { + dbi.replayTransaction(transaction, + ranges); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??) + } catch (SQLException e) { + logger.error("EC:Rolling back the entire digest replay."); + return; + } + logger.info("EC: Successfully replayed transaction "); + + try { + mi.updateNodeInfoTableWithTxTimeIDKey(txTimeID, nodeName); + } catch (MDBCServiceException e) { + logger.error("EC:Rolling back the entire digest replay."); + } + } + } + + @Override + public void run() { + logger.info("MusicTxDigest background daemon started"); + if (stateManager == null) { + logger.error("State manager is null in background daemon"); + return; + } + MusicInterface mi = stateManager.getMusicInterface(); + + if (mi == null) { + logger.error("Music interface or DB interface is null in background daemon"); + return; + } + while (true) { + try { + MdbcConnection conn = (MdbcConnection) stateManager.getConnection("daemon"); + if (conn == null) { + logger.error("Connection created is null in background daemon"); + return; + } + DBInterface dbi = (conn).getDBInterface(); + //update + logger.info(String.format("[%s] Background MusicTxDigest daemon updating local db", + new Timestamp(System.currentTimeMillis()))); + + //1) get all other partitions from musicrangeinformation + List<UUID> partitions = null; + try { + partitions = mi.getPartitionIndexes(); + } catch (MDBCServiceException e) { + logger.error("Error obtainting partition indexes, trying again next iteration"); + continue; + } + //2) for each partition I don't own + final Set<Range> warmupRanges = stateManager.getRangesToWarmup(); + final List<DatabasePartition> currentPartitions = stateManager.getPartitions(); + List<Range> missingRanges = new ArrayList<>(); + if (currentPartitions.size() != 0) { + for (DatabasePartition part : currentPartitions) { + List<Range> partitionRanges = part.getSnapshot(); + warmupRanges.removeAll(partitionRanges); + } + try { + stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges)); + } catch (MDBCServiceException e) { + logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage()); + continue; + } + } + + //Step 3: ReplayDigest() for E.C conditions + try { + replayDigest(mi, dbi, stateManager.getEventualRanges()); + } catch (MDBCServiceException e) { + logger.error("Unable to perform Eventual Consistency operations" + e.getMessage()); + continue; + } + conn.close(); + Thread.sleep(TimeUnit.SECONDS.toMillis(this.daemonSleepTimeS)); + } catch (InterruptedException | SQLException e) { + logger.error("MusicTxDigest background daemon stopped " + e.getMessage(), e); + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java index 1c37db0..8fa49a9 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java @@ -22,16 +22,16 @@ package org.onap.music.mdbc.tables; import java.util.UUID; public final class MusicTxDigestId { - public final UUID txId; + public final UUID transactionId; public final int index; - public MusicTxDigestId(UUID primaryKey, int index) { - this.txId= primaryKey; + public MusicTxDigestId(UUID digestId, int index) { + this.transactionId= digestId; this.index=index; } public boolean isEmpty() { - return (this.txId==null); + return (this.transactionId==null); } @Override @@ -40,11 +40,11 @@ public final class MusicTxDigestId { if(o == null) return false; if(!(o instanceof MusicTxDigestId)) return false; MusicTxDigestId other = (MusicTxDigestId) o; - return other.txId.equals(this.txId); + return other.transactionId.equals(this.transactionId); } @Override public int hashCode(){ - return txId.hashCode(); + return transactionId.hashCode(); } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java index a9ab25f..eda6191 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java @@ -43,6 +43,18 @@ public final class Operation implements Serializable{ KEY = key; } + @Override + protected Object clone() throws CloneNotSupportedException { + Operation clone = null; + try { + clone = (Operation) super.clone(); + } + catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + return clone; + } + public String getTable(){ return TABLE; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java index 03c7259..dbed9e4 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java @@ -21,11 +21,14 @@ package org.onap.music.mdbc.tables; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; +import java.util.zip.DataFormatException; +import java.util.zip.Deflater; +import java.util.zip.Inflater; +import javax.validation.constraints.Null; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.mdbc.Range; @@ -36,7 +39,7 @@ import org.onap.music.mdbc.proto.ProtoDigest.Digest.Row.OpType; public class StagingTable { - private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StagingTable.class); + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StagingTable.class); private ArrayList<Operation> operations; boolean builderInitialized; Builder digestBuilder; @@ -47,6 +50,40 @@ public class StagingTable { this(new HashSet<>()); logger.debug("Creating staging table with no parameters, most likely this is wrong, unless you are testing"); } + + public StagingTable(StagingTable other) throws CloneNotSupportedException { + if(other==null){ + throw new NullPointerException("Invalid constructor parameter passed, it is null"); + } + //TODO this is a highly inefficient deep copy, please don't use in prod + operations=null; + if(other.operations!=null) { + Iterator<Operation> iterator = other.operations.iterator(); + operations = new ArrayList<>(); + while (iterator.hasNext()) { + operations.add((Operation) iterator.next().clone()); + } + } + builderInitialized=other.builderInitialized; + digestBuilder=null; + if(other.digestBuilder!=null) { + CompleteDigest build = other.digestBuilder.build(); + digestBuilder = build.toBuilder(); + } + eventuallyBuilder=null; + if(other.eventuallyBuilder!=null) { + CompleteDigest build2 = other.digestBuilder.build(); + eventuallyBuilder = build2.toBuilder(); + } + eventuallyConsistentRanges=null; + if(other.eventuallyConsistentRanges!=null) { + eventuallyConsistentRanges = new HashSet<>(); + Iterator<Range> rangeIter = other.eventuallyConsistentRanges.iterator(); + while (rangeIter.hasNext()) { + eventuallyConsistentRanges.add(rangeIter.next().clone()); + } + } + } public StagingTable(Set<Range> eventuallyConsistentRanges) { //operations = new ArrayList<Operation>(); @@ -74,6 +111,67 @@ public class StagingTable { } } + public static ByteBuffer Compress(ByteBuffer serializedStaging) throws MDBCServiceException { + if(serializedStaging.hasArray()) { + //\TODO: Use JAVA 11 to simplify this process using ByteBuffer natively + Deflater compressor = new Deflater(); + final byte[] inputArray = serializedStaging.array(); + compressor.setInput(inputArray); + compressor.finish(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(serializedStaging.array().length); + byte[] buf = new byte[1024]; + try { + while (!compressor.finished()) { + int i = compressor.deflate(buf); + bos.write(buf, 0, i); + } + } finally { + compressor.end(); + try { + bos.close(); + } catch (IOException e) { + throw new MDBCServiceException("Error closing ByetArrayOutputStream:",e); + } + } + byte[] output = bos.toByteArray(); + logger.debug("Staging table compressed from: "+inputArray.length+" to "+output.length); + return ByteBuffer.wrap(output); + } + else{ + throw new MDBCServiceException("Byte buffer was not created correctly, it should wrap an array"); + } + } + + public static ByteBuffer Decompress(ByteBuffer compressedStaging) throws MDBCServiceException { + if(compressedStaging.hasArray()) { + //\TODO: Use JAVA 11 to simplify this process using ByteBuffer natively + Inflater decompressor = new Inflater(); + byte[] inputArray = compressedStaging.array(); + decompressor.setInput(inputArray); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(inputArray.length); + byte[] buffer = new byte[1024]; + while (!decompressor.finished()) { + int decompressSize = 0; + try { + decompressSize = decompressor.inflate(buffer); + } catch (DataFormatException e) { + throw new MDBCServiceException("error decompressing input data",e); + } + outputStream.write(buffer, 0, decompressSize); + } + try { + outputStream.close(); + } catch (IOException e) { + throw new MDBCServiceException("Error closing output byte stream",e); + } + byte[] output = outputStream.toByteArray(); + return ByteBuffer.wrap(output); + } + else{ + throw new MDBCServiceException("Byte buffer was not created correctly, it should wrap an array"); + } + } + synchronized public boolean isBuilderInitialized(){ return isBuilderInitialized(); } @@ -108,7 +206,7 @@ public class StagingTable { } logger.warn("Get operation list with this type of initialization is not suggested for the" + "staging table"); - ArrayList newOperations = new ArrayList(); + ArrayList<Operation> newOperations = new ArrayList<>(); for(Row row : digestBuilder.getRowsList()){ final OpType type = row.getType(); OperationType newType = (type==OpType.INSERT)?OperationType.INSERT:(type==OpType.DELETE)? @@ -123,9 +221,10 @@ public class StagingTable { throw new MDBCServiceException("This type of staging table is unmutable, please use the constructor" + "with no parameters"); } - ByteString serialized = digestBuilder.build().toByteString(); + byte[] bytes = digestBuilder.build().toByteArray(); + ByteBuffer serialized = ByteBuffer.wrap(bytes); digestBuilder.clear(); - return serialized.asReadOnlyByteBuffer(); + return serialized; } synchronized public ByteBuffer getSerializedEventuallyStagingAndClean() throws MDBCServiceException { @@ -136,9 +235,10 @@ public class StagingTable { if(eventuallyBuilder == null || eventuallyBuilder.getRowsCount()==0){ return null; } - ByteString serialized = eventuallyBuilder.build().toByteString(); + byte[] bytes = eventuallyBuilder.build().toByteArray(); + ByteBuffer serialized = ByteBuffer.wrap(bytes); eventuallyBuilder.clear(); - return serialized.asReadOnlyByteBuffer(); + return serialized; } synchronized public boolean isEmpty() { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/ClusterSetup.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/ClusterSetup.java new file mode 100644 index 0000000..d560915 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/ClusterSetup.java @@ -0,0 +1,80 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.tools; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import java.io.FileNotFoundException; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.main.MusicUtil; +import org.onap.music.mdbc.configurations.ClusterConfiguration; + +public class ClusterSetup { + public static final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(ClusterSetup.class); + + @Parameter(names = { "-c", "--configuration" }, required = true, + description = "This is the input file that is going to have the configuration to setup the cluster") + private String configurationFile; + @Parameter(names = { "-h", "-help", "--help" }, help = true, + description = "Print the help message") + private boolean help = false; + + private ClusterConfiguration inputConfig; + + public ClusterSetup(){} + + + public void readInput(){ + LOG.info("Reading inputs"); + try { + inputConfig = ClusterConfiguration.readJsonFromFile(configurationFile); + } catch (FileNotFoundException e) { + LOG.error("Input file is invalid or not found"); + System.exit(1); + } + } + + public void createAll() throws MDBCServiceException { + inputConfig.initNamespaces(); + inputConfig.initTables(); + } + + public static void main(String[] args) { + LOG.info("Starting cassandra cluster initializer"); + LOG.info("Using music file configuration:"+ MusicUtil.getMusicPropertiesFilePath()); + ClusterSetup configs = new ClusterSetup(); + @SuppressWarnings("deprecation") + JCommander jc = new JCommander(configs, args); + if (configs.help) { + jc.usage(); + System.exit(1); + return; + } + configs.readInput(); + try { + configs.createAll(); + } catch (MDBCServiceException e) { + e.printStackTrace(); + System.exit(1); + } + System.exit(0); + } +} diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties index 0b34ff9..1aaf7fd 100755 --- a/mdbc-server/src/main/resources/music.properties +++ b/mdbc-server/src/main/resources/music.properties @@ -5,4 +5,4 @@ cassandra.user =\ cassandra.password =\ cassandra music_namespace =\ - mdbc_namespace
\ No newline at end of file + mdbc_namespace diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java index 11ec272..7f1c0e1 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java @@ -26,7 +26,7 @@ import java.io.IOException; import java.util.HashMap; import org.junit.Test; -import org.onap.music.mdbc.tables.MusicTxDigest; +import org.onap.music.mdbc.tables.MusicTxDigestDaemon; import org.onap.music.mdbc.tables.StagingTable; diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java b/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java index 291179a..e5a3252 100755 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java @@ -20,11 +20,8 @@ package org.onap.music.mdbc; import com.datastax.driver.core.*; -import com.datastax.driver.core.exceptions.QueryExecutionException; -import com.datastax.driver.core.exceptions.SyntaxError; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicLockingException; -import org.onap.music.lockingservice.cassandra.CassaLockStore; import org.onap.music.lockingservice.cassandra.MusicLockState; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.main.MusicCore; @@ -38,16 +35,12 @@ import java.util.*; import org.onap.music.mdbc.mixins.MusicInterface; import org.onap.music.mdbc.tables.MusicRangeInformationRow; -import static junit.framework.TestCase.assertNotNull; -import static junit.framework.TestCase.fail; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - public class TestUtils { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TestUtils.class); - public static DatabasePartition createBasicRow(Range range, MusicInterface mixin, String mdbcServerName){ + public static DatabasePartition createBasicRow(Range range, MusicInterface mixin, String mdbcServerName) + throws MDBCServiceException { final UUID uuid = MDBCUtils.generateTimebasedUniqueKey(); List<Range> ranges = new ArrayList<>(); ranges.add(range); @@ -55,21 +48,14 @@ public class TestUtils { MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "", mdbcServerName, true); DatabasePartition partition=null; - try { - partition = mixin.createMusicRangeInformation(newRow); - } catch (MDBCServiceException e) { - fail("failure when creating new row"); - } + partition = mixin.createMusicRangeInformation(newRow); return partition; } - public static void unlockRow(String keyspace, String mriTableName, DatabasePartition partition){ + public static void unlockRow(String keyspace, String mriTableName, DatabasePartition partition) + throws MusicLockingException { String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString(); - try { - MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); - } catch (MusicLockingException e) { - fail("failure when releasing lock"); - } + MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId()); } public static void createKeyspace(String keyspace, Session session) { @@ -78,15 +64,7 @@ public class TestUtils { " WITH REPLICATION " + "= {'class':'SimpleStrategy', 'replication_factor':1}; "; ResultSet res=null; - try { - res = session.execute(queryOp); - } - catch(QueryExecutionException e){ - fail("Failure executing creation of keyspace with error: " + e.getMessage()); - } catch(SyntaxError e){ - fail("Failure executing creation of keyspace with syntax error: " + e.getMessage()); - } - assertTrue("Keyspace "+keyspace+" is already being used, please change it to avoid loosing data",res.wasApplied()); + res = session.execute(queryOp); } public static void deleteKeyspace(String keyspace, Session session){ @@ -94,7 +72,6 @@ public class TestUtils { keyspace + ";"; ResultSet res = session.execute(queryBuilder); - assertTrue("Keyspace "+keyspace+" doesn't exist and it should",res.wasApplied()); } public static HashSet<String> getMriColNames(){ @@ -109,14 +86,18 @@ public class TestUtils { ); } - public static HashMap<String, DataType> getMriColTypes(Cluster cluster){ + public static HashMap<String, DataType> getMriColTypes(Cluster cluster) throws Exception { HashMap<String, DataType> expectedTypes = new HashMap<>(); expectedTypes.put("rangeid",DataType.uuid()); expectedTypes.put("keys",DataType.set(DataType.text())); ProtocolVersion currentVer = cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); - assertNotNull("Protocol version for cluster is invalid", currentVer); + if(currentVer != null) { + throw new Exception("Protocol version for cluster is invalid"); + } CodecRegistry registry = cluster.getConfiguration().getCodecRegistry(); - assertNotNull("Codec registry for cluster is invalid", registry); + if(registry!= null) { + throw new Exception("Codec registry for cluster is invalid"); + } expectedTypes.put("txredolog",DataType.list(TupleType.of(currentVer,registry,DataType.text(),DataType.uuid()))); expectedTypes.put("ownerid",DataType.text()); expectedTypes.put("metricprocessid",DataType.text()); @@ -131,15 +112,19 @@ public class TestUtils { } public static void checkDataTypeForTable(List<ColumnMetadata> columnsMeta, HashSet<String> expectedColumns, - HashMap<String,DataType> expectedTypes){ + HashMap<String,DataType> expectedTypes) throws Exception { for(ColumnMetadata cMeta : columnsMeta){ String columnName = cMeta.getName(); DataType type = cMeta.getType(); - assertTrue("Invalid column name: "+columnName,expectedColumns.contains(columnName)); - assertTrue("Fix the contents of expectedtypes for column: "+columnName, - expectedTypes.containsKey(columnName)); - assertEquals("Invalid type for column: "+columnName, - expectedTypes.get(columnName),type); + if(!expectedColumns.contains(columnName)){ + throw new Exception("Invalid column name: "); + } + if(!expectedTypes.containsKey(columnName)){ + throw new Exception("Fix the contents of expectedtypes for column: "+columnName); + } + if(expectedTypes.get(columnName)!=type) { + throw new Exception("Invalid type for column: "+columnName); + } } } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java index e8b7511..aba8cb4 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java @@ -122,8 +122,18 @@ public class MusicMixinTest { // Range range = new Range("TABLE1"); // List<Range> ranges = new ArrayList<>(); // ranges.add(range); -// final DatabasePartition partition = TestUtils.createBasicRow(range, mixin, mdbcServerName); -// TestUtils.unlockRow(keyspace,mriTableName,partition); +// DatabasePartition partition=null; +// try { +// partition = TestUtils.createBasicRow(range, mixin, mdbcServerName); +// } +// catch(Exception e){ +// fail(e.getMessage()); +// } +// try { +// TestUtils.unlockRow(keyspace,mriTableName,partition); +// } catch (MusicLockingException e) { +// fail(e.getMessage()); +// } // // DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey()); // try { @@ -229,7 +239,6 @@ public class MusicMixinTest { // assertFalse(node3Row.getIsLatest()); // } - @Test public void relinquish() { } diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java index 7db973c..eb01bcd 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java @@ -149,6 +149,7 @@ public class OwnershipAndCheckpointTest { Properties properties = new Properties(); properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName); properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace); + properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1"); //StateManager stateManager = new StateManager("dbUrl", properties, "serverName", "dbName"); ownAndCheck = new OwnershipAndCheckpoint(); musicMixin =new MusicMixin(stateManager, mdbcServerName,properties); @@ -175,7 +176,12 @@ public class OwnershipAndCheckpointTest { TxCommitProgress progressKeeper = new TxCommitProgress(); progressKeeper.createNewTransactionTracker(id ,this.conn); musicMixin.commitLog(partition, null, stagingTable, id, progressKeeper); - TestUtils.unlockRow(keyspace,mriTableName,partition); + try { + TestUtils.unlockRow(keyspace, mriTableName, partition); + } + catch(Exception e){ + fail(e.getMessage()); + } } private OwnershipReturn cleanAndOwnPartition(List<Range> ranges, UUID ownOpId) throws SQLException { @@ -265,4 +271,4 @@ public class OwnershipAndCheckpointTest { private void cleanAlreadyApplied(OwnershipAndCheckpoint ownAndCheck) { ownAndCheck.getAlreadyApplied().clear(); } -}
\ No newline at end of file +} diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java index 63147e3..e39cc95 100644 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/query/QueryProcessorTest.java @@ -22,7 +22,7 @@ import java.util.HashMap; import java.util.List; import org.apache.calcite.sql.parser.SqlParseException; import org.junit.Test; -import org.onap.music.mdbc.tables.MusicTxDigest; +import org.onap.music.mdbc.tables.MusicTxDigestDaemon; import org.onap.music.mdbc.tables.StagingTable; |