diff options
author | Bharath Balasubramanian <bharathb@research.att.com> | 2018-11-20 12:53:38 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-11-20 12:53:38 +0000 |
commit | 83db3eb4ccf7e636f586a6873a966e14ba1685ae (patch) | |
tree | d86101926e3474ceebf0da40945d8d65479123de | |
parent | 14186ff595b31035d401b71111dc75da1c80a807 (diff) | |
parent | 5a7cf00e87cacf97130d3f1823adce824f865c69 (diff) |
Merge "TxDigest replay and code restructure"
31 files changed, 810 insertions, 1147 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java index 6bda739..7e39772 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java @@ -34,4 +34,8 @@ public class Configuration { public static final String MUSIC_MIXIN_DEFAULT = "cassandra2";//"cassandra2"; /** Default cassandra ulr*/ public static final String CASSANDRA_URL_DEFAULT = "localhost";//"cassandra2"; + /** Name of Tx Digest Update Daemon sleep time */ + public static final String TX_DAEMON_SLEEPTIME_S = "txdaemonsleeps"; + /** Default txDigest Daemon sleep time */ + public static final String TX_DAEMON_SLEEPTIME_S_DEFAULT = "10"; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabaseOperations.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabaseOperations.java deleted file mode 100755 index e10fe96..0000000 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabaseOperations.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * ============LICENSE_START==================================================== - * org.onap.music.mdbc - * ============================================================================= - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END====================================================== - */ -package org.onap.music.mdbc; - -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.TupleValue; -import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.logging.EELFLoggerDelegate; -import org.onap.music.datastore.PreparedQueryObject; -import org.onap.music.exceptions.MusicLockingException; -import org.onap.music.exceptions.MusicQueryException; -import org.onap.music.exceptions.MusicServiceException; -import org.onap.music.main.MusicCore; -import org.onap.music.main.ResultType; -import org.onap.music.main.ReturnType; -import org.onap.music.mdbc.tables.MusicRangeInformationRow; -import org.onap.music.mdbc.tables.MusicTxDigestId; -import org.onap.music.mdbc.tables.PartitionInformation; -import org.onap.music.mdbc.tables.StagingTable; - -import java.io.IOException; -import java.util.*; - -import com.datastax.driver.core.utils.UUIDs; - -public class DatabaseOperations { - private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabaseOperations.class); - /** - * This functions is used to generate cassandra uuid - * @return a random UUID that can be used for fields of type uuid - */ - public static UUID generateUniqueKey() { - return UUIDs.random(); - } - - public static void createMusicTxDigest(String musicNamespace, String musicTxDigestTableName) - throws MDBCServiceException { - createMusicTxDigest(musicNamespace,musicTxDigestTableName,-1); - } - - /** - * This function creates the MusicTxDigest table. It contain information related to each transaction committed - * * LeaseId: id associated with the lease, text - * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later - * * TransactionDigest: text that contains all the changes in the transaction - */ - public static void createMusicTxDigest(String musicNamespace, String musicTxDigestTableName, - int musicTxDigestTableNumber) throws MDBCServiceException { - String tableName = musicTxDigestTableName; - if(musicTxDigestTableNumber >= 0) { - tableName = tableName + - "-" + - Integer.toString(musicTxDigestTableNumber); - } - String priKey = "txid"; - StringBuilder fields = new StringBuilder(); - fields.append("txid uuid, "); - fields.append("transactiondigest text ");//notice lack of ',' - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); - try { - executeMusicWriteQuery(musicNamespace,tableName,cql); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create redo records table"); - throw(e); - } - } - - /** - * This function creates the TransactionInformation table. It contain information related - * to the transactions happening in a given partition. - * * The schema of the table is - * * Id, uiid. - * * Partition, uuid id of the partition - * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables - * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables - * * Redo: list of uiids associated to the Redo Records Table - * - */ - public static void createMusicRangeInformationTable(String musicNamespace, String musicRangeInformationTableName) throws MDBCServiceException { - String tableName = musicRangeInformationTableName; - String priKey = "rangeid"; - StringBuilder fields = new StringBuilder(); - fields.append("rangeid uuid, "); - fields.append("keys set<text>, "); - fields.append("ownerid text, "); - fields.append("metricprocessid text, "); - //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly - fields.append("txredolog list<frozen<tuple<text,uuid>>> "); - String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey); - try { - executeMusicWriteQuery(musicNamespace,tableName,cql); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to create transaction information table"); - throw(e); - } - } - - /** - * Creates a new empty tit row - * @param namespace namespace where the tit table is located - * @param mriTableName name of the corresponding mri table where the new row is added - * @param processId id of the process that is going to own initially this. - * @return uuid associated to the new row - */ - public static UUID createEmptyMriRow(String namespace, String mriTableName, - String processId, String lockId, List<Range> ranges) throws MDBCServiceException { - UUID id = generateUniqueKey(); - return createEmptyMriRow(namespace,mriTableName,id,processId,lockId,ranges); - } - - public static UUID createEmptyMriRow(String namespace, String mriTableName, UUID id, String processId, String lockId, - List<Range> ranges) throws MDBCServiceException{ - StringBuilder insert = new StringBuilder("INSERT INTO ") - .append(namespace) - .append('.') - .append(mriTableName) - .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") - .append("(") - .append(id) - .append(",{"); - boolean first=true; - for(Range r: ranges){ - if(first){ first=false; } - else { - insert.append(','); - } - insert.append("'").append(r.toString()).append("'"); - } - insert.append("},'") - .append((lockId==null)?"":lockId) - .append("','") - .append(processId) - .append("',[]);"); - PreparedQueryObject query = new PreparedQueryObject(); - query.appendQueryString(insert.toString()); - try { - executeLockedPut(namespace,mriTableName,id.toString(),query,lockId,null); - } catch (MDBCServiceException e) { - logger.error("Initialization error: Failure to add new row to transaction information"); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); - } - return id; - } - - public static MusicRangeInformationRow getMriRow(String namespace, String mriTableName, UUID id, String lockId) - throws MDBCServiceException{ - String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", namespace, mriTableName); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(id); - Row newRow; - try { - newRow = executeLockedGet(namespace,mriTableName,pQueryObject,id.toString(),lockId); - } catch (MDBCServiceException e) { - logger.error("Get operationt error: Failure to get row from MRI "+mriTableName); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); - } -// public MusicRangeInformationRow(UUID index, List<MusicTxDigestId> redoLog, PartitionInformation partition, - // String ownerId, String metricProcessId) { - List<TupleValue> log = newRow.getList("txredolog",TupleValue.class); - List<MusicTxDigestId> digestIds = new ArrayList<>(); - for(TupleValue t: log){ - //final String tableName = t.getString(0); - final UUID index = t.getUUID(1); - digestIds.add(new MusicTxDigestId(index)); - } - List<Range> partitions = new ArrayList<>(); - Set<String> tables = newRow.getSet("keys",String.class); - for (String table:tables){ - partitions.add(new Range(table)); - } - return new MusicRangeInformationRow(id,digestIds,new PartitionInformation(partitions),newRow.getString("ownerid"),newRow.getString("metricprocessid")); - - } - - public static HashMap<Range,StagingTable> getTransactionDigest(String namespace, String musicTxDigestTable, MusicTxDigestId id) - throws MDBCServiceException{ - String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", namespace, musicTxDigestTable); - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - pQueryObject.addValue(id.tablePrimaryKey); - Row newRow; - try { - newRow = executeUnlockedQuorumGet(pQueryObject); - } catch (MDBCServiceException e) { - logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.tablePrimaryKey); - throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); - } - String digest = newRow.getString("transactiondigest"); - HashMap<Range,StagingTable> changes; - try { - changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest); - } catch (IOException e) { - logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.tablePrimaryKey); - throw new MDBCServiceException("Deserializng digest failed with ioexception"); - } catch (ClassNotFoundException e) { - logger.error("Deserializng digest failed with an invalid class for id:"+id.tablePrimaryKey); - throw new MDBCServiceException("Deserializng digest failed with an invalid class"); - } - return changes; - } - - /** - * This method executes a write query in Music - * @param cql the CQL to be sent to Cassandra - */ - protected static void executeMusicWriteQuery(String keyspace, String table, String cql) - throws MDBCServiceException { - PreparedQueryObject pQueryObject = new PreparedQueryObject(); - pQueryObject.appendQueryString(cql); - ResultType rt = null; - try { - rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical"); - } catch (MusicServiceException e) { - //\TODO: handle better, at least transform into an MDBCServiceException - e.printStackTrace(); - } - String result = rt.getResult(); - if (result==null || result.toLowerCase().equals("failure")) { - throw new MDBCServiceException("Music eventual put failed"); - } - } - - protected static Row executeLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey, - String lock) - throws MDBCServiceException{ - ResultSet result; - try { - result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock); - } catch(MusicServiceException e){ - //\TODO: handle better, at least transform into an MDBCServiceException - e.printStackTrace(); - throw new MDBCServiceException("Error executing critical get"); - } - if(result.isExhausted()){ - throw new MDBCServiceException("There is not a row that matches the id "+primaryKey); - } - return result.one(); - } - - protected static Row executeUnlockedQuorumGet(PreparedQueryObject cqlObject) - throws MDBCServiceException{ - ResultSet result = MusicCore.quorumGet(cqlObject); - //\TODO: handle better, at least transform into an MDBCServiceException - if(result.isExhausted()){ - throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]"); - } - return result.one(); - } - - protected static void executeLockedPut(String namespace, String tableName, - String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId, - MusicCore.Condition conditionInfo) throws MDBCServiceException { - ReturnType rt ; - if(lockId==null) { - try { - rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo); - } catch (MusicLockingException e) { - logger.error("Music locked put failed"); - throw new MDBCServiceException("Music locked put failed"); - } catch (MusicServiceException e) { - logger.error("Music service fail: Music locked put failed"); - throw new MDBCServiceException("Music service fail: Music locked put failed"); - } catch (MusicQueryException e) { - logger.error("Music query fail: locked put failed"); - throw new MDBCServiceException("Music query fail: Music locked put failed"); - } - } - else { - rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo); - } - if (rt.getResult().getResult().toLowerCase().equals("failure")) { - throw new MDBCServiceException("Music locked put failed"); - } - } - - public static void createNamespace(String namespace, int replicationFactor) throws MDBCServiceException { - Map<String,Object> replicationInfo = new HashMap<>(); - replicationInfo.put("'class'", "'SimpleStrategy'"); - replicationInfo.put("'replication_factor'", replicationFactor); - - PreparedQueryObject queryObject = new PreparedQueryObject(); - queryObject.appendQueryString( - "CREATE KEYSPACE " + namespace + " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":")); - - try { - MusicCore.nonKeyRelatedPut(queryObject, "critical"); - } catch (MusicServiceException e) { - if (!e.getMessage().equals("Keyspace "+namespace+" already exists")) { - logger.error("Error creating namespace: "+namespace); - throw new MDBCServiceException("Error creating namespace: "+namespace+". Internal error:"+e.getErrorMessage()); - } - } - } - - public static void createTxDigestRow(String namespace, String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException { - PreparedQueryObject query = new PreparedQueryObject(); - String cqlQuery = "INSERT INTO " + - namespace + - '.' + - musicTxDigestTable + - " (txid,transactiondigest) " + - "VALUES (" + - newId.tablePrimaryKey + ",'" + - transactionDigest + - "');"; - query.appendQueryString(cqlQuery); - //\TODO check if I am not shooting on my own foot - try { - MusicCore.nonKeyRelatedPut(query,"critical"); - } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.tablePrimaryKey.toString()+ "with error "+e.getErrorMessage()); - throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.tablePrimaryKey.toString()); - } - } - -} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java index e6b4e0e..2ca621a 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java @@ -37,9 +37,7 @@ import org.onap.music.mdbc.tables.MriReference; public class DatabasePartition { private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class); - private String musicRangeInformationTable;//Table that currently contains the REDO log for this partition private UUID musicRangeInformationIndex;//Index that can be obtained either from - private String musicTxDigestTable; private String lockId; protected List<Range> ranges; @@ -48,57 +46,25 @@ public class DatabasePartition { * The only requirement is that the ranges are not overlapping. */ - public DatabasePartition() { - ranges = new ArrayList<>(); + public DatabasePartition(UUID mriIndex) { + this(new ArrayList<Range>(), mriIndex,""); } - public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) { - if(knownRanges != null) { - ranges = knownRanges; - } - else { - ranges = new ArrayList<>(); - } - - if(musicTxDigestTable != null) { - this.setMusicTxDigestTable(musicTxDigestTable); - } - else{ - this.setMusicTxDigestTable(""); - } - + public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String lockId) { + ranges = knownRanges; + if(mriIndex != null) { this.setMusicRangeInformationIndex(mriIndex); } else { this.setMusicRangeInformationIndex(null); } + this.setLockId(lockId); - if(mriTable != null) { - this.setMusicRangeInformationTable(mriTable); - } - else { - this.setMusicRangeInformationTable(""); - } - - if(lockId != null) { - this.setLockId(lockId); - } - else { - this.setLockId(""); - } } - public String getMusicRangeInformationTable() { - return musicRangeInformationTable; - } - - public void setMusicRangeInformationTable(String musicRangeInformationTable) { - this.musicRangeInformationTable = musicRangeInformationTable; - } - - public MriReference getMusicRangeInformationIndex() { - return new MriReference(musicRangeInformationTable,musicRangeInformationIndex); + public UUID getMusicRangeInformationIndex() { + return musicRangeInformationIndex; } public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) { @@ -180,11 +146,4 @@ public class DatabasePartition { this.lockId = lockId; } - public String getMusicTxDigestTable() { - return musicTxDigestTable; - } - - public void setMusicTxDigestTable(String musicTxDigestTable) { - this.musicTxDigestTable = musicTxDigestTable; - } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java index 4d43177..2442af1 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java @@ -20,18 +20,26 @@ package org.onap.music.mdbc; import java.io.*; +import java.util.ArrayList; import java.util.Base64; import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; import org.onap.music.logging.format.ErrorSeverity; import org.onap.music.logging.format.ErrorTypes; +import org.onap.music.mdbc.mixins.CassandraMixin; +import org.onap.music.mdbc.mixins.Utils; import org.onap.music.mdbc.tables.Operation; import org.onap.music.mdbc.tables.StagingTable; -import javassist.bytecode.Descriptor.Iterator; +import com.datastax.driver.core.utils.UUIDs; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; @@ -86,4 +94,32 @@ public class MDBCUtils { } } -} + /** + * This functions is used to generate cassandra uuid + * @return a random UUID that can be used for fields of type uuid + */ + public static UUID generateUniqueKey() { + return UUIDs.random(); + } + + public static Properties getMdbcProperties() { + Properties prop = new Properties(); + InputStream input = null; + try { + input = Utils.class.getClassLoader().getResourceAsStream("mdbc.properties"); + prop.load(input); + } catch (Exception e) { + Utils.logger.warn(EELFLoggerDelegate.applicationLogger, "Could not load mdbc.properties." + + "Proceeding with defaults " + e.getMessage()); + } finally { + if (input != null) { + try { + input.close(); + } catch (IOException e) { + Utils.logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); + } + } + } + return prop; + } +}
\ No newline at end of file diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java index d4c0933..08f6e1e 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java @@ -20,6 +20,7 @@ package org.onap.music.mdbc; import org.onap.music.mdbc.configurations.NodeConfiguration; +import org.onap.music.mdbc.tables.MusicTxDigest; import org.apache.calcite.avatica.remote.Driver.Serialization; import org.apache.calcite.avatica.remote.LocalService; import org.apache.calcite.avatica.server.HttpServer; @@ -34,7 +35,7 @@ import java.util.Locale; import java.util.Properties; public class MdbcServer { - public static final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(MdbcStatement.class); + private static final EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcStatement.class); @Parameter(names = { "-c", "--configuration" }, required = true, description = "This is the file that contains the ranges that are assigned to this MDBC server") @@ -64,10 +65,11 @@ public class MdbcServer { private NodeConfiguration config; private HttpServer server; + private MdbcServerLogic meta; public void start() { if (null != server) { - LOG.error("The server was already started"); + logger.error("The server was already started"); Unsafe.systemExit(ExitCodes.ALREADY_STARTED.ordinal()); return; } @@ -78,7 +80,7 @@ public class MdbcServer { Properties connectionProps = new Properties(); connectionProps.put("user", user); connectionProps.put("password", password); - MdbcServerLogic meta = new MdbcServerLogic(url,connectionProps,config); + meta = new MdbcServerLogic(url,connectionProps,config); LocalService service = new LocalService(meta); // Construct the server @@ -89,13 +91,14 @@ public class MdbcServer { // Then start it server.start(); - - LOG.info("Started Avatica server on port {} with serialization {}", server.getPort(), + + logger.info("Started Avatica server on port {} with serialization {}", server.getPort(), serialization); } catch (Exception e) { - LOG.error("Failed to start Avatica server", e); + logger.error("Failed to start Avatica server", e); Unsafe.systemExit(ExitCodes.START_FAILED.ordinal()); - } + } + } public void stop() { @@ -125,9 +128,9 @@ public class MdbcServer { Runtime.getRuntime().addShutdownHook( new Thread(new Runnable() { @Override public void run() { - LOG.info("Stopping server"); + logger.info("Stopping server"); server.stop(); - LOG.info("Server stopped"); + logger.info("Server stopped"); } })); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java index 33c5dbb..cccea92 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java @@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.configurations.NodeConfiguration; +import org.onap.music.mdbc.tables.MusicTxDigest; + import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; @@ -48,20 +50,16 @@ public class MdbcServerLogic extends JdbcMeta{ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcServerLogic.class); StateManager manager; - DatabasePartition ranges; String name; - String sqlDatabase; //TODO: Delete this properties after debugging private final Properties info; private final Cache<String, Connection> connectionCache; - public MdbcServerLogic(String Url, Properties info,NodeConfiguration config) throws SQLException, MDBCServiceException { + public MdbcServerLogic(String Url, Properties info, NodeConfiguration config) throws SQLException, MDBCServiceException { super(Url,info); - this.ranges = config.partition; this.name = config.nodeName; - this.sqlDatabase = config.sqlDatabaseName; - this.manager = new StateManager(Url,info,this.ranges,this.sqlDatabase); + this.manager = new StateManager(Url,info,config.partition,"test"); //FIXME: db name should not be passed in ahead of time this.info = info; int concurrencyLevel = Integer.parseInt( info.getProperty(ConnectionCacheSettings.CONCURRENCY_LEVEL.key(), @@ -87,6 +85,10 @@ public class MdbcServerLogic extends JdbcMeta{ .build(); } + public StateManager getStateManager() { + return this.manager; + } + @Override protected Connection getConnection(String id) throws SQLException { if (id == null) { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java index d44d907..c2019cf 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java @@ -79,7 +79,7 @@ public class MusicSqlManager { */ public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException { try { - info.putAll(Utils.getMdbcProperties()); + info.putAll(MDBCUtils.getMdbcProperties()); String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT); this.dbi = MixinFactory.createDBInterface(mixinDb, this, url, conn, info); this.mi = mi; @@ -254,6 +254,7 @@ public class MusicSqlManager { return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti,table, dbRow); } + @Deprecated public String getMusicKeyFromRow(String table, JSONObject dbRow) { TableInfo ti = dbi.getTableInfo(table); return mi.getMusicKeyFromRow(ti,table, dbRow); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java index 2e47726..4bb0c85 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java @@ -20,6 +20,7 @@ package org.onap.music.mdbc; import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicServiceException; import org.onap.music.logging.EELFLoggerDelegate; import org.onap.music.logging.format.AppMessages; import org.onap.music.logging.format.ErrorSeverity; @@ -27,6 +28,7 @@ import org.onap.music.logging.format.ErrorTypes; import org.onap.music.mdbc.mixins.MixinFactory; import org.onap.music.mdbc.mixins.MusicInterface; import org.onap.music.mdbc.mixins.MusicMixin; +import org.onap.music.mdbc.tables.MusicTxDigest; import org.onap.music.mdbc.tables.TxCommitProgress; import java.io.IOException; @@ -53,26 +55,25 @@ public class StateManager { * that are created by the MDBC Server * @see MusicInterface */ - private MusicInterface musicManager; + private MusicInterface musicInterface; /** * This is the Running Queries information table. * It mainly contains information about the entities * that have being committed so far. */ private TxCommitProgress transactionInfo; - private Map<String,MdbcConnection> mdbcConnections; - private String sqlDatabase; - private String url; + String musicmixin; + String cassandraUrl; private Properties info; @SuppressWarnings("unused") private DatabasePartition ranges; - - public StateManager(String url, Properties info, DatabasePartition ranges, String sqlDatabase) throws MDBCServiceException { + + public StateManager(String url, Properties info, DatabasePartition ranges, String sqlDatabase) throws MDBCServiceException { this.sqlDatabase = sqlDatabase; this.ranges = ranges; this.url = url; @@ -81,28 +82,40 @@ public class StateManager { //\fixme this is not really used, delete! try { info.load(this.getClass().getClassLoader().getResourceAsStream("music.properties")); + info.putAll(MDBCUtils.getMdbcProperties()); } catch (IOException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); } - String cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT); - String mixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT); - init(mixin, cassandraUrl); + cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT); + musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT); + + initMusic(); + initSqlDatabase(); + + + MusicTxDigest txDaemon = new MusicTxDigest(this); + txDaemon.startBackgroundDaemon(Integer.parseInt( + info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT))); } - protected void init(String mixin, String cassandraUrl) throws MDBCServiceException { - this.musicManager = MixinFactory.createMusicInterface(mixin, cassandraUrl, info); - this.musicManager.createKeyspace(); + /** + * Initialize the + * @param mixin + * @param cassandraUrl + * @throws MDBCServiceException + */ + protected void initMusic() throws MDBCServiceException { + this.musicInterface = MixinFactory.createMusicInterface(musicmixin, cassandraUrl, info); + this.musicInterface.createKeyspace(); try { - this.musicManager.initializeMetricDataStructures(); + this.musicInterface.initializeMetricDataStructures(); } catch (MDBCServiceException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR); throw(e); } - MusicMixin.loadProperties(); this.mdbcConnections = new HashMap<>(); - initSqlDatabase(); } - + protected void initSqlDatabase() throws MDBCServiceException { try { //\TODO: pass the driver as a variable @@ -125,6 +138,19 @@ public class StateManager { } } + public MusicInterface getMusicInterface() { + return this.musicInterface; + } + + public DatabasePartition getRanges() { + return ranges; + } + + public void setRanges(DatabasePartition ranges) { + this.ranges = ranges; + } + + public void CloseConnection(String connectionId){ //\TODO check if there is a race condition if(mdbcConnections.containsKey(connectionId)) { @@ -163,7 +189,7 @@ public class StateManager { } //Create MDBC connection try { - newConnection = new MdbcConnection(id, this.url+"/"+this.sqlDatabase, sqlConnection, info, this.musicManager, transactionInfo,ranges); + newConnection = new MdbcConnection(id, this.url+"/"+this.sqlDatabase, sqlConnection, info, this.musicInterface, transactionInfo,ranges); } catch (MDBCServiceException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); newConnection = null; @@ -213,7 +239,7 @@ public class StateManager { } //Create MDBC connection try { - newConnection = new MdbcConnection(id,this.url+"/"+this.sqlDatabase, sqlConnection, info, this.musicManager, transactionInfo,ranges); + newConnection = new MdbcConnection(id,this.url+"/"+this.sqlDatabase, sqlConnection, info, this.musicInterface, transactionInfo,ranges); } catch (MDBCServiceException e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); newConnection = null; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java index fb4656c..ad86ada 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java @@ -36,14 +36,12 @@ public class NodeConfiguration { private static transient final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(NodeConfiguration.class); - public String sqlDatabaseName; public DatabasePartition partition; public String nodeName; - public NodeConfiguration(String tables, UUID mriIndex, String mriTableName, String sqlDatabaseName, String node, String redoRecordsTable){ + public NodeConfiguration(String tables, UUID mriIndex, String mriTableName, String node){ // public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) { - partition = new DatabasePartition(toRanges(tables), mriIndex, mriTableName, null, redoRecordsTable) ; - this.sqlDatabaseName = sqlDatabaseName; + partition = new DatabasePartition(toRanges(tables), mriIndex, null) ; this.nodeName = node; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java index 77df15f..f3f5d22 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java @@ -21,9 +21,11 @@ package org.onap.music.mdbc.configurations; import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.logging.EELFLoggerDelegate; -import org.onap.music.mdbc.DatabaseOperations; +import org.onap.music.mdbc.MDBCUtils; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.RedoRow; +import org.onap.music.mdbc.mixins.CassandraMixin; +import org.onap.music.mdbc.tables.MusicTxDigest; import com.google.gson.Gson; import org.onap.music.datastore.PreparedQueryObject; @@ -62,7 +64,7 @@ public class TablesConfiguration { */ public List<NodeConfiguration> initializeAndCreateNodeConfigurations() throws MDBCServiceException { initInternalNamespace(); - DatabaseOperations.createNamespace(musicNamespace, internalReplicationFactor); + List<NodeConfiguration> nodeConfigs = new ArrayList<>(); if(partitions == null){ logger.error("Partitions was not correctly initialized"); @@ -70,12 +72,8 @@ public class TablesConfiguration { } for(PartitionInformation partitionInfo : partitions){ String mriTableName = partitionInfo.mriTableName; - mriTableName = (mriTableName==null || mriTableName.isEmpty())?TIT_TABLE_NAME:mriTableName; //0) Create the corresponding Music Range Information table - DatabaseOperations.createMusicRangeInformationTable(musicNamespace,mriTableName); - String musicTxDigestTableName = partitionInfo.mtxdTableName; - musicTxDigestTableName = (musicTxDigestTableName==null || musicTxDigestTableName.isEmpty())? MUSIC_TX_DIGEST_TABLE_NAME :musicTxDigestTableName; - DatabaseOperations.createMusicTxDigest(musicNamespace,musicTxDigestTableName); + String partitionId; if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){ if(partitionInfo.replicationFactor==0){ @@ -90,7 +88,7 @@ public class TablesConfiguration { partitionId = partitionInfo.partitionId; } //2) Create a row in the transaction information table - UUID mriTableIndex = DatabaseOperations.createEmptyMriRow(musicNamespace,mriTableName,"",null,partitionInfo.getTables()); + UUID mriTableIndex = MDBCUtils.generateUniqueKey(); //3) Add owner and tit information to partition info table RedoRow newRedoRow = new RedoRow(mriTableName,mriTableIndex); //DatabaseOperations.updateRedoRow(musicNamespace,pitName,partitionId,newRedoRow,partitionInfo.owner,null); @@ -105,13 +103,13 @@ public class TablesConfiguration { for(Range r: partitionInfo.tables){ newStr.append(r.toString()).append(","); } - nodeConfigs.add(new NodeConfiguration(newStr.toString(),mriTableIndex,mriTableName,sqlDatabaseName,partitionInfo.owner,musicTxDigestTableName)); + nodeConfigs.add(new NodeConfiguration(newStr.toString(),mriTableIndex, + sqlDatabaseName, partitionInfo.owner)); } return nodeConfigs; } private void initInternalNamespace() throws MDBCServiceException { - DatabaseOperations.createNamespace(internalNamespace,internalReplicationFactor); StringBuilder createKeysTableCql = new StringBuilder("CREATE TABLE IF NOT EXISTS ") .append(internalNamespace) .append(".unsynced_keys (key text PRIMARY KEY);"); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java index 68d836b..eede9be 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java @@ -51,7 +51,7 @@ public class EtdbTestClient { } Connection connection; try { - connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000;serialization=protobuf"); + connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf"); } catch (SQLException e) { e.printStackTrace(); return; @@ -70,7 +70,8 @@ public class EtdbTestClient { " LastName varchar(255),\n" + " FirstName varchar(255),\n" + " Address varchar(255),\n" + - " City varchar(255)\n" + + " City varchar(255),\n" + + " PRIMARY KEY (PersonID,LastName)" + ");"; Statement stmt; try { @@ -103,6 +104,12 @@ public class EtdbTestClient { } final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');"; + final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;"; + final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');"; + final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';"; + final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';"; + + Statement insertStmt; try { insertStmt = connection.createStatement(); @@ -113,6 +120,11 @@ public class EtdbTestClient { try { execute = insertStmt.execute(insertSQL); + execute = insertStmt.execute(insertSQL1); + execute = insertStmt.execute(insertSQL2); + execute = insertStmt.execute(insertSQL3); + execute = insertStmt.execute(insertSQL4); + } catch (SQLException e) { e.printStackTrace(); return; diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java index 97c1102..0732dc8 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java @@ -30,6 +30,7 @@ import java.util.Properties; import org.json.JSONObject; import org.json.JSONTokener; import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.MusicCore; import org.onap.music.main.ReturnType; @@ -57,7 +58,7 @@ public class Cassandra2Mixin extends CassandraMixin { super(); } - public Cassandra2Mixin(String url, Properties info) throws MusicServiceException { + public Cassandra2Mixin(String url, Properties info) throws MDBCServiceException { super(url, info); } @@ -80,9 +81,10 @@ public class Cassandra2Mixin extends CassandraMixin { /** * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. * The keyspace name comes from the initialization properties passed to the JDBC driver. + * @throws MusicServiceException */ @Override - public void createKeyspace() { + public void createKeyspace() throws MDBCServiceException { super.createKeyspace(); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java index a1f325e..3b1651f 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java @@ -35,12 +35,11 @@ import java.util.TreeSet; import java.util.UUID; import org.onap.music.mdbc.*; -import org.onap.music.mdbc.DatabaseOperations; -import org.onap.music.mdbc.tables.PartitionInformation; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.MriReference; import org.onap.music.mdbc.tables.MusicRangeInformationRow; +import org.onap.music.mdbc.tables.MusicTxDigest; import org.onap.music.mdbc.tables.TxCommitProgress; import org.json.JSONObject; @@ -50,6 +49,7 @@ import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.MusicCore; +import org.onap.music.main.MusicCore.Condition; import org.onap.music.main.ResultType; import org.onap.music.main.ReturnType; @@ -62,6 +62,7 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.TupleValue; /** * This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence @@ -93,8 +94,8 @@ public class CassandraMixin implements MusicInterface { public static final String KEY_MUSIC_RFACTOR = "music_rfactor"; /** The property name to use to provide the replication factor for Cassandra. */ public static final String KEY_MUSIC_NAMESPACE = "music_namespace"; - /** The default property value to use for the Cassandra keyspace. */ - public static final String DEFAULT_MUSIC_KEYSPACE = "mdbc"; + /** Namespace for the tables in MUSIC (Cassandra) */ + public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; /** The default property value to use for the Cassandra IP address. */ public static final String DEFAULT_MUSIC_ADDRESS = "localhost"; /** The default property value to use for the Cassandra replication factor. */ @@ -103,8 +104,7 @@ public class CassandraMixin implements MusicInterface { public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid"; /** Type of the primary key, if none is defined by the user */ public static final String MDBC_PRIMARYKEY_TYPE = "uuid"; - /** Namespace for the tables in MUSIC (Cassandra) */ - public static final String DEFAULT_MUSIC_NAMESPACE = "namespace"; + //\TODO Add logic to change the names when required and create the tables when necessary private String musicTxDigestTableName = "musictxdigest"; @@ -155,7 +155,7 @@ public class CassandraMixin implements MusicInterface { this.allReplicaIds = null; } - public CassandraMixin(String url, Properties info) throws MusicServiceException { + public CassandraMixin(String url, Properties info) throws MDBCServiceException { // Default values -- should be overridden in the Properties // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred) this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS); @@ -173,11 +173,15 @@ public class CassandraMixin implements MusicInterface { this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE); logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns); - musicRangeInformationTableName = "musicrangeinformation"; - createMusicKeyspace(); + createKeyspace(); } - private void createMusicKeyspace() throws MusicServiceException { + /** + * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. + * The keyspace name comes from the initialization properties passed to the JDBC driver. + */ + @Override + public void createKeyspace() throws MDBCServiceException { Map<String,Object> replicationInfo = new HashMap<>(); replicationInfo.put("'class'", "'SimpleStrategy'"); @@ -185,15 +189,14 @@ public class CassandraMixin implements MusicInterface { PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString( - "CREATE KEYSPACE " + this.music_ns + " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":")); + "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns + + " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":")); try { MusicCore.nonKeyRelatedPut(queryObject, "eventual"); - } catch (MusicServiceException e) { - if (e.getMessage().equals("Keyspace "+this.music_ns+" already exists")) { - // ignore - } else { - throw(e); + } catch (MusicServiceException e) { + if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) { + throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage()); } } } @@ -234,26 +237,13 @@ public class CassandraMixin implements MusicInterface { @Override public void initializeMetricDataStructures() throws MDBCServiceException { try { - DatabaseOperations.createMusicTxDigest(music_ns, musicTxDigestTableName);//\TODO If we start partitioning the data base, we would need to use the redotable number - DatabaseOperations.createMusicRangeInformationTable(music_ns, musicRangeInformationTableName); + createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number + createMusicRangeInformationTable(); } catch(MDBCServiceException e){ logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC"); } } - - /** - * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. - * The keyspace name comes from the initialization properties passed to the JDBC driver. - */ - @Override - public void createKeyspace() { - if (keyspace_created == false) { - String cql = String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : %d };", music_ns, music_rfactor); - executeMusicWriteQuery(cql); - keyspace_created = true; - } - } /** * This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>. @@ -286,7 +276,7 @@ public class CassandraMixin implements MusicInterface { fields.append(pfx).append(MDBC_PRIMARYKEY_NAME) .append(" ") .append(MDBC_PRIMARYKEY_TYPE); - prikey.append("mdbc_cuid"); + prikey.append(MDBC_PRIMARYKEY_NAME); } String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString()); executeMusicWriteQuery(cql); @@ -928,7 +918,7 @@ public class CassandraMixin implements MusicInterface { * Return the function for cassandra's primary key generation */ public String generateUniqueKey() { - return DatabaseOperations.generateUniqueKey().toString(); + return MDBCUtils.generateUniqueKey().toString(); } @Override @@ -986,7 +976,7 @@ public class CassandraMixin implements MusicInterface { String pfx = ""; for(String keyCol: keyCols) { key.append(pfx); - key.append(row.getString(keyCol)); + key.append(row.get(keyCol)); pfx = ","; } String keyStr = key.toString(); @@ -1036,7 +1026,7 @@ public class CassandraMixin implements MusicInterface { } protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException { - MriReference mriIndex = partition.getMusicRangeInformationIndex(); + UUID mriIndex = partition.getMusicRangeInformationIndex(); String lockId; lockId = MusicCore.createLockReference(fullyQualifiedKey); //\TODO Handle better failures to acquire locks @@ -1058,11 +1048,13 @@ public class CassandraMixin implements MusicInterface { try { MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId); CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle(); - CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns, partition.getMusicRangeInformationTable(), mriIndex.index.toString()); + CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns, + this.musicRangeInformationTableName, mriIndex.toString()); while(lockOwner.lockRef != lockId) { MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef); try { - lockOwner = lockingServiceHandle.peekLockQueue(music_ns, partition.getMusicRangeInformationTable(), mriIndex.index.toString()); + lockOwner = lockingServiceHandle.peekLockQueue(music_ns, + this.musicRangeInformationTableName, mriIndex.toString()); } catch(NullPointerException e){ //Ignore null pointer exception lockId = MusicCore.createLockReference(fullyQualifiedKey); @@ -1102,12 +1094,12 @@ public class CassandraMixin implements MusicInterface { @Override public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{ - MriReference mriIndex = partition.getMusicRangeInformationIndex(); + UUID mriIndex = partition.getMusicRangeInformationIndex(); if(mriIndex==null) { //\TODO Fetch MriIndex from the Range Information Table throw new MDBCServiceException("TIT Index retrieval not yet implemented"); } - String fullyQualifiedMriKey = music_ns+"."+ mriIndex.table+"."+mriIndex.index.toString(); + String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex.toString(); //0. See if reference to lock was already created String lockId = partition.getLockId(); if(lockId == null || lockId.isEmpty()) { @@ -1133,7 +1125,7 @@ public class CassandraMixin implements MusicInterface { throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString()); } MusicTxDigestId digestId = new MusicTxDigestId(commitId); - addTxDigest(musicTxDigestTableName, digestId, serializedTransactionDigest); + addTxDigest(digestId, serializedTransactionDigest); //2. Save RRT index to RQ if(progressKeeper!= null) { progressKeeper.setRecordId(txId,digestId); @@ -1188,49 +1180,235 @@ public class CassandraMixin implements MusicInterface { } return objects; } - + + @Override + public List<UUID> getPartitionIndexes() { + ArrayList<UUID> partitions = new ArrayList<UUID>(); + String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName); + ResultSet rs = executeMusicRead(cql); + for (Row r: rs) { + partitions.add(r.getUUID("rangeid")); + } + return partitions; + } + @Override - public MusicRangeInformationRow getMusicRangeInformation(DatabasePartition partition) throws MDBCServiceException { + public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException { //TODO: verify that lock id is valid before calling the database operations function - MriReference reference = partition.getMusicRangeInformationIndex(); - return DatabaseOperations.getMriRow(music_ns,reference.table,reference.index,partition.getLockId()); + //UUID id = partition.getMusicRangeInformationIndex(); + + String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(partitionIndex); + Row newRow; + try { + newRow = executeMusicUnlockedQuorumGet(pQueryObject); + } catch (MDBCServiceException e) { + logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); + } + + List<TupleValue> log = newRow.getList("txredolog",TupleValue.class); + List<MusicTxDigestId> digestIds = new ArrayList<>(); + for(TupleValue t: log){ + //final String tableName = t.getString(0); + final UUID index = t.getUUID(1); + digestIds.add(new MusicTxDigestId(index)); + } + List<Range> partitions = new ArrayList<>(); + Set<String> tables = newRow.getSet("keys",String.class); + for (String table:tables){ + partitions.add(new Range(table)); + } + return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""), + digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid")); } + + /** + * This function creates the TransactionInformation table. It contain information related + * to the transactions happening in a given partition. + * * The schema of the table is + * * Id, uiid. + * * Partition, uuid id of the partition + * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables + * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables + * * Redo: list of uiids associated to the Redo Records Table + * + */ + private void createMusicRangeInformationTable() throws MDBCServiceException { + String tableName = this.musicRangeInformationTableName; + String priKey = "rangeid"; + StringBuilder fields = new StringBuilder(); + fields.append("rangeid uuid, "); + fields.append("keys set<text>, "); + fields.append("ownerid text, "); + fields.append("metricprocessid text, "); + //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly + fields.append("txredolog list<frozen<tuple<text,uuid>>> "); + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", + this.music_ns, tableName, fields, priKey); + try { + executeMusicWriteQuery(this.music_ns,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create transaction information table"); + throw(e); + } + } + + @Override public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException { - DatabasePartition newPartition = new DatabasePartition(info.partition.ranges,info.index, - musicRangeInformationTableName,null,musicTxDigestTableName); - String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+info.index.toString(); + DatabasePartition newPartition = info.getDBPartition(); + String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMusicRangeInformationIndex().toString(); String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition); - DatabaseOperations.createEmptyMriRow(music_ns,musicRangeInformationTableName,info.metricProcessId,lockId,info.partition.ranges); + createEmptyMriRow(info.getMetricProcessId(),lockId,new ArrayList<Range>()); throw new UnsupportedOperationException(); } + + /** + * Creates a new empty MRI row + * @param processId id of the process that is going to own initially this. + * @return uuid associated to the new row + */ + private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges) + throws MDBCServiceException { + UUID id = MDBCUtils.generateUniqueKey(); + return createEmptyMriRow(id,processId,lockId,ranges); + } + + /** + * Creates a new empty MRI row + * @param processId id of the process that is going to own initially this. + * @return uuid associated to the new row + */ + private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges) + throws MDBCServiceException{ + StringBuilder insert = new StringBuilder("INSERT INTO ") + .append(this.music_ns) + .append('.') + .append(this.musicRangeInformationTableName) + .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ") + .append("(") + .append(id) + .append(",{"); + boolean first=true; + for(Range r: ranges){ + if(first){ first=false; } + else { + insert.append(','); + } + insert.append("'").append(r.toString()).append("'"); + } + insert.append("},'") + .append((lockId==null)?"":lockId) + .append("','") + .append(processId) + .append("',[]);"); + PreparedQueryObject query = new PreparedQueryObject(); + query.appendQueryString(insert.toString()); + try { + executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to add new row to transaction information"); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); + } + return id; + } @Override - public void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException { - PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId.index, musicTxDigestTableName, newRecord.tablePrimaryKey); - ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.index.toString(), appendQuery, partition.getLockId(), null); + public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException { + PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId); + ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null); if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){ logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage()); throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage()); } } - + + public void createMusicTxDigest() throws MDBCServiceException { + createMusicTxDigest(-1); + } + + + /** + * This function creates the MusicTxDigest table. It contain information related to each transaction committed + * * LeaseId: id associated with the lease, text + * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later + * * TransactionDigest: text that contains all the changes in the transaction + */ + private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException { + String tableName = this.musicTxDigestTableName; + if(musicTxDigestTableNumber >= 0) { + tableName = tableName + + "-" + + Integer.toString(musicTxDigestTableNumber); + } + String priKey = "txid"; + StringBuilder fields = new StringBuilder(); + fields.append("txid uuid, "); + fields.append("transactiondigest text ");//notice lack of ',' + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey); + try { + executeMusicWriteQuery(this.music_ns,tableName,cql); + } catch (MDBCServiceException e) { + logger.error("Initialization error: Failure to create redo records table"); + throw(e); + } + } + + @Override - public void addTxDigest(String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException { - DatabaseOperations.createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); + public void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException { + //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest); + PreparedQueryObject query = new PreparedQueryObject(); + String cqlQuery = "INSERT INTO " + + this.music_ns + + '.' + + this.musicTxDigestTableName + + " (txid,transactiondigest) " + + "VALUES (" + + newId.txId + ",'" + + transactionDigest + + "');"; + query.appendQueryString(cqlQuery); + //\TODO check if I am not shooting on my own foot + try { + MusicCore.nonKeyRelatedPut(query,"critical"); + } catch (MusicServiceException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage()); + throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString()); + } } + - @Override - public PartitionInformation getPartitionInformation(DatabasePartition partition) throws MDBCServiceException { - //\TODO We may want to cache this information to avoid going to the database to obtain this simple information - MusicRangeInformationRow row = getMusicRangeInformation(partition); - return row.partition; - } @Override - public HashMap<Range,StagingTable> getTransactionDigest(MusicTxDigestId id) throws MDBCServiceException { - return DatabaseOperations.getTransactionDigest(music_ns, musicTxDigestTableName, id); + public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException { + String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName); + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + pQueryObject.addValue(id.txId); + Row newRow; + try { + newRow = executeMusicUnlockedQuorumGet(pQueryObject); + } catch (MDBCServiceException e) { + logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId); + throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information"); + } + String digest = newRow.getString("transactiondigest"); + HashMap<Range,StagingTable> changes; + try { + changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest); + } catch (IOException e) { + logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId); + throw new MDBCServiceException("Deserializng digest failed with ioexception"); + } catch (ClassNotFoundException e) { + logger.error("Deserializng digest failed with an invalid class for id:"+id.txId); + throw new MDBCServiceException("Deserializng digest failed with an invalid class"); + } + return changes; } @Override @@ -1247,4 +1425,78 @@ public class CassandraMixin implements MusicInterface { public void relinquish(String ownerId, String rangeId){ throw new UnsupportedOperationException(); } + + /** + * This method executes a write query in Music + * @param cql the CQL to be sent to Cassandra + */ + private static void executeMusicWriteQuery(String keyspace, String table, String cql) + throws MDBCServiceException { + PreparedQueryObject pQueryObject = new PreparedQueryObject(); + pQueryObject.appendQueryString(cql); + ResultType rt = null; + try { + rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical"); + } catch (MusicServiceException e) { + //\TODO: handle better, at least transform into an MDBCServiceException + e.printStackTrace(); + } + String result = rt.getResult(); + if (result==null || result.toLowerCase().equals("failure")) { + throw new MDBCServiceException("Music eventual put failed"); + } + } + + private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey, + String lock) + throws MDBCServiceException{ + ResultSet result; + try { + result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock); + } catch(MusicServiceException e){ + //\TODO: handle better, at least transform into an MDBCServiceException + e.printStackTrace(); + throw new MDBCServiceException("Error executing critical get"); + } + if(result.isExhausted()){ + throw new MDBCServiceException("There is not a row that matches the id "+primaryKey); + } + return result.one(); + } + + private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject) + throws MDBCServiceException{ + ResultSet result = MusicCore.quorumGet(cqlObject); + //\TODO: handle better, at least transform into an MDBCServiceException + if(result.isExhausted()){ + throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]"); + } + return result.one(); + } + + private void executeMusicLockedPut(String namespace, String tableName, + String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId, + MusicCore.Condition conditionInfo) throws MDBCServiceException { + ReturnType rt ; + if(lockId==null) { + try { + rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo); + } catch (MusicLockingException e) { + logger.error("Music locked put failed"); + throw new MDBCServiceException("Music locked put failed"); + } catch (MusicServiceException e) { + logger.error("Music service fail: Music locked put failed"); + throw new MDBCServiceException("Music service fail: Music locked put failed"); + } catch (MusicQueryException e) { + logger.error("Music query fail: locked put failed"); + throw new MDBCServiceException("Music query fail: Music locked put failed"); + } + } + else { + rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo); + } + if (rt.getResult().getResult().toLowerCase().equals("failure")) { + throw new MDBCServiceException("Music locked put failed"); + } + } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java index 6a1219c..52b3036 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java @@ -27,10 +27,10 @@ import java.util.UUID; import org.json.JSONObject; import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicServiceException; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; -import org.onap.music.mdbc.tables.PartitionInformation; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.MriReference; @@ -81,8 +81,9 @@ public interface MusicInterface { /** * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables. * The keyspace name comes from the initialization properties passed to the JDBC driver. + * @throws MusicServiceException */ - void createKeyspace(); + void createKeyspace() throws MDBCServiceException; /** * This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>. * @param tableName the table to initialize MUSIC for @@ -170,23 +171,22 @@ public interface MusicInterface { */ void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException; - MusicRangeInformationRow getMusicRangeInformation(DatabasePartition partition) throws MDBCServiceException; + MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException; DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException; - void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException; + void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException; - void addTxDigest(String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException; + void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException; - PartitionInformation getPartitionInformation(DatabasePartition partition) throws MDBCServiceException; + HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException; - HashMap<Range,StagingTable> getTransactionDigest(MusicTxDigestId id) throws MDBCServiceException; - void own(List<Range> ranges); void appendRange(String rangeId, List<Range> ranges); void relinquish(String ownerId, String rangeId); + public List<UUID> getPartitionIndexes(); } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 46d41d4..a7ea680 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -31,7 +31,6 @@ import org.onap.music.exceptions.MDBCServiceException; import org.onap.music.mdbc.DatabasePartition; import org.onap.music.mdbc.Range; import org.onap.music.mdbc.TableInfo; -import org.onap.music.mdbc.tables.PartitionInformation; import org.onap.music.mdbc.tables.MusicTxDigestId; import org.onap.music.mdbc.tables.StagingTable; import org.onap.music.mdbc.tables.MriReference; @@ -148,30 +147,6 @@ public class MusicMixin implements MusicInterface { public void updateDirtyRowAndEntityTableInMusic(String tableName, JSONObject changedRow, boolean isCritical) { } - - public static void loadProperties() { - Properties prop = new Properties(); - InputStream input = null; - try { - input = MusicMixin.class.getClassLoader().getResourceAsStream("mdbc.properties"); - prop.load(input); - String crTable = prop.getProperty("critical.tables"); - String[] tableArr = crTable.split(","); - criticalTables = Arrays.asList(tableArr); - - } catch (Exception ex) { - ex.printStackTrace(); - } finally { - if (input != null) { - try { - input.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - public static void releaseZKLocks(Set<LockId> lockIds) { for (LockId lockId : lockIds) { System.out.println("Releasing lock: " + lockId); @@ -208,14 +183,10 @@ public class MusicMixin implements MusicInterface { } @Override - public HashMap<Range, StagingTable> getTransactionDigest(MusicTxDigestId id) { + public HashMap<Range, StagingTable> getTxDigest(MusicTxDigestId id) { return null; } - @Override - public PartitionInformation getPartitionInformation(DatabasePartition partition) { - return null; - } @Override public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) { @@ -223,11 +194,11 @@ public class MusicMixin implements MusicInterface { } @Override - public void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) { + public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) { } @Override - public void addTxDigest(String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) { + public void addTxDigest(MusicTxDigestId newId, String transactionDigest) { } @Override @@ -246,7 +217,14 @@ public class MusicMixin implements MusicInterface { } @Override - public MusicRangeInformationRow getMusicRangeInformation(DatabasePartition partition){ + public List<UUID> getPartitionIndexes() { + // TODO Auto-generated method stub + return null; + } + + @Override + public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException { + // TODO Auto-generated method stub return null; } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java index 23056e7..5943b34 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java @@ -573,10 +573,9 @@ NEW.field refers to the new value String op = rs.getString("OP"); OperationType opType = toOpEnum(op); String tbl = rs.getString("TABLENAME"); - String keydataStr = rs.getString("KEYDATA"); + JSONObject keydataStr = new JSONObject(new JSONTokener(rs.getString("KEYDATA"))); String newRowStr = rs.getString("NEWROWDATA"); JSONObject newRow = new JSONObject(new JSONTokener(newRowStr)); - String musicKey; TableInfo ti = getTableInfo(tbl); if (!ti.hasKey()) { //create music key @@ -586,26 +585,27 @@ NEW.field refers to the new value // the actual columns, otherwise performance when doing range queries are going // to be even worse (see the else bracket down) // - musicKey = msm.generateUniqueKey(); + String musicKey = msm.generateUniqueKey(); /*} else { //get key from data musicKey = msm.getMusicKeyFromRowWithoutPrimaryIndexes(tbl,newRow); }*/ newRow.put(msm.getMusicDefaultPrimaryKeyName(), musicKey); + keydataStr.put(msm.getMusicDefaultPrimaryKeyName(), musicKey); } - else { + /*else { //Use the keys musicKey = msm.getMusicKeyFromRow(tbl, newRow); if(musicKey.isEmpty()) { logger.error(EELFLoggerDelegate.errorLogger,"Primary key is invalid: ["+tbl+","+op+"]"); throw new NoSuchFieldException("Invalid operation enum"); } - } + }*/ Range range = new Range(tbl); if(!transactionDigests.containsKey(range)) { transactionDigests.put(range, new StagingTable()); } - transactionDigests.get(range).addOperation(musicKey, opType, newRow.toString()); + transactionDigests.get(range).addOperation(opType, newRow.toString(), keydataStr.toString()); rows.add(ix); } rs.getStatement().close(); diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java index cfa8771..86088f9 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java @@ -20,7 +20,6 @@ package org.onap.music.mdbc.mixins; import java.io.IOException; -import java.io.InputStream; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.ResultSet; @@ -44,7 +43,7 @@ import com.datastax.driver.core.utils.Bytes; * @author Robert P. Eby */ public class Utils { - private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Utils.class); + public static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Utils.class); /** * Transforms and JsonObject into an array of objects @@ -168,7 +167,7 @@ public class Utils { Properties pr = null; try { pr = new Properties(); - pr.load(Utils.class.getResourceAsStream("/mdbc_driver.properties")); + pr.load(Utils.class.getResourceAsStream("/mdbc.properties")); } catch (IOException e) { logger.error(EELFLoggerDelegate.errorLogger, "Could not load property file > " + e.getMessage()); @@ -196,7 +195,7 @@ public class Utils { Properties pr = null; try { pr = new Properties(); - pr.load(Utils.class.getResourceAsStream("/mdbc_driver.properties")); + pr.load(Utils.class.getResourceAsStream("/mdbc.properties")); } catch (IOException e) { logger.error("Could not load property file > " + e.getMessage()); @@ -215,25 +214,4 @@ public class Utils { } } } - - public static Properties getMdbcProperties() { - Properties prop = new Properties(); - InputStream input = null; - try { - input = Utils.class.getClassLoader().getResourceAsStream("/mdbc.properties"); - prop.load(input); - } catch (Exception e) { - logger.warn(EELFLoggerDelegate.applicationLogger, "Could load mdbc.properties." - + "Proceeding with defaults " + e.getMessage()); - } finally { - if (input != null) { - try { - input.close(); - } catch (IOException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); - } - } - } - return prop; - } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java index 61c7bf1..69f2c31 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java @@ -22,11 +22,9 @@ package org.onap.music.mdbc.tables; import java.util.UUID; public final class MriReference { - public final String table; public final UUID index; - public MriReference(String table, UUID index) { - this.table = table; + public MriReference(UUID index) { this.index= index; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java index 6b67e5c..94011d7 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java @@ -22,20 +22,41 @@ package org.onap.music.mdbc.tables; import java.util.List; import java.util.UUID; +import org.onap.music.mdbc.DatabasePartition; + public final class MusicRangeInformationRow { - public final UUID index; - public final PartitionInformation partition; - public final List<MusicTxDigestId> redoLog; - public final String ownerId; - public final String metricProcessId; - - public MusicRangeInformationRow(UUID index, List<MusicTxDigestId> redoLog, PartitionInformation partition, - String ownerId, String metricProcessId) { - this.index = index; + private final DatabasePartition dbPartition; + //private final UUID partitionIndex; + private final List<MusicTxDigestId> redoLog; + private final String ownerId; + private final String metricProcessId; + + public MusicRangeInformationRow (DatabasePartition dbPartition, List<MusicTxDigestId> redoLog, + String ownerId, String metricProcessId) { + this.dbPartition = dbPartition; this.redoLog = redoLog; - this.partition = partition; this.ownerId = ownerId; this.metricProcessId = metricProcessId; } + /*public UUID getPartitionIndex() { + return dbPartition.getMusicRangeInformationIndex(); + } */ + + public DatabasePartition getDBPartition() { + return this.dbPartition; + } + + public List<MusicTxDigestId> getRedoLog() { + return redoLog; + } + + public String getOwnerId() { + return ownerId; + } + + public String getMetricProcessId() { + return metricProcessId; + } + } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java new file mode 100644 index 0000000..0a5bd60 --- /dev/null +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java @@ -0,0 +1,220 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.tables; + +import java.io.IOException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.json.JSONObject; +import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.exceptions.MDBCServiceException; +import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.logging.EELFLoggerDelegate; +import org.onap.music.mdbc.DatabasePartition; +import org.onap.music.mdbc.MDBCUtils; +import org.onap.music.mdbc.MdbcServerLogic; +import org.onap.music.mdbc.Range; +import org.onap.music.mdbc.StateManager; +import org.onap.music.mdbc.configurations.NodeConfiguration; +import org.onap.music.mdbc.mixins.CassandraMixin; +import org.onap.music.mdbc.mixins.MusicInterface; + +import com.datastax.driver.core.Row; + +public class MusicTxDigest { + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicTxDigest.class); + + //private MdbcServerLogic mdbcServer; + //private NodeConfiguration config; + private StateManager stateManager; + + public MusicTxDigest(StateManager stateManager) { + this.stateManager = stateManager; + } + + /** + * Parse the transaction digest into individual events + * @param digest - base 64 encoded, serialized digest + */ + public void replayTxDigest(HashMap<Range,StagingTable> digest) { + for (Map.Entry<Range,StagingTable> entry: digest.entrySet()) { + Range r = entry.getKey(); + StagingTable st = entry.getValue(); + ArrayList<Operation> opList = st.getOperationList(); + + for (Operation op: opList) { + replayOperation(r, op); + } + } + } + + /** + * Replays operation into local database + * @param r + * @param op + */ + private void replayOperation(Range r, Operation op) { + logger.info("Operation: " + op.getOperationType() + "->" + op.getNewVal()); + JSONObject jsonOp = op.getNewVal(); + JSONObject key = op.getKey(); + + ArrayList<String> cols = new ArrayList<String>(); + ArrayList<Object> vals = new ArrayList<Object>(); + Iterator<String> colIterator = jsonOp.keys(); + while(colIterator.hasNext()) { + String col = colIterator.next(); + //FIXME: should not explicitly refer to cassandramixin + if (col.equals(CassandraMixin.MDBC_PRIMARYKEY_NAME)) { + //reserved name + continue; + } + cols.add(col); + vals.add(jsonOp.get(col)); + } + + //build the queries + StringBuilder sql = new StringBuilder(); + String sep = ""; + switch (op.getOperationType()) { + case INSERT: + sql.append(op.getOperationType() + " INTO "); + sql.append(r.table + " (") ; + sep = ""; + for (String col: cols) { + sql.append(sep + col); + sep = ", "; + } + sql.append(") VALUES ("); + sep = ""; + for (Object val: vals) { + sql.append(sep + "\"" + val + "\""); + sep = ", "; + } + sql.append(");"); + logger.info(sql.toString()); + break; + case UPDATE: + sql.append(op.getOperationType() + " "); + sql.append(r.table + " SET "); + sep=""; + for (int i=0; i<cols.size(); i++) { + sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\""); + sep = ", "; + } + sql.append(" WHERE "); + sql.append(getPrimaryKeyConditional(op.getKey())); + sql.append(";"); + logger.info(sql.toString()); + break; + case DELETE: + sql.append(op.getOperationType() + " FROM "); + sql.append(r.table + " WHERE "); + sql.append(getPrimaryKeyConditional(op.getKey())); + sql.append(";"); + logger.info(sql.toString()); + break; + case SELECT: + //no update happened, do nothing + break; + } + } + + private String getPrimaryKeyConditional(JSONObject primaryKeys) { + StringBuilder keyCondStmt = new StringBuilder(); + String and = ""; + for (String key: primaryKeys.keySet()) { + Object val = primaryKeys.get(key); + keyCondStmt.append(and + key + "=\"" + val + "\""); + and = " AND "; + } + return keyCondStmt.toString(); + } + + /** + * Runs the body of the background daemon + * @param daemonSleepTimeS time, in seconds, between updates + * @throws InterruptedException + */ + public void backgroundDaemon(int daemonSleepTimeS) throws InterruptedException { + MusicInterface mi = stateManager.getMusicInterface(); + while (true) { + //update + logger.info(String.format("[%s] Background MusicTxDigest daemon updating local db", + new Timestamp(System.currentTimeMillis()))); + + //1) get all other partitions from musicrangeinformation + List<UUID> partitions = mi.getPartitionIndexes(); + //2) for each partition I don't own + DatabasePartition myPartition = stateManager.getRanges(); + for (UUID partition: partitions) { + if (!partition.equals(myPartition.getMusicRangeInformationIndex())){ + try { + replayDigestForPartition(mi, partition); + } catch (MDBCServiceException e) { + logger.error("Unable to update for partition : " + partition + ". " + e.getMessage()); + continue; + } + } + } + Thread.sleep(TimeUnit.SECONDS.toMillis(daemonSleepTimeS)); + } + } + + public void replayDigestForPartition(MusicInterface mi, UUID partitionId) throws MDBCServiceException { + List<MusicTxDigestId> redoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog(); + for (MusicTxDigestId txId: redoLogTxIds) { + HashMap<Range, StagingTable> digest = mi.getTxDigest(txId); + replayTxDigest(digest); + } + //todo, keep track of where I am in pointer + } + + /** + * Start the background daemon defined by this object + * Spawns a new thread and runs "backgroundDaemon" + * @param daemonSleepTimeS time, in seconds, between updates run by daemon + */ + public void startBackgroundDaemon(int daemonSleepTimeS) { + class MusicTxBackgroundDaemon implements Runnable { + public void run() { + while (true) { + try { + logger.info("MusicTxDigest background daemon started"); + backgroundDaemon(daemonSleepTimeS); + } catch (InterruptedException e) { + logger.error("MusicTxDigest background daemon stopped " + e.getMessage()); + } + } + } + } + Thread t = new Thread(new MusicTxBackgroundDaemon()); + t.start(); + + } + + +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java index 33952e0..fda34e2 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java @@ -22,13 +22,13 @@ package org.onap.music.mdbc.tables; import java.util.UUID; public final class MusicTxDigestId { - public final UUID tablePrimaryKey; + public final UUID txId; public MusicTxDigestId(UUID primaryKey) { - this.tablePrimaryKey= primaryKey; + this.txId= primaryKey; } public boolean isEmpty() { - return (this.tablePrimaryKey==null); + return (this.txId==null); } } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java index 0c68575..0870be9 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java @@ -30,10 +30,12 @@ public final class Operation implements Serializable{ final OperationType TYPE; final String NEW_VAL; + final String KEY; - public Operation(OperationType type, String newVal) { + public Operation(OperationType type, String newVal, String key) { TYPE = type; NEW_VAL = newVal; + KEY = key; } public JSONObject getNewVal(){ @@ -41,6 +43,11 @@ public final class Operation implements Serializable{ return newRow; } + public JSONObject getKey() { + JSONObject key = new JSONObject(new JSONTokener(KEY)); + return key; + } + public OperationType getOperationType() { return this.TYPE; } diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java deleted file mode 100755 index 6724860..0000000 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * ============LICENSE_START==================================================== - * org.onap.music.mdbc - * ============================================================================= - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END====================================================== - */ -package org.onap.music.mdbc.tables; - -import org.onap.music.mdbc.Range; - -import java.util.List; - -public class PartitionInformation { - public final List<Range> ranges; - - public PartitionInformation(List<Range> ranges) { - this.ranges=ranges; - } -} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java index d080c54..fcff5ff 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java @@ -20,6 +20,7 @@ package org.onap.music.mdbc.tables; import java.io.Serializable; +import java.util.ArrayList; import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; @@ -36,32 +37,18 @@ public class StagingTable implements Serializable{ private static final long serialVersionUID = 7583182634761771943L; private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StagingTable.class); //primary key -> Operation - private HashMap<String,Deque<Operation>> operations; + private ArrayList<Operation> operations; public StagingTable() { - operations = new HashMap<>(); + operations = new ArrayList<Operation>(); } - synchronized public void addOperation(String key, OperationType type, String newVal) { - if(!operations.containsKey(key)) { - operations.put(key, new LinkedList<>()); - } - operations.get(key).add(new Operation(type,newVal)); + synchronized public void addOperation(OperationType type, String newVal, String key) { + operations.add(new Operation(type,newVal, key)); } - synchronized public Deque<Pair<String,Operation>> getIterableSnapshot() throws NoSuchFieldException{ - Deque<Pair<String,Operation>> response=new LinkedList<Pair<String,Operation>>(); - //\TODO: check if we can just return the last change to a given key - Set<String> keys = operations.keySet(); - for(String key : keys) { - Deque<Operation> ops = operations.get(key); - if(ops.isEmpty()) { - logger.error(EELFLoggerDelegate.errorLogger, "Invalid state of the Operation data structure when creating snapshot"); - throw new NoSuchFieldException("Invalid state of the operation data structure"); - } - response.add(Pair.of(key,ops.getLast())); - } - return response; + synchronized public ArrayList<Operation> getOperationList() { + return operations; } synchronized public void clean() { diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java index d515539..0277902 100755 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java @@ -40,12 +40,6 @@ public class CreatePartition { private String mriIndex; @Parameter(names = { "-m", "--mri-table-name" }, required = true, description = "Mri Table name") - private String mriTable; - @Parameter(names = { "-r", "--music-tx-digest-table-name" }, required = true, - description = "Music Transaction Digest Table name") - private String mtxdTable; - @Parameter(names = { "-h", "-help", "--help" }, help = true, - description = "Print the help message") private boolean help = false; NodeConfiguration config; @@ -54,7 +48,7 @@ public class CreatePartition { } public void convert(){ - config = new NodeConfiguration(tables, UUID.fromString(mriIndex),mriTable,"test","", mtxdTable); + config = new NodeConfiguration(tables, UUID.fromString(mriIndex),"test",""); } public void saveToFile(){ diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties index 3e207aa..d3feee2 100755 --- a/mdbc-server/src/main/resources/mdbc.properties +++ b/mdbc-server/src/main/resources/mdbc.properties @@ -8,5 +8,8 @@ MIXINS= \ org.onap.music.mdbc.mixins.CassandraMixin \ org.onap.music.mdbc.mixins.Cassandra2Mixin -critical.tables= \ - TEST
\ No newline at end of file +DEFAULT_DRIVERS=\ + org.h2.Driver \ + com.mysql.jdbc.Driver + +txdaemonsleeps=15
\ No newline at end of file diff --git a/mdbc-server/src/main/resources/mdbc_driver.properties b/mdbc-server/src/main/resources/mdbc_driver.properties deleted file mode 100755 index 487feb3..0000000 --- a/mdbc-server/src/main/resources/mdbc_driver.properties +++ /dev/null @@ -1,13 +0,0 @@ -# -# A list of all Mixins that should be checked by MDBC -# -MIXINS= \ - org.onap.music.mdbc.mixins.H2Mixin \ - org.onap.music.mdbc.mixins.H2ServerMixin \ - org.onap.music.mdbc.mixins.MySQLMixin \ - org.onap.music.mdbc.mixins.CassandraMixin \ - org.onap.music.mdbc.mixins.Cassandra2Mixin - -DEFAULT_DRIVERS=\ - org.h2.Driver \ - com.mysql.jdbc.Driver
\ No newline at end of file diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties index 201651e..83dcb7c 100755 --- a/mdbc-server/src/main/resources/music.properties +++ b/mdbc-server/src/main/resources/music.properties @@ -1,8 +1,8 @@ cassandra.host =\ -135.197.226.108 + localhost cassandra.user =\ cassandra cassandra.password =\ cassandra zookeeper.host =\ -135.197.226.108 + localhost diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/DatabaseOperationsTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/DatabaseOperationsTest.java deleted file mode 100755 index 07c1451..0000000 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/DatabaseOperationsTest.java +++ /dev/null @@ -1,480 +0,0 @@ -/* - * ============LICENSE_START==================================================== - * org.onap.music.mdbc - * ============================================================================= - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END====================================================== - */ -package org.onap.music.mdbc; - -import com.datastax.driver.core.*; -import com.datastax.driver.core.exceptions.QueryExecutionException; -import com.datastax.driver.core.exceptions.SyntaxError; -import org.apache.commons.lang3.tuple.Pair; -//import org.cassandraunit.utils.EmbeddedCassandraServerHelper; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.Ignore; -import org.onap.music.datastore.CassaDataStore; -import org.onap.music.datastore.PreparedQueryObject; -import org.onap.music.exceptions.MDBCServiceException; -import org.onap.music.exceptions.MusicLockingException; -import org.onap.music.exceptions.MusicQueryException; -import org.onap.music.exceptions.MusicServiceException; -import org.onap.music.main.MusicCore; -import org.onap.music.main.MusicUtil; -import org.onap.music.main.ResultType; -import org.onap.music.main.ReturnType; -import org.onap.music.mdbc.tables.*; - - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static org.junit.Assert.*; - -@Ignore -public class DatabaseOperationsTest { - - final private String keyspace="metricmusictest"; - final private String mriTableName = "musicrangeinformation"; - final private String mtdTableName = "musictxdigest"; - - //Properties used to connect to music - private static Cluster cluster; - private static Session session; - private static String cassaHost = "localhost"; - - @BeforeClass - public static void init() throws MusicServiceException { - try { - // EmbeddedCassandraServerHelper.startEmbeddedCassandra(); - } catch (Exception e) { - System.out.println(e); - } - - cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build(); - cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000); - session = cluster.connect(); - - assertNotNull("Invalid configuration for cassandra", cluster); - session = cluster.connect(); - assertNotNull("Invalid configuration for cassandra", session); -// TestUtils.populateMusicUtilsWithProperties(prop); - CassaDataStore store = new CassaDataStore(cluster, session); - assertNotNull("Invalid configuration for music", store); - MusicCore.mDstoreHandle = store; - - } - - @AfterClass - public static void close() throws MusicServiceException, MusicQueryException { - - //TODO: shutdown cassandra - - } - - @Before - public void setUp() throws Exception { - // System.out.println("TEST 1: Getting ready for testing connection to Cassandra"); - //Create keyspace - - - createKeyspace(); - useKeyspace(); - } - - @After - public void tearDown() { - deleteKeyspace(); - } - - private void createKeyspace() { - String queryOp = "CREATE KEYSPACE " + - keyspace + - " WITH REPLICATION " + - "= {'class':'SimpleStrategy', 'replication_factor':1}; "; - ResultSet res=null; - try { - res = session.execute(queryOp); - } - catch(QueryExecutionException e){ - fail("Failure executing creation of keyspace with error: " + e.getMessage()); - } catch(SyntaxError e){ - fail("Failure executing creation of keyspace with syntax error: " + e.getMessage()); - } - assertTrue("Keyspace "+keyspace+" is already being used, please change it to avoid loosing data",res.wasApplied()); - } - - private void useKeyspace(){ - String queryBuilder = "USE " + - keyspace + - "; "; - ResultSet res = session.execute(queryBuilder); - assertTrue("Keyspace "+keyspace+" is already being used, please change it to avoid loosing data",res.wasApplied()); - } - - private void deleteKeyspace(){ - String queryBuilder = "DROP KEYSPACE " + - keyspace + - ";"; - ResultSet res = session.execute(queryBuilder); - assertTrue("Keyspace "+keyspace+" doesn't exist and it should",res.wasApplied()); - } - - private void CreateMTD(){ - try { - DatabaseOperations.createMusicTxDigest(keyspace, mtdTableName); - } catch (MDBCServiceException e) { - fail("Execution of creating music tx digest failed"); - } - } - - @Test - public void createMusicTxDigest() { - HashSet<String> expectedColumns = new HashSet<>( - Arrays.asList("txid","transactiondigest") - ); - HashMap<String,DataType> expectedTypes = new HashMap<>(); - expectedTypes.put("txid",DataType.uuid()); - expectedTypes.put("transactiondigest",DataType.text()); - CreateMTD(); - //check structure of table - CassaDataStore ds=null; - try { - ds = MusicCore.getDSHandle(); - } catch (MusicServiceException e) { - fail("Getting DS handle fail with error " + e.getErrorMessage()); - } - TableMetadata table = ds.returnColumnMetadata(keyspace,mtdTableName); - assertNotNull("Error obtaining metadata of table, there may be an error with its creation", table); - List<ColumnMetadata> columnsMeta = table.getColumns(); - checkDataTypeForTable(columnsMeta,expectedColumns,expectedTypes); - } - - @Test - public void createMusicRangeInformationTable() { - HashSet<String> expectedColumns = new HashSet<>( - Arrays.asList("rangeid","keys","txredolog","ownerid","metricprocessid") - ); - HashMap<String,DataType> expectedTypes = new HashMap<>(); - expectedTypes.put("rangeid",DataType.uuid()); - expectedTypes.put("keys",DataType.set(DataType.text())); - ProtocolVersion currentVer = cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); - assertNotNull("Protocol version for cluster is invalid", currentVer); - CodecRegistry registry = cluster.getConfiguration().getCodecRegistry(); - assertNotNull("Codec registry for cluster is invalid", registry); - expectedTypes.put("txredolog",DataType.list(TupleType.of(currentVer,registry,DataType.text(),DataType.uuid()))); - expectedTypes.put("ownerid",DataType.text()); - expectedTypes.put("metricprocessid",DataType.text()); - try { - DatabaseOperations.createMusicRangeInformationTable(keyspace,mriTableName); - } catch (MDBCServiceException e) { - fail("Execution of creating music tx digest failed"); - } - //check structure of table - CassaDataStore ds=null; - try { - ds = MusicCore.getDSHandle(); - } catch (MusicServiceException e) { - fail("Getting DS handle fail with error " + e.getErrorMessage()); - } - TableMetadata table = ds.returnColumnMetadata(keyspace,mriTableName); - assertNotNull("Error obtaining metadata of table, there may be an error with its creation", table); - List<ColumnMetadata> columnsMeta = table.getColumns(); - checkDataTypeForTable(columnsMeta,expectedColumns,expectedTypes); - } - - private void checkDataTypeForTable(List<ColumnMetadata> columnsMeta, HashSet<String> expectedColumns, - HashMap<String,DataType> expectedTypes){ - for(ColumnMetadata cMeta : columnsMeta){ - String columnName = cMeta.getName(); - DataType type = cMeta.getType(); - assertTrue("Invalid column name: "+columnName,expectedColumns.contains(columnName)); - assertTrue("Fix the contents of expectedtypes for column: "+columnName, - expectedTypes.containsKey(columnName)); - assertEquals("Invalid type for column: "+columnName, - expectedTypes.get(columnName),type); - } - } - - private void createMRI(){ - try { - DatabaseOperations.createMusicRangeInformationTable(keyspace,mriTableName); - } catch (MDBCServiceException e) { - fail("Execution of creating music tx digest failed"); - } - } - - @Test - public void createEmptyMriRow() { - //Assume mri creation is working - createMRI(); - List<Range> ranges = new ArrayList<>(); - ranges.add(new Range("table1")); - ranges.add(new Range("table2")); - final String lockId = null; - String processId = "tcp://test:1234"; - UUID newRowId=null; - try { - newRowId = DatabaseOperations.createEmptyMriRow(keyspace,mriTableName,processId, - lockId, ranges); - } catch (MDBCServiceException e) { - fail("Adding a new empty mri row failed"); - } - getRowFromMriAndCompare(newRowId,ranges,lockId,processId); - } - - private String getLock(String table, MriReference mriIndex){ - String fullyQualifiedMriKey = keyspace+"."+ mriIndex.table+"."+mriIndex.index.toString(); - String lockId; - lockId = MusicCore.createLockReference(fullyQualifiedMriKey); - //\TODO Handle better failures to acquire locks - ReturnType lockReturn=null; - try { - lockReturn = MusicCore.acquireLock(fullyQualifiedMriKey,lockId); - } catch (MusicLockingException | MusicServiceException | MusicQueryException e) { - fail(e.getMessage()); - } - assertEquals(lockReturn.getResult(),ResultType.SUCCESS); - return lockId; - } - - private void releaseLock(MriReference mriIndex, String lock){ - String fullyQualifiedMriKey = keyspace+"."+ mriIndex.table+"."+mriIndex.index.toString(); - try { - MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey,lock); - } catch (MusicLockingException e) { - fail(e.getMessage()); - } - } - - private List<Range> getTestRanges(){ - List<Range> ranges = new ArrayList<>(); - ranges.add(new Range("table1")); - ranges.add(new Range("table2")); - return ranges; - } - - private String getTestProcessId(){ - return "tcp://test:1234"; - } - - private UUID CreateRowWithLockAndCheck(UUID newId, String lockId){ - - List<Range> ranges = getTestRanges(); - String processId = getTestProcessId(); - UUID newRowId=null; - try { - newRowId = DatabaseOperations.createEmptyMriRow(keyspace,mriTableName,newId, processId, lockId, ranges); - } catch (MDBCServiceException e) { - fail("Adding a new empty mri row failed"); - } - getRowFromMriAndCompare(newRowId,ranges,lockId,processId); - return newRowId; - } - - @Test - public void createEmptyMriRowWithLock() { - createMRI(); - //Assume mri creation is working - UUID newId = DatabaseOperations.generateUniqueKey(); - MriReference mriIndex = new MriReference(mriTableName,newId); - String lockId = getLock(mriTableName,mriIndex); - assertTrue("Error obtaining lock",!lockId.isEmpty()); - UUID newRowId = CreateRowWithLockAndCheck(newId,lockId); - assertEquals(newRowId,newId); - releaseLock(mriIndex,lockId); - } - - private void getRowFromMriAndCompare(UUID newRowId, List<Range> ranges, String lockId, String processId){ - lockId=(lockId==null)?"":lockId; - ResultSet res=null; - String queryOp = "SELECT * FROM " + - keyspace + "." + mriTableName + - " WHERE rangeid = " + - newRowId + - ";"; - try { - res = session.execute(queryOp); - } - catch(QueryExecutionException e){ - fail("Failure executing retrieval of row in MRU error: " + e.getMessage()); - } catch(SyntaxError e){ - fail("Failure executing retrieval of row with syntax error: " + e.getMessage()); - } - assertFalse(res.isExhausted()); - Row response = res.one(); - UUID id = response.get("rangeid",UUID.class); - assertEquals(id,newRowId); - Set<String> keys = response.getSet("keys",String.class); - for(Range r : ranges){ - assertTrue("Table was not found in retrieved keys",keys.contains(r.table)); - } - List<TupleValue> redo = response.getList("txredolog",TupleValue.class); - assertTrue(redo.isEmpty()); - String ownerId = response.getString("ownerid"); - assertEquals(ownerId,lockId); - String mpid= response.getString("metricprocessid"); - assertEquals(mpid,processId); - } - - @Test - public void getMriRow() { - createMRI(); - //Assume mri creation is working - UUID newId = DatabaseOperations.generateUniqueKey(); - MriReference mriIndex = new MriReference(mriTableName,newId); - String lockId = getLock(mriTableName,mriIndex); - assertTrue("Error obtaining lock",!lockId.isEmpty()); - UUID newRowId = CreateRowWithLockAndCheck(newId,lockId); - MusicRangeInformationRow mriRow=null; - try { - mriRow = DatabaseOperations.getMriRow(keyspace, mriTableName, newRowId, lockId); - } catch (MDBCServiceException e) { - fail(e.getErrorMessage()); - } - final List<Range> ranges = getTestRanges(); - String processId = getTestProcessId(); - assertEquals("invalid process id", mriRow.metricProcessId,processId); - assertEquals("invalid index", mriRow.index,newRowId); - assertEquals("invalid lock id",mriRow.ownerId,lockId); - assertTrue("redo log is not empty", mriRow.redoLog.isEmpty()); - List<Range> readRange = mriRow.partition.ranges; - List<Range> range = ranges; - for(Range r: range){ - boolean found = false; - for(Range rr : readRange) { - if(r.equals(rr)) { - found = true; - } - - } - assertTrue("ranges are incorrect", found); - } - } - - @Test - public void getTransactionDigest() { - CreateMTD(); - Range inputRange = new Range("table1"); - StagingTable inputStaging = new StagingTable(); - inputStaging.addOperation("key1", OperationType.INSERT,"1"); - HashMap<Range, StagingTable> input= new HashMap<>(); - input.put(inputRange, inputStaging); - MusicTxDigestId newId = new MusicTxDigestId(DatabaseOperations.generateUniqueKey()); - try { - DatabaseOperations.createTxDigestRow(keyspace,mtdTableName,newId,MDBCUtils.toString(input)); - } catch (MDBCServiceException e) { - fail("Adding a new mtd row failed"); - } catch (IOException e) { - fail("Fail compressing input staging tables"); - } - HashMap<Range, StagingTable> results=null; - try { - results = DatabaseOperations.getTransactionDigest(keyspace,mtdTableName,newId); - } catch (MDBCServiceException e) { - fail("Adding a new mtd row failed with error: "+e.getErrorMessage()); - } - assertTrue(results.containsKey(inputRange)); - StagingTable newStaging = results.get(inputRange); - Deque<Pair<String,Operation>> opers=null; - Deque<Pair<String,Operation>> initialOpers=null; - try { - opers=newStaging.getIterableSnapshot(); - initialOpers=inputStaging.getIterableSnapshot(); - } catch (NoSuchFieldException e) { - fail(e.getMessage()); - } - assertEquals("Operations are not equal",opers.size(),initialOpers.size()); - while(!opers.isEmpty()){ - Pair<String,Operation> recvOper = opers.getFirst(); - Pair<String,Operation> originalOper = initialOpers.getFirst(); - assertEquals(recvOper.getKey(),originalOper.getKey()); - assertEquals(recvOper.getValue(),originalOper.getValue()); - opers.removeFirst(); - initialOpers.removeFirst(); - } - } - - @Test - public void createNamespace() { - deleteKeyspace(); - try { - DatabaseOperations.createNamespace(keyspace,1); - } catch (MDBCServiceException e) { - fail(e.getErrorMessage()); - } - String describeOp = "USE "+keyspace+";"; - ResultSet res=null; - try { - res = session.execute(describeOp); - } - catch(QueryExecutionException e){ - fail("Failure executing retrieval of row in MRU error: " + e.getMessage()); - } catch(SyntaxError e){ - fail("Failure executing retrieval of row with syntax error: " + e.getMessage()); - } - assertTrue("Error with keyspace: "+keyspace, res.wasApplied()); - } - - private void getRowFromMtdAndCompare(MusicTxDigestId newId, String transactionDigest){ - ResultSet res=null; - String queryOp = "SELECT * FROM " + - keyspace + "." + mtdTableName+ - " WHERE txid = " + - newId.tablePrimaryKey + - ";"; - try { - res = session.execute(queryOp); - } - catch(QueryExecutionException e){ - fail("Failure executing retrieval of row in MTD error: " + e.getMessage()); - } catch(SyntaxError e){ - fail("Failure executing retrieval of row in MTD with syntax error: " + e.getMessage()); - } - assertFalse(res.isExhausted()); - Row response = res.one(); - UUID id = response.getUUID("txId"); - assertEquals(id,newId.tablePrimaryKey); - String digest = response.getString("transactiondigest"); - assertEquals(digest,transactionDigest); - } - - @Test - public void createTxDigestRow(){ - CreateMTD(); - MusicTxDigestId newId = new MusicTxDigestId(DatabaseOperations.generateUniqueKey()); - String transactionDigest = "newdigest"; - try { - DatabaseOperations.createTxDigestRow(keyspace,mtdTableName,newId,transactionDigest); - } catch (MDBCServiceException e) { - fail("Adding a new empty mtd row failed"); - } - getRowFromMtdAndCompare(newId,transactionDigest); - - } - -} diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java index a02578e..676d760 100755 --- a/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java @@ -37,7 +37,8 @@ public class MDBCUtilsTest { @Test public void toStringTest1() { StagingTable table = new StagingTable(); - table.addOperation("test",OperationType.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString()); + table.addOperation(OperationType.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString(), + (new JSONObject(new String[]{"test_key", "test_value"})).toString()); String output=null; try { output = MDBCUtils.toString(table); @@ -53,7 +54,8 @@ public class MDBCUtilsTest { public void toStringTest2() { HashMap<String,StagingTable> mapToSerialize = new HashMap<>(); StagingTable table = new StagingTable(); - table.addOperation("test",OperationType.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString()); + table.addOperation(OperationType.INSERT,(new JSONObject(new String[]{"test3", "Test4"}).toString()), + (new JSONObject(new String[]{"test_key", "test_value"})).toString()); mapToSerialize.put("table",table); String output=null; try { diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java new file mode 100644 index 0000000..eab38d3 --- /dev/null +++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ + +package org.onap.music.mdbc; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.HashMap; + +import org.junit.Test; +import org.onap.music.mdbc.tables.MusicTxDigest; +import org.onap.music.mdbc.tables.StagingTable; + +public class MusicTxDigestTest { + + @Test + public void test() throws Exception { + MusicTxDigest txDigest = new MusicTxDigest(null); + String t1 = "rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAABc3IAGW9yZy5vbmFwLm11c2ljLm1kYmMuUmFuZ2UWWoOV+3nB2AIAAUwABXRhYmxldAASTGphdmEvbGFuZy9TdHJpbmc7eHB0AAdwZXJzb25zc3IAJ29yZy5vbmFwLm11c2ljLm1kYmMudGFibGVzLlN0YWdpbmdUYWJsZWk84G3L4tunAgABTAAKb3BlcmF0aW9uc3QAFUxqYXZhL3V0aWwvQXJyYXlMaXN0O3hwc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAV3BAAAAAVzcgAkb3JnLm9uYXAubXVzaWMubWRiYy50YWJsZXMuT3BlcmF0aW9u7yJhSJSWe0ACAANMAANLRVlxAH4AA0wAB05FV19WQUxxAH4AA0wABFRZUEV0ACpMb3JnL29uYXAvbXVzaWMvbWRiYy90YWJsZXMvT3BlcmF0aW9uVHlwZTt4cHQAJHsiUGVyc29uSUQiOjEsIkxhc3ROYW1lIjoiTWFydGluZXoifXQAWXsiQWRkcmVzcyI6IktBQ0IiLCJQZXJzb25JRCI6MSwiRmlyc3ROYW1lIjoiSnVhbiIsIkNpdHkiOiJBVExBTlRBIiwiTGFzdE5hbWUiOiJNYXJ0aW5leiJ9fnIAKG9yZy5vbmFwLm11c2ljLm1kYmMudGFibGVzLk9wZXJhdGlvblR5cGUAAAAAAAAAABIAAHhyAA5qYXZhLmxhbmcuRW51bQAAAAAAAAAAEgAAeHB0AAZJTlNFUlRzcQB+AAt0ACR7IlBlcnNvbklEIjoxLCJMYXN0TmFtZSI6Ik1hcnRpbmV6In10AFl7IkFkZHJlc3MiOiJLQUNCIiwiUGVyc29uSUQiOjEsIkZpcnN0TmFtZSI6Ikp1YW4iLCJDaXR5IjoiQVRMQU5UQSIsIkxhc3ROYW1lIjoiTWFydGluZXoifX5xAH4AEHQABkRFTEVURXNxAH4AC3QAIXsiUGVyc29uSUQiOjIsIkxhc3ROYW1lIjoiU21pdGgifXQAWXsiQWRkcmVzcyI6IkdOT0MiLCJQZXJzb25JRCI6MiwiRmlyc3ROYW1lIjoiSk9ITiIsIkNpdHkiOiJCRURNSU5TVEVSIiwiTGFzdE5hbWUiOiJTbWl0aCJ9cQB+ABJzcQB+AAt0ACF7IlBlcnNvbklEIjoyLCJMYXN0TmFtZSI6IlNtaXRoIn10AFl7IkFkZHJlc3MiOiJHTk9DIiwiUGVyc29uSUQiOjIsIkZpcnN0TmFtZSI6IkpPU0giLCJDaXR5IjoiQkVETUlOU1RFUiIsIkxhc3ROYW1lIjoiU21pdGgifX5xAH4AEHQABlVQREFURXNxAH4AC3QAIXsiUGVyc29uSUQiOjIsIkxhc3ROYW1lIjoiU21pdGgifXQAWXsiQWRkcmVzcyI6IkdOT0MiLCJQZXJzb25JRCI6MiwiRmlyc3ROYW1lIjoiSk9ITiIsIkNpdHkiOiJCRURNSU5TVEVSIiwiTGFzdE5hbWUiOiJTbWl0aCJ9cQB+AB94eA=="; + HashMap<Range, StagingTable> digest = (HashMap<Range, StagingTable>) MDBCUtils.fromString(t1); + txDigest.replayTxDigest(digest); + } + +} |