aboutsummaryrefslogtreecommitdiffstats
path: root/mdbc-server
diff options
context:
space:
mode:
authorBharath Balasubramanian <bharathb@research.att.com>2018-11-20 12:53:38 +0000
committerGerrit Code Review <gerrit@onap.org>2018-11-20 12:53:38 +0000
commit83db3eb4ccf7e636f586a6873a966e14ba1685ae (patch)
treed86101926e3474ceebf0da40945d8d65479123de /mdbc-server
parent14186ff595b31035d401b71111dc75da1c80a807 (diff)
parent5a7cf00e87cacf97130d3f1823adce824f865c69 (diff)
Merge "TxDigest replay and code restructure"
Diffstat (limited to 'mdbc-server')
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java4
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/DatabaseOperations.java335
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java57
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java40
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java21
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java14
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java3
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java62
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java6
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java18
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java16
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java6
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java376
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java16
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java44
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java12
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java28
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java4
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java41
-rw-r--r--mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java220
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java6
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java9
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java32
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java27
-rwxr-xr-xmdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java8
-rwxr-xr-xmdbc-server/src/main/resources/mdbc.properties7
-rwxr-xr-xmdbc-server/src/main/resources/mdbc_driver.properties13
-rwxr-xr-xmdbc-server/src/main/resources/music.properties4
-rwxr-xr-xmdbc-server/src/test/java/org/onap/music/mdbc/DatabaseOperationsTest.java480
-rwxr-xr-xmdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java6
-rw-r--r--mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java42
31 files changed, 810 insertions, 1147 deletions
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
index 6bda739..7e39772 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
@@ -34,4 +34,8 @@ public class Configuration {
public static final String MUSIC_MIXIN_DEFAULT = "cassandra2";//"cassandra2";
/** Default cassandra ulr*/
public static final String CASSANDRA_URL_DEFAULT = "localhost";//"cassandra2";
+ /** Name of Tx Digest Update Daemon sleep time */
+ public static final String TX_DAEMON_SLEEPTIME_S = "txdaemonsleeps";
+ /** Default txDigest Daemon sleep time */
+ public static final String TX_DAEMON_SLEEPTIME_S_DEFAULT = "10";
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabaseOperations.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabaseOperations.java
deleted file mode 100755
index e10fe96..0000000
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabaseOperations.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * ============LICENSE_START====================================================
- * org.onap.music.mdbc
- * =============================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END======================================================
- */
-package org.onap.music.mdbc;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.TupleValue;
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.logging.EELFLoggerDelegate;
-import org.onap.music.datastore.PreparedQueryObject;
-import org.onap.music.exceptions.MusicLockingException;
-import org.onap.music.exceptions.MusicQueryException;
-import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.main.MusicCore;
-import org.onap.music.main.ResultType;
-import org.onap.music.main.ReturnType;
-import org.onap.music.mdbc.tables.MusicRangeInformationRow;
-import org.onap.music.mdbc.tables.MusicTxDigestId;
-import org.onap.music.mdbc.tables.PartitionInformation;
-import org.onap.music.mdbc.tables.StagingTable;
-
-import java.io.IOException;
-import java.util.*;
-
-import com.datastax.driver.core.utils.UUIDs;
-
-public class DatabaseOperations {
- private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabaseOperations.class);
- /**
- * This functions is used to generate cassandra uuid
- * @return a random UUID that can be used for fields of type uuid
- */
- public static UUID generateUniqueKey() {
- return UUIDs.random();
- }
-
- public static void createMusicTxDigest(String musicNamespace, String musicTxDigestTableName)
- throws MDBCServiceException {
- createMusicTxDigest(musicNamespace,musicTxDigestTableName,-1);
- }
-
- /**
- * This function creates the MusicTxDigest table. It contain information related to each transaction committed
- * * LeaseId: id associated with the lease, text
- * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
- * * TransactionDigest: text that contains all the changes in the transaction
- */
- public static void createMusicTxDigest(String musicNamespace, String musicTxDigestTableName,
- int musicTxDigestTableNumber) throws MDBCServiceException {
- String tableName = musicTxDigestTableName;
- if(musicTxDigestTableNumber >= 0) {
- tableName = tableName +
- "-" +
- Integer.toString(musicTxDigestTableNumber);
- }
- String priKey = "txid";
- StringBuilder fields = new StringBuilder();
- fields.append("txid uuid, ");
- fields.append("transactiondigest text ");//notice lack of ','
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
- try {
- executeMusicWriteQuery(musicNamespace,tableName,cql);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to create redo records table");
- throw(e);
- }
- }
-
- /**
- * This function creates the TransactionInformation table. It contain information related
- * to the transactions happening in a given partition.
- * * The schema of the table is
- * * Id, uiid.
- * * Partition, uuid id of the partition
- * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
- * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
- * * Redo: list of uiids associated to the Redo Records Table
- *
- */
- public static void createMusicRangeInformationTable(String musicNamespace, String musicRangeInformationTableName) throws MDBCServiceException {
- String tableName = musicRangeInformationTableName;
- String priKey = "rangeid";
- StringBuilder fields = new StringBuilder();
- fields.append("rangeid uuid, ");
- fields.append("keys set<text>, ");
- fields.append("ownerid text, ");
- fields.append("metricprocessid text, ");
- //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
- fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
- String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
- try {
- executeMusicWriteQuery(musicNamespace,tableName,cql);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to create transaction information table");
- throw(e);
- }
- }
-
- /**
- * Creates a new empty tit row
- * @param namespace namespace where the tit table is located
- * @param mriTableName name of the corresponding mri table where the new row is added
- * @param processId id of the process that is going to own initially this.
- * @return uuid associated to the new row
- */
- public static UUID createEmptyMriRow(String namespace, String mriTableName,
- String processId, String lockId, List<Range> ranges) throws MDBCServiceException {
- UUID id = generateUniqueKey();
- return createEmptyMriRow(namespace,mriTableName,id,processId,lockId,ranges);
- }
-
- public static UUID createEmptyMriRow(String namespace, String mriTableName, UUID id, String processId, String lockId,
- List<Range> ranges) throws MDBCServiceException{
- StringBuilder insert = new StringBuilder("INSERT INTO ")
- .append(namespace)
- .append('.')
- .append(mriTableName)
- .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
- .append("(")
- .append(id)
- .append(",{");
- boolean first=true;
- for(Range r: ranges){
- if(first){ first=false; }
- else {
- insert.append(',');
- }
- insert.append("'").append(r.toString()).append("'");
- }
- insert.append("},'")
- .append((lockId==null)?"":lockId)
- .append("','")
- .append(processId)
- .append("',[]);");
- PreparedQueryObject query = new PreparedQueryObject();
- query.appendQueryString(insert.toString());
- try {
- executeLockedPut(namespace,mriTableName,id.toString(),query,lockId,null);
- } catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to add new row to transaction information");
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
- return id;
- }
-
- public static MusicRangeInformationRow getMriRow(String namespace, String mriTableName, UUID id, String lockId)
- throws MDBCServiceException{
- String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", namespace, mriTableName);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(id);
- Row newRow;
- try {
- newRow = executeLockedGet(namespace,mriTableName,pQueryObject,id.toString(),lockId);
- } catch (MDBCServiceException e) {
- logger.error("Get operationt error: Failure to get row from MRI "+mriTableName);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
-// public MusicRangeInformationRow(UUID index, List<MusicTxDigestId> redoLog, PartitionInformation partition,
- // String ownerId, String metricProcessId) {
- List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
- List<MusicTxDigestId> digestIds = new ArrayList<>();
- for(TupleValue t: log){
- //final String tableName = t.getString(0);
- final UUID index = t.getUUID(1);
- digestIds.add(new MusicTxDigestId(index));
- }
- List<Range> partitions = new ArrayList<>();
- Set<String> tables = newRow.getSet("keys",String.class);
- for (String table:tables){
- partitions.add(new Range(table));
- }
- return new MusicRangeInformationRow(id,digestIds,new PartitionInformation(partitions),newRow.getString("ownerid"),newRow.getString("metricprocessid"));
-
- }
-
- public static HashMap<Range,StagingTable> getTransactionDigest(String namespace, String musicTxDigestTable, MusicTxDigestId id)
- throws MDBCServiceException{
- String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", namespace, musicTxDigestTable);
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- pQueryObject.addValue(id.tablePrimaryKey);
- Row newRow;
- try {
- newRow = executeUnlockedQuorumGet(pQueryObject);
- } catch (MDBCServiceException e) {
- logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.tablePrimaryKey);
- throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
- }
- String digest = newRow.getString("transactiondigest");
- HashMap<Range,StagingTable> changes;
- try {
- changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
- } catch (IOException e) {
- logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.tablePrimaryKey);
- throw new MDBCServiceException("Deserializng digest failed with ioexception");
- } catch (ClassNotFoundException e) {
- logger.error("Deserializng digest failed with an invalid class for id:"+id.tablePrimaryKey);
- throw new MDBCServiceException("Deserializng digest failed with an invalid class");
- }
- return changes;
- }
-
- /**
- * This method executes a write query in Music
- * @param cql the CQL to be sent to Cassandra
- */
- protected static void executeMusicWriteQuery(String keyspace, String table, String cql)
- throws MDBCServiceException {
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
- ResultType rt = null;
- try {
- rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical");
- } catch (MusicServiceException e) {
- //\TODO: handle better, at least transform into an MDBCServiceException
- e.printStackTrace();
- }
- String result = rt.getResult();
- if (result==null || result.toLowerCase().equals("failure")) {
- throw new MDBCServiceException("Music eventual put failed");
- }
- }
-
- protected static Row executeLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey,
- String lock)
- throws MDBCServiceException{
- ResultSet result;
- try {
- result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock);
- } catch(MusicServiceException e){
- //\TODO: handle better, at least transform into an MDBCServiceException
- e.printStackTrace();
- throw new MDBCServiceException("Error executing critical get");
- }
- if(result.isExhausted()){
- throw new MDBCServiceException("There is not a row that matches the id "+primaryKey);
- }
- return result.one();
- }
-
- protected static Row executeUnlockedQuorumGet(PreparedQueryObject cqlObject)
- throws MDBCServiceException{
- ResultSet result = MusicCore.quorumGet(cqlObject);
- //\TODO: handle better, at least transform into an MDBCServiceException
- if(result.isExhausted()){
- throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
- }
- return result.one();
- }
-
- protected static void executeLockedPut(String namespace, String tableName,
- String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId,
- MusicCore.Condition conditionInfo) throws MDBCServiceException {
- ReturnType rt ;
- if(lockId==null) {
- try {
- rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo);
- } catch (MusicLockingException e) {
- logger.error("Music locked put failed");
- throw new MDBCServiceException("Music locked put failed");
- } catch (MusicServiceException e) {
- logger.error("Music service fail: Music locked put failed");
- throw new MDBCServiceException("Music service fail: Music locked put failed");
- } catch (MusicQueryException e) {
- logger.error("Music query fail: locked put failed");
- throw new MDBCServiceException("Music query fail: Music locked put failed");
- }
- }
- else {
- rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo);
- }
- if (rt.getResult().getResult().toLowerCase().equals("failure")) {
- throw new MDBCServiceException("Music locked put failed");
- }
- }
-
- public static void createNamespace(String namespace, int replicationFactor) throws MDBCServiceException {
- Map<String,Object> replicationInfo = new HashMap<>();
- replicationInfo.put("'class'", "'SimpleStrategy'");
- replicationInfo.put("'replication_factor'", replicationFactor);
-
- PreparedQueryObject queryObject = new PreparedQueryObject();
- queryObject.appendQueryString(
- "CREATE KEYSPACE " + namespace + " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
-
- try {
- MusicCore.nonKeyRelatedPut(queryObject, "critical");
- } catch (MusicServiceException e) {
- if (!e.getMessage().equals("Keyspace "+namespace+" already exists")) {
- logger.error("Error creating namespace: "+namespace);
- throw new MDBCServiceException("Error creating namespace: "+namespace+". Internal error:"+e.getErrorMessage());
- }
- }
- }
-
- public static void createTxDigestRow(String namespace, String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException {
- PreparedQueryObject query = new PreparedQueryObject();
- String cqlQuery = "INSERT INTO " +
- namespace +
- '.' +
- musicTxDigestTable +
- " (txid,transactiondigest) " +
- "VALUES (" +
- newId.tablePrimaryKey + ",'" +
- transactionDigest +
- "');";
- query.appendQueryString(cqlQuery);
- //\TODO check if I am not shooting on my own foot
- try {
- MusicCore.nonKeyRelatedPut(query,"critical");
- } catch (MusicServiceException e) {
- logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.tablePrimaryKey.toString()+ "with error "+e.getErrorMessage());
- throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.tablePrimaryKey.toString());
- }
- }
-
-}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
index e6b4e0e..2ca621a 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
@@ -37,9 +37,7 @@ import org.onap.music.mdbc.tables.MriReference;
public class DatabasePartition {
private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class);
- private String musicRangeInformationTable;//Table that currently contains the REDO log for this partition
private UUID musicRangeInformationIndex;//Index that can be obtained either from
- private String musicTxDigestTable;
private String lockId;
protected List<Range> ranges;
@@ -48,57 +46,25 @@ public class DatabasePartition {
* The only requirement is that the ranges are not overlapping.
*/
- public DatabasePartition() {
- ranges = new ArrayList<>();
+ public DatabasePartition(UUID mriIndex) {
+ this(new ArrayList<Range>(), mriIndex,"");
}
- public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) {
- if(knownRanges != null) {
- ranges = knownRanges;
- }
- else {
- ranges = new ArrayList<>();
- }
-
- if(musicTxDigestTable != null) {
- this.setMusicTxDigestTable(musicTxDigestTable);
- }
- else{
- this.setMusicTxDigestTable("");
- }
-
+ public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String lockId) {
+ ranges = knownRanges;
+
if(mriIndex != null) {
this.setMusicRangeInformationIndex(mriIndex);
}
else {
this.setMusicRangeInformationIndex(null);
}
+ this.setLockId(lockId);
- if(mriTable != null) {
- this.setMusicRangeInformationTable(mriTable);
- }
- else {
- this.setMusicRangeInformationTable("");
- }
-
- if(lockId != null) {
- this.setLockId(lockId);
- }
- else {
- this.setLockId("");
- }
}
- public String getMusicRangeInformationTable() {
- return musicRangeInformationTable;
- }
-
- public void setMusicRangeInformationTable(String musicRangeInformationTable) {
- this.musicRangeInformationTable = musicRangeInformationTable;
- }
-
- public MriReference getMusicRangeInformationIndex() {
- return new MriReference(musicRangeInformationTable,musicRangeInformationIndex);
+ public UUID getMusicRangeInformationIndex() {
+ return musicRangeInformationIndex;
}
public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) {
@@ -180,11 +146,4 @@ public class DatabasePartition {
this.lockId = lockId;
}
- public String getMusicTxDigestTable() {
- return musicTxDigestTable;
- }
-
- public void setMusicTxDigestTable(String musicTxDigestTable) {
- this.musicTxDigestTable = musicTxDigestTable;
- }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
index 4d43177..2442af1 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
@@ -20,18 +20,26 @@
package org.onap.music.mdbc;
import java.io.*;
+import java.util.ArrayList;
import java.util.Base64;
import java.util.Deque;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
import org.onap.music.logging.format.ErrorSeverity;
import org.onap.music.logging.format.ErrorTypes;
+import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.mixins.Utils;
import org.onap.music.mdbc.tables.Operation;
import org.onap.music.mdbc.tables.StagingTable;
-import javassist.bytecode.Descriptor.Iterator;
+import com.datastax.driver.core.utils.UUIDs;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
@@ -86,4 +94,32 @@ public class MDBCUtils {
}
}
-}
+ /**
+ * This functions is used to generate cassandra uuid
+ * @return a random UUID that can be used for fields of type uuid
+ */
+ public static UUID generateUniqueKey() {
+ return UUIDs.random();
+ }
+
+ public static Properties getMdbcProperties() {
+ Properties prop = new Properties();
+ InputStream input = null;
+ try {
+ input = Utils.class.getClassLoader().getResourceAsStream("mdbc.properties");
+ prop.load(input);
+ } catch (Exception e) {
+ Utils.logger.warn(EELFLoggerDelegate.applicationLogger, "Could not load mdbc.properties."
+ + "Proceeding with defaults " + e.getMessage());
+ } finally {
+ if (input != null) {
+ try {
+ input.close();
+ } catch (IOException e) {
+ Utils.logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
+ }
+ }
+ }
+ return prop;
+ }
+} \ No newline at end of file
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
index d4c0933..08f6e1e 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
@@ -20,6 +20,7 @@
package org.onap.music.mdbc;
import org.onap.music.mdbc.configurations.NodeConfiguration;
+import org.onap.music.mdbc.tables.MusicTxDigest;
import org.apache.calcite.avatica.remote.Driver.Serialization;
import org.apache.calcite.avatica.remote.LocalService;
import org.apache.calcite.avatica.server.HttpServer;
@@ -34,7 +35,7 @@ import java.util.Locale;
import java.util.Properties;
public class MdbcServer {
- public static final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(MdbcStatement.class);
+ private static final EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcStatement.class);
@Parameter(names = { "-c", "--configuration" }, required = true,
description = "This is the file that contains the ranges that are assigned to this MDBC server")
@@ -64,10 +65,11 @@ public class MdbcServer {
private NodeConfiguration config;
private HttpServer server;
+ private MdbcServerLogic meta;
public void start() {
if (null != server) {
- LOG.error("The server was already started");
+ logger.error("The server was already started");
Unsafe.systemExit(ExitCodes.ALREADY_STARTED.ordinal());
return;
}
@@ -78,7 +80,7 @@ public class MdbcServer {
Properties connectionProps = new Properties();
connectionProps.put("user", user);
connectionProps.put("password", password);
- MdbcServerLogic meta = new MdbcServerLogic(url,connectionProps,config);
+ meta = new MdbcServerLogic(url,connectionProps,config);
LocalService service = new LocalService(meta);
// Construct the server
@@ -89,13 +91,14 @@ public class MdbcServer {
// Then start it
server.start();
-
- LOG.info("Started Avatica server on port {} with serialization {}", server.getPort(),
+
+ logger.info("Started Avatica server on port {} with serialization {}", server.getPort(),
serialization);
} catch (Exception e) {
- LOG.error("Failed to start Avatica server", e);
+ logger.error("Failed to start Avatica server", e);
Unsafe.systemExit(ExitCodes.START_FAILED.ordinal());
- }
+ }
+
}
public void stop() {
@@ -125,9 +128,9 @@ public class MdbcServer {
Runtime.getRuntime().addShutdownHook(
new Thread(new Runnable() {
@Override public void run() {
- LOG.info("Stopping server");
+ logger.info("Stopping server");
server.stop();
- LOG.info("Server stopped");
+ logger.info("Server stopped");
}
}));
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
index 33c5dbb..cccea92 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
@@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.configurations.NodeConfiguration;
+import org.onap.music.mdbc.tables.MusicTxDigest;
+
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
@@ -48,20 +50,16 @@ public class MdbcServerLogic extends JdbcMeta{
private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcServerLogic.class);
StateManager manager;
- DatabasePartition ranges;
String name;
- String sqlDatabase;
//TODO: Delete this properties after debugging
private final Properties info;
private final Cache<String, Connection> connectionCache;
- public MdbcServerLogic(String Url, Properties info,NodeConfiguration config) throws SQLException, MDBCServiceException {
+ public MdbcServerLogic(String Url, Properties info, NodeConfiguration config) throws SQLException, MDBCServiceException {
super(Url,info);
- this.ranges = config.partition;
this.name = config.nodeName;
- this.sqlDatabase = config.sqlDatabaseName;
- this.manager = new StateManager(Url,info,this.ranges,this.sqlDatabase);
+ this.manager = new StateManager(Url,info,config.partition,"test"); //FIXME: db name should not be passed in ahead of time
this.info = info;
int concurrencyLevel = Integer.parseInt(
info.getProperty(ConnectionCacheSettings.CONCURRENCY_LEVEL.key(),
@@ -87,6 +85,10 @@ public class MdbcServerLogic extends JdbcMeta{
.build();
}
+ public StateManager getStateManager() {
+ return this.manager;
+ }
+
@Override
protected Connection getConnection(String id) throws SQLException {
if (id == null) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java
index d44d907..c2019cf 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/MusicSqlManager.java
@@ -79,7 +79,7 @@ public class MusicSqlManager {
*/
public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException {
try {
- info.putAll(Utils.getMdbcProperties());
+ info.putAll(MDBCUtils.getMdbcProperties());
String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
this.dbi = MixinFactory.createDBInterface(mixinDb, this, url, conn, info);
this.mi = mi;
@@ -254,6 +254,7 @@ public class MusicSqlManager {
return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti,table, dbRow);
}
+ @Deprecated
public String getMusicKeyFromRow(String table, JSONObject dbRow) {
TableInfo ti = dbi.getTableInfo(table);
return mi.getMusicKeyFromRow(ti,table, dbRow);
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
index 2e47726..4bb0c85 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
@@ -20,6 +20,7 @@
package org.onap.music.mdbc;
import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.logging.format.AppMessages;
import org.onap.music.logging.format.ErrorSeverity;
@@ -27,6 +28,7 @@ import org.onap.music.logging.format.ErrorTypes;
import org.onap.music.mdbc.mixins.MixinFactory;
import org.onap.music.mdbc.mixins.MusicInterface;
import org.onap.music.mdbc.mixins.MusicMixin;
+import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.TxCommitProgress;
import java.io.IOException;
@@ -53,26 +55,25 @@ public class StateManager {
* that are created by the MDBC Server
* @see MusicInterface
*/
- private MusicInterface musicManager;
+ private MusicInterface musicInterface;
/**
* This is the Running Queries information table.
* It mainly contains information about the entities
* that have being committed so far.
*/
private TxCommitProgress transactionInfo;
-
private Map<String,MdbcConnection> mdbcConnections;
-
private String sqlDatabase;
-
private String url;
+ String musicmixin;
+ String cassandraUrl;
private Properties info;
@SuppressWarnings("unused")
private DatabasePartition ranges;
-
- public StateManager(String url, Properties info, DatabasePartition ranges, String sqlDatabase) throws MDBCServiceException {
+
+ public StateManager(String url, Properties info, DatabasePartition ranges, String sqlDatabase) throws MDBCServiceException {
this.sqlDatabase = sqlDatabase;
this.ranges = ranges;
this.url = url;
@@ -81,28 +82,40 @@ public class StateManager {
//\fixme this is not really used, delete!
try {
info.load(this.getClass().getClassLoader().getResourceAsStream("music.properties"));
+ info.putAll(MDBCUtils.getMdbcProperties());
} catch (IOException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
}
- String cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
- String mixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
- init(mixin, cassandraUrl);
+ cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
+ musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
+
+ initMusic();
+ initSqlDatabase();
+
+
+ MusicTxDigest txDaemon = new MusicTxDigest(this);
+ txDaemon.startBackgroundDaemon(Integer.parseInt(
+ info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT)));
}
- protected void init(String mixin, String cassandraUrl) throws MDBCServiceException {
- this.musicManager = MixinFactory.createMusicInterface(mixin, cassandraUrl, info);
- this.musicManager.createKeyspace();
+ /**
+ * Initialize the
+ * @param mixin
+ * @param cassandraUrl
+ * @throws MDBCServiceException
+ */
+ protected void initMusic() throws MDBCServiceException {
+ this.musicInterface = MixinFactory.createMusicInterface(musicmixin, cassandraUrl, info);
+ this.musicInterface.createKeyspace();
try {
- this.musicManager.initializeMetricDataStructures();
+ this.musicInterface.initializeMetricDataStructures();
} catch (MDBCServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR);
throw(e);
}
- MusicMixin.loadProperties();
this.mdbcConnections = new HashMap<>();
- initSqlDatabase();
}
-
+
protected void initSqlDatabase() throws MDBCServiceException {
try {
//\TODO: pass the driver as a variable
@@ -125,6 +138,19 @@ public class StateManager {
}
}
+ public MusicInterface getMusicInterface() {
+ return this.musicInterface;
+ }
+
+ public DatabasePartition getRanges() {
+ return ranges;
+ }
+
+ public void setRanges(DatabasePartition ranges) {
+ this.ranges = ranges;
+ }
+
+
public void CloseConnection(String connectionId){
//\TODO check if there is a race condition
if(mdbcConnections.containsKey(connectionId)) {
@@ -163,7 +189,7 @@ public class StateManager {
}
//Create MDBC connection
try {
- newConnection = new MdbcConnection(id, this.url+"/"+this.sqlDatabase, sqlConnection, info, this.musicManager, transactionInfo,ranges);
+ newConnection = new MdbcConnection(id, this.url+"/"+this.sqlDatabase, sqlConnection, info, this.musicInterface, transactionInfo,ranges);
} catch (MDBCServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
newConnection = null;
@@ -213,7 +239,7 @@ public class StateManager {
}
//Create MDBC connection
try {
- newConnection = new MdbcConnection(id,this.url+"/"+this.sqlDatabase, sqlConnection, info, this.musicManager, transactionInfo,ranges);
+ newConnection = new MdbcConnection(id,this.url+"/"+this.sqlDatabase, sqlConnection, info, this.musicInterface, transactionInfo,ranges);
} catch (MDBCServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
newConnection = null;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
index fb4656c..ad86ada 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
@@ -36,14 +36,12 @@ public class NodeConfiguration {
private static transient final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(NodeConfiguration.class);
- public String sqlDatabaseName;
public DatabasePartition partition;
public String nodeName;
- public NodeConfiguration(String tables, UUID mriIndex, String mriTableName, String sqlDatabaseName, String node, String redoRecordsTable){
+ public NodeConfiguration(String tables, UUID mriIndex, String mriTableName, String node){
// public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) {
- partition = new DatabasePartition(toRanges(tables), mriIndex, mriTableName, null, redoRecordsTable) ;
- this.sqlDatabaseName = sqlDatabaseName;
+ partition = new DatabasePartition(toRanges(tables), mriIndex, null) ;
this.nodeName = node;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
index 77df15f..f3f5d22 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
@@ -21,9 +21,11 @@ package org.onap.music.mdbc.configurations;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
-import org.onap.music.mdbc.DatabaseOperations;
+import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.RedoRow;
+import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.tables.MusicTxDigest;
import com.google.gson.Gson;
import org.onap.music.datastore.PreparedQueryObject;
@@ -62,7 +64,7 @@ public class TablesConfiguration {
*/
public List<NodeConfiguration> initializeAndCreateNodeConfigurations() throws MDBCServiceException {
initInternalNamespace();
- DatabaseOperations.createNamespace(musicNamespace, internalReplicationFactor);
+
List<NodeConfiguration> nodeConfigs = new ArrayList<>();
if(partitions == null){
logger.error("Partitions was not correctly initialized");
@@ -70,12 +72,8 @@ public class TablesConfiguration {
}
for(PartitionInformation partitionInfo : partitions){
String mriTableName = partitionInfo.mriTableName;
- mriTableName = (mriTableName==null || mriTableName.isEmpty())?TIT_TABLE_NAME:mriTableName;
//0) Create the corresponding Music Range Information table
- DatabaseOperations.createMusicRangeInformationTable(musicNamespace,mriTableName);
- String musicTxDigestTableName = partitionInfo.mtxdTableName;
- musicTxDigestTableName = (musicTxDigestTableName==null || musicTxDigestTableName.isEmpty())? MUSIC_TX_DIGEST_TABLE_NAME :musicTxDigestTableName;
- DatabaseOperations.createMusicTxDigest(musicNamespace,musicTxDigestTableName);
+
String partitionId;
if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){
if(partitionInfo.replicationFactor==0){
@@ -90,7 +88,7 @@ public class TablesConfiguration {
partitionId = partitionInfo.partitionId;
}
//2) Create a row in the transaction information table
- UUID mriTableIndex = DatabaseOperations.createEmptyMriRow(musicNamespace,mriTableName,"",null,partitionInfo.getTables());
+ UUID mriTableIndex = MDBCUtils.generateUniqueKey();
//3) Add owner and tit information to partition info table
RedoRow newRedoRow = new RedoRow(mriTableName,mriTableIndex);
//DatabaseOperations.updateRedoRow(musicNamespace,pitName,partitionId,newRedoRow,partitionInfo.owner,null);
@@ -105,13 +103,13 @@ public class TablesConfiguration {
for(Range r: partitionInfo.tables){
newStr.append(r.toString()).append(",");
}
- nodeConfigs.add(new NodeConfiguration(newStr.toString(),mriTableIndex,mriTableName,sqlDatabaseName,partitionInfo.owner,musicTxDigestTableName));
+ nodeConfigs.add(new NodeConfiguration(newStr.toString(),mriTableIndex,
+ sqlDatabaseName, partitionInfo.owner));
}
return nodeConfigs;
}
private void initInternalNamespace() throws MDBCServiceException {
- DatabaseOperations.createNamespace(internalNamespace,internalReplicationFactor);
StringBuilder createKeysTableCql = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
.append(internalNamespace)
.append(".unsynced_keys (key text PRIMARY KEY);");
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java
index 68d836b..eede9be 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/EtdbTestClient.java
@@ -51,7 +51,7 @@ public class EtdbTestClient {
}
Connection connection;
try {
- connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000;serialization=protobuf");
+ connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");
} catch (SQLException e) {
e.printStackTrace();
return;
@@ -70,7 +70,8 @@ public class EtdbTestClient {
" LastName varchar(255),\n" +
" FirstName varchar(255),\n" +
" Address varchar(255),\n" +
- " City varchar(255)\n" +
+ " City varchar(255),\n" +
+ " PRIMARY KEY (PersonID,LastName)" +
");";
Statement stmt;
try {
@@ -103,6 +104,12 @@ public class EtdbTestClient {
}
final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');";
+ final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;";
+ final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";
+ final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';";
+ final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";
+
+
Statement insertStmt;
try {
insertStmt = connection.createStatement();
@@ -113,6 +120,11 @@ public class EtdbTestClient {
try {
execute = insertStmt.execute(insertSQL);
+ execute = insertStmt.execute(insertSQL1);
+ execute = insertStmt.execute(insertSQL2);
+ execute = insertStmt.execute(insertSQL3);
+ execute = insertStmt.execute(insertSQL4);
+
} catch (SQLException e) {
e.printStackTrace();
return;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java
index 97c1102..0732dc8 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Cassandra2Mixin.java
@@ -30,6 +30,7 @@ import java.util.Properties;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.main.MusicCore;
import org.onap.music.main.ReturnType;
@@ -57,7 +58,7 @@ public class Cassandra2Mixin extends CassandraMixin {
super();
}
- public Cassandra2Mixin(String url, Properties info) throws MusicServiceException {
+ public Cassandra2Mixin(String url, Properties info) throws MDBCServiceException {
super(url, info);
}
@@ -80,9 +81,10 @@ public class Cassandra2Mixin extends CassandraMixin {
/**
* This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
* The keyspace name comes from the initialization properties passed to the JDBC driver.
+ * @throws MusicServiceException
*/
@Override
- public void createKeyspace() {
+ public void createKeyspace() throws MDBCServiceException {
super.createKeyspace();
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java
index a1f325e..3b1651f 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/CassandraMixin.java
@@ -35,12 +35,11 @@ import java.util.TreeSet;
import java.util.UUID;
import org.onap.music.mdbc.*;
-import org.onap.music.mdbc.DatabaseOperations;
-import org.onap.music.mdbc.tables.PartitionInformation;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigest;
import org.onap.music.mdbc.tables.TxCommitProgress;
import org.json.JSONObject;
@@ -50,6 +49,7 @@ import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.main.MusicCore;
+import org.onap.music.main.MusicCore.Condition;
import org.onap.music.main.ResultType;
import org.onap.music.main.ReturnType;
@@ -62,6 +62,7 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TupleValue;
/**
* This class provides the methods that MDBC needs to access Cassandra directly in order to provide persistence
@@ -93,8 +94,8 @@ public class CassandraMixin implements MusicInterface {
public static final String KEY_MUSIC_RFACTOR = "music_rfactor";
/** The property name to use to provide the replication factor for Cassandra. */
public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
- /** The default property value to use for the Cassandra keyspace. */
- public static final String DEFAULT_MUSIC_KEYSPACE = "mdbc";
+ /** Namespace for the tables in MUSIC (Cassandra) */
+ public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
/** The default property value to use for the Cassandra IP address. */
public static final String DEFAULT_MUSIC_ADDRESS = "localhost";
/** The default property value to use for the Cassandra replication factor. */
@@ -103,8 +104,7 @@ public class CassandraMixin implements MusicInterface {
public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
/** Type of the primary key, if none is defined by the user */
public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
- /** Namespace for the tables in MUSIC (Cassandra) */
- public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
+
//\TODO Add logic to change the names when required and create the tables when necessary
private String musicTxDigestTableName = "musictxdigest";
@@ -155,7 +155,7 @@ public class CassandraMixin implements MusicInterface {
this.allReplicaIds = null;
}
- public CassandraMixin(String url, Properties info) throws MusicServiceException {
+ public CassandraMixin(String url, Properties info) throws MDBCServiceException {
// Default values -- should be overridden in the Properties
// Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
this.musicAddress = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
@@ -173,11 +173,15 @@ public class CassandraMixin implements MusicInterface {
this.music_ns = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
- musicRangeInformationTableName = "musicrangeinformation";
- createMusicKeyspace();
+ createKeyspace();
}
- private void createMusicKeyspace() throws MusicServiceException {
+ /**
+ * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
+ * The keyspace name comes from the initialization properties passed to the JDBC driver.
+ */
+ @Override
+ public void createKeyspace() throws MDBCServiceException {
Map<String,Object> replicationInfo = new HashMap<>();
replicationInfo.put("'class'", "'SimpleStrategy'");
@@ -185,15 +189,14 @@ public class CassandraMixin implements MusicInterface {
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString(
- "CREATE KEYSPACE " + this.music_ns + " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
+ "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns +
+ " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
try {
MusicCore.nonKeyRelatedPut(queryObject, "eventual");
- } catch (MusicServiceException e) {
- if (e.getMessage().equals("Keyspace "+this.music_ns+" already exists")) {
- // ignore
- } else {
- throw(e);
+ } catch (MusicServiceException e) {
+ if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) {
+ throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage());
}
}
}
@@ -234,26 +237,13 @@ public class CassandraMixin implements MusicInterface {
@Override
public void initializeMetricDataStructures() throws MDBCServiceException {
try {
- DatabaseOperations.createMusicTxDigest(music_ns, musicTxDigestTableName);//\TODO If we start partitioning the data base, we would need to use the redotable number
- DatabaseOperations.createMusicRangeInformationTable(music_ns, musicRangeInformationTableName);
+ createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
+ createMusicRangeInformationTable();
}
catch(MDBCServiceException e){
logger.error(EELFLoggerDelegate.errorLogger,"Error creating tables in MUSIC");
}
}
-
- /**
- * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
- * The keyspace name comes from the initialization properties passed to the JDBC driver.
- */
- @Override
- public void createKeyspace() {
- if (keyspace_created == false) {
- String cql = String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : %d };", music_ns, music_rfactor);
- executeMusicWriteQuery(cql);
- keyspace_created = true;
- }
- }
/**
* This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>.
@@ -286,7 +276,7 @@ public class CassandraMixin implements MusicInterface {
fields.append(pfx).append(MDBC_PRIMARYKEY_NAME)
.append(" ")
.append(MDBC_PRIMARYKEY_TYPE);
- prikey.append("mdbc_cuid");
+ prikey.append(MDBC_PRIMARYKEY_NAME);
}
String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", music_ns, tableName, fields.toString(), prikey.toString());
executeMusicWriteQuery(cql);
@@ -928,7 +918,7 @@ public class CassandraMixin implements MusicInterface {
* Return the function for cassandra's primary key generation
*/
public String generateUniqueKey() {
- return DatabaseOperations.generateUniqueKey().toString();
+ return MDBCUtils.generateUniqueKey().toString();
}
@Override
@@ -986,7 +976,7 @@ public class CassandraMixin implements MusicInterface {
String pfx = "";
for(String keyCol: keyCols) {
key.append(pfx);
- key.append(row.getString(keyCol));
+ key.append(row.get(keyCol));
pfx = ",";
}
String keyStr = key.toString();
@@ -1036,7 +1026,7 @@ public class CassandraMixin implements MusicInterface {
}
protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
- MriReference mriIndex = partition.getMusicRangeInformationIndex();
+ UUID mriIndex = partition.getMusicRangeInformationIndex();
String lockId;
lockId = MusicCore.createLockReference(fullyQualifiedKey);
//\TODO Handle better failures to acquire locks
@@ -1058,11 +1048,13 @@ public class CassandraMixin implements MusicInterface {
try {
MusicCore.forciblyReleaseLock(fullyQualifiedKey,lockId);
CassaLockStore lockingServiceHandle = MusicCore.getLockingServiceHandle();
- CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns, partition.getMusicRangeInformationTable(), mriIndex.index.toString());
+ CassaLockStore.LockObject lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
+ this.musicRangeInformationTableName, mriIndex.toString());
while(lockOwner.lockRef != lockId) {
MusicCore.forciblyReleaseLock(fullyQualifiedKey, lockOwner.lockRef);
try {
- lockOwner = lockingServiceHandle.peekLockQueue(music_ns, partition.getMusicRangeInformationTable(), mriIndex.index.toString());
+ lockOwner = lockingServiceHandle.peekLockQueue(music_ns,
+ this.musicRangeInformationTableName, mriIndex.toString());
} catch(NullPointerException e){
//Ignore null pointer exception
lockId = MusicCore.createLockReference(fullyQualifiedKey);
@@ -1102,12 +1094,12 @@ public class CassandraMixin implements MusicInterface {
@Override
public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
- MriReference mriIndex = partition.getMusicRangeInformationIndex();
+ UUID mriIndex = partition.getMusicRangeInformationIndex();
if(mriIndex==null) {
//\TODO Fetch MriIndex from the Range Information Table
throw new MDBCServiceException("TIT Index retrieval not yet implemented");
}
- String fullyQualifiedMriKey = music_ns+"."+ mriIndex.table+"."+mriIndex.index.toString();
+ String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex.toString();
//0. See if reference to lock was already created
String lockId = partition.getLockId();
if(lockId == null || lockId.isEmpty()) {
@@ -1133,7 +1125,7 @@ public class CassandraMixin implements MusicInterface {
throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString());
}
MusicTxDigestId digestId = new MusicTxDigestId(commitId);
- addTxDigest(musicTxDigestTableName, digestId, serializedTransactionDigest);
+ addTxDigest(digestId, serializedTransactionDigest);
//2. Save RRT index to RQ
if(progressKeeper!= null) {
progressKeeper.setRecordId(txId,digestId);
@@ -1188,49 +1180,235 @@ public class CassandraMixin implements MusicInterface {
}
return objects;
}
-
+
+ @Override
+ public List<UUID> getPartitionIndexes() {
+ ArrayList<UUID> partitions = new ArrayList<UUID>();
+ String cql = String.format("SELECT rangeid FROM %s.%s", music_ns, musicRangeInformationTableName);
+ ResultSet rs = executeMusicRead(cql);
+ for (Row r: rs) {
+ partitions.add(r.getUUID("rangeid"));
+ }
+ return partitions;
+ }
+
@Override
- public MusicRangeInformationRow getMusicRangeInformation(DatabasePartition partition) throws MDBCServiceException {
+ public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException {
//TODO: verify that lock id is valid before calling the database operations function
- MriReference reference = partition.getMusicRangeInformationIndex();
- return DatabaseOperations.getMriRow(music_ns,reference.table,reference.index,partition.getLockId());
+ //UUID id = partition.getMusicRangeInformationIndex();
+
+ String cql = String.format("SELECT * FROM %s.%s WHERE rangeid = ?;", music_ns, musicRangeInformationTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(partitionIndex);
+ Row newRow;
+ try {
+ newRow = executeMusicUnlockedQuorumGet(pQueryObject);
+ } catch (MDBCServiceException e) {
+ logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+
+ List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
+ List<MusicTxDigestId> digestIds = new ArrayList<>();
+ for(TupleValue t: log){
+ //final String tableName = t.getString(0);
+ final UUID index = t.getUUID(1);
+ digestIds.add(new MusicTxDigestId(index));
+ }
+ List<Range> partitions = new ArrayList<>();
+ Set<String> tables = newRow.getSet("keys",String.class);
+ for (String table:tables){
+ partitions.add(new Range(table));
+ }
+ return new MusicRangeInformationRow(new DatabasePartition(partitions, partitionIndex, ""),
+ digestIds, newRow.getString("ownerid"),newRow.getString("metricprocessid"));
}
+
+ /**
+ * This function creates the TransactionInformation table. It contain information related
+ * to the transactions happening in a given partition.
+ * * The schema of the table is
+ * * Id, uiid.
+ * * Partition, uuid id of the partition
+ * * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
+ * * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
+ * * Redo: list of uiids associated to the Redo Records Table
+ *
+ */
+ private void createMusicRangeInformationTable() throws MDBCServiceException {
+ String tableName = this.musicRangeInformationTableName;
+ String priKey = "rangeid";
+ StringBuilder fields = new StringBuilder();
+ fields.append("rangeid uuid, ");
+ fields.append("keys set<text>, ");
+ fields.append("ownerid text, ");
+ fields.append("metricprocessid text, ");
+ //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
+ fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
+ this.music_ns, tableName, fields, priKey);
+ try {
+ executeMusicWriteQuery(this.music_ns,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create transaction information table");
+ throw(e);
+ }
+ }
+
+
@Override
public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException {
- DatabasePartition newPartition = new DatabasePartition(info.partition.ranges,info.index,
- musicRangeInformationTableName,null,musicTxDigestTableName);
- String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+info.index.toString();
+ DatabasePartition newPartition = info.getDBPartition();
+ String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+newPartition.getMusicRangeInformationIndex().toString();
String lockId = createAndAssignLock(fullyQualifiedMriKey,newPartition);
- DatabaseOperations.createEmptyMriRow(music_ns,musicRangeInformationTableName,info.metricProcessId,lockId,info.partition.ranges);
+ createEmptyMriRow(info.getMetricProcessId(),lockId,new ArrayList<Range>());
throw new UnsupportedOperationException();
}
+
+ /**
+ * Creates a new empty MRI row
+ * @param processId id of the process that is going to own initially this.
+ * @return uuid associated to the new row
+ */
+ private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges)
+ throws MDBCServiceException {
+ UUID id = MDBCUtils.generateUniqueKey();
+ return createEmptyMriRow(id,processId,lockId,ranges);
+ }
+
+ /**
+ * Creates a new empty MRI row
+ * @param processId id of the process that is going to own initially this.
+ * @return uuid associated to the new row
+ */
+ private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges)
+ throws MDBCServiceException{
+ StringBuilder insert = new StringBuilder("INSERT INTO ")
+ .append(this.music_ns)
+ .append('.')
+ .append(this.musicRangeInformationTableName)
+ .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
+ .append("(")
+ .append(id)
+ .append(",{");
+ boolean first=true;
+ for(Range r: ranges){
+ if(first){ first=false; }
+ else {
+ insert.append(',');
+ }
+ insert.append("'").append(r.toString()).append("'");
+ }
+ insert.append("},'")
+ .append((lockId==null)?"":lockId)
+ .append("','")
+ .append(processId)
+ .append("',[]);");
+ PreparedQueryObject query = new PreparedQueryObject();
+ query.appendQueryString(insert.toString());
+ try {
+ executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to add new row to transaction information");
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+ return id;
+ }
@Override
- public void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException {
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId.index, musicTxDigestTableName, newRecord.tablePrimaryKey);
- ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.index.toString(), appendQuery, partition.getLockId(), null);
+ public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException {
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, mriRowId, musicTxDigestTableName, newRecord.txId);
+ ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mriRowId.toString(), appendQuery, partition.getLockId(), null);
if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
}
}
-
+
+ public void createMusicTxDigest() throws MDBCServiceException {
+ createMusicTxDigest(-1);
+ }
+
+
+ /**
+ * This function creates the MusicTxDigest table. It contain information related to each transaction committed
+ * * LeaseId: id associated with the lease, text
+ * * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+ * * TransactionDigest: text that contains all the changes in the transaction
+ */
+ private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
+ String tableName = this.musicTxDigestTableName;
+ if(musicTxDigestTableNumber >= 0) {
+ tableName = tableName +
+ "-" +
+ Integer.toString(musicTxDigestTableNumber);
+ }
+ String priKey = "txid";
+ StringBuilder fields = new StringBuilder();
+ fields.append("txid uuid, ");
+ fields.append("transactiondigest text ");//notice lack of ','
+ String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
+ try {
+ executeMusicWriteQuery(this.music_ns,tableName,cql);
+ } catch (MDBCServiceException e) {
+ logger.error("Initialization error: Failure to create redo records table");
+ throw(e);
+ }
+ }
+
+
@Override
- public void addTxDigest(String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException {
- DatabaseOperations.createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
+ public void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException {
+ //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
+ PreparedQueryObject query = new PreparedQueryObject();
+ String cqlQuery = "INSERT INTO " +
+ this.music_ns +
+ '.' +
+ this.musicTxDigestTableName +
+ " (txid,transactiondigest) " +
+ "VALUES (" +
+ newId.txId + ",'" +
+ transactionDigest +
+ "');";
+ query.appendQueryString(cqlQuery);
+ //\TODO check if I am not shooting on my own foot
+ try {
+ MusicCore.nonKeyRelatedPut(query,"critical");
+ } catch (MusicServiceException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
+ throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString());
+ }
}
+
- @Override
- public PartitionInformation getPartitionInformation(DatabasePartition partition) throws MDBCServiceException {
- //\TODO We may want to cache this information to avoid going to the database to obtain this simple information
- MusicRangeInformationRow row = getMusicRangeInformation(partition);
- return row.partition;
- }
@Override
- public HashMap<Range,StagingTable> getTransactionDigest(MusicTxDigestId id) throws MDBCServiceException {
- return DatabaseOperations.getTransactionDigest(music_ns, musicTxDigestTableName, id);
+ public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
+ String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ pQueryObject.addValue(id.txId);
+ Row newRow;
+ try {
+ newRow = executeMusicUnlockedQuorumGet(pQueryObject);
+ } catch (MDBCServiceException e) {
+ logger.error("Get operation error: Failure to get row from txdigesttable with id:"+id.txId);
+ throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information");
+ }
+ String digest = newRow.getString("transactiondigest");
+ HashMap<Range,StagingTable> changes;
+ try {
+ changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
+ } catch (IOException e) {
+ logger.error("IOException when deserializing digest failed with an invalid class for id:"+id.txId);
+ throw new MDBCServiceException("Deserializng digest failed with ioexception");
+ } catch (ClassNotFoundException e) {
+ logger.error("Deserializng digest failed with an invalid class for id:"+id.txId);
+ throw new MDBCServiceException("Deserializng digest failed with an invalid class");
+ }
+ return changes;
}
@Override
@@ -1247,4 +1425,78 @@ public class CassandraMixin implements MusicInterface {
public void relinquish(String ownerId, String rangeId){
throw new UnsupportedOperationException();
}
+
+ /**
+ * This method executes a write query in Music
+ * @param cql the CQL to be sent to Cassandra
+ */
+ private static void executeMusicWriteQuery(String keyspace, String table, String cql)
+ throws MDBCServiceException {
+ PreparedQueryObject pQueryObject = new PreparedQueryObject();
+ pQueryObject.appendQueryString(cql);
+ ResultType rt = null;
+ try {
+ rt = MusicCore.createTable(keyspace,table,pQueryObject,"critical");
+ } catch (MusicServiceException e) {
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ e.printStackTrace();
+ }
+ String result = rt.getResult();
+ if (result==null || result.toLowerCase().equals("failure")) {
+ throw new MDBCServiceException("Music eventual put failed");
+ }
+ }
+
+ private static Row executeMusicLockedGet(String keyspace, String table, PreparedQueryObject cqlObject, String primaryKey,
+ String lock)
+ throws MDBCServiceException{
+ ResultSet result;
+ try {
+ result = MusicCore.criticalGet(keyspace,table,primaryKey,cqlObject,lock);
+ } catch(MusicServiceException e){
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ e.printStackTrace();
+ throw new MDBCServiceException("Error executing critical get");
+ }
+ if(result.isExhausted()){
+ throw new MDBCServiceException("There is not a row that matches the id "+primaryKey);
+ }
+ return result.one();
+ }
+
+ private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
+ throws MDBCServiceException{
+ ResultSet result = MusicCore.quorumGet(cqlObject);
+ //\TODO: handle better, at least transform into an MDBCServiceException
+ if(result.isExhausted()){
+ throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
+ }
+ return result.one();
+ }
+
+ private void executeMusicLockedPut(String namespace, String tableName,
+ String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId,
+ MusicCore.Condition conditionInfo) throws MDBCServiceException {
+ ReturnType rt ;
+ if(lockId==null) {
+ try {
+ rt = MusicCore.atomicPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, conditionInfo);
+ } catch (MusicLockingException e) {
+ logger.error("Music locked put failed");
+ throw new MDBCServiceException("Music locked put failed");
+ } catch (MusicServiceException e) {
+ logger.error("Music service fail: Music locked put failed");
+ throw new MDBCServiceException("Music service fail: Music locked put failed");
+ } catch (MusicQueryException e) {
+ logger.error("Music query fail: locked put failed");
+ throw new MDBCServiceException("Music query fail: Music locked put failed");
+ }
+ }
+ else {
+ rt = MusicCore.criticalPut(namespace, tableName, primaryKeyWithoutDomain, queryObject, lockId, conditionInfo);
+ }
+ if (rt.getResult().getResult().toLowerCase().equals("failure")) {
+ throw new MDBCServiceException("Music locked put failed");
+ }
+ }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
index 6a1219c..52b3036 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
@@ -27,10 +27,10 @@ import java.util.UUID;
import org.json.JSONObject;
import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
-import org.onap.music.mdbc.tables.PartitionInformation;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.MriReference;
@@ -81,8 +81,9 @@ public interface MusicInterface {
/**
* This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
* The keyspace name comes from the initialization properties passed to the JDBC driver.
+ * @throws MusicServiceException
*/
- void createKeyspace();
+ void createKeyspace() throws MDBCServiceException;
/**
* This method performs all necessary initialization in Music/Cassandra to store the table <i>tableName</i>.
* @param tableName the table to initialize MUSIC for
@@ -170,23 +171,22 @@ public interface MusicInterface {
*/
void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
- MusicRangeInformationRow getMusicRangeInformation(DatabasePartition partition) throws MDBCServiceException;
+ MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException;
DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException;
- void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException;
+ void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException;
- void addTxDigest(String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException;
+ void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException;
- PartitionInformation getPartitionInformation(DatabasePartition partition) throws MDBCServiceException;
+ HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException;
- HashMap<Range,StagingTable> getTransactionDigest(MusicTxDigestId id) throws MDBCServiceException;
-
void own(List<Range> ranges);
void appendRange(String rangeId, List<Range> ranges);
void relinquish(String ownerId, String rangeId);
+ public List<UUID> getPartitionIndexes();
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
index 46d41d4..a7ea680 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
@@ -31,7 +31,6 @@ import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
-import org.onap.music.mdbc.tables.PartitionInformation;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
import org.onap.music.mdbc.tables.MriReference;
@@ -148,30 +147,6 @@ public class MusicMixin implements MusicInterface {
public void updateDirtyRowAndEntityTableInMusic(String tableName, JSONObject changedRow, boolean isCritical) {
}
-
- public static void loadProperties() {
- Properties prop = new Properties();
- InputStream input = null;
- try {
- input = MusicMixin.class.getClassLoader().getResourceAsStream("mdbc.properties");
- prop.load(input);
- String crTable = prop.getProperty("critical.tables");
- String[] tableArr = crTable.split(",");
- criticalTables = Arrays.asList(tableArr);
-
- } catch (Exception ex) {
- ex.printStackTrace();
- } finally {
- if (input != null) {
- try {
- input.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
public static void releaseZKLocks(Set<LockId> lockIds) {
for (LockId lockId : lockIds) {
System.out.println("Releasing lock: " + lockId);
@@ -208,14 +183,10 @@ public class MusicMixin implements MusicInterface {
}
@Override
- public HashMap<Range, StagingTable> getTransactionDigest(MusicTxDigestId id) {
+ public HashMap<Range, StagingTable> getTxDigest(MusicTxDigestId id) {
return null;
}
- @Override
- public PartitionInformation getPartitionInformation(DatabasePartition partition) {
- return null;
- }
@Override
public DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) {
@@ -223,11 +194,11 @@ public class MusicMixin implements MusicInterface {
}
@Override
- public void appendToRedoLog(MriReference mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) {
+ public void appendToRedoLog(UUID mriRowId, DatabasePartition partition, MusicTxDigestId newRecord) {
}
@Override
- public void addTxDigest(String musicTxDigestTable, MusicTxDigestId newId, String transactionDigest) {
+ public void addTxDigest(MusicTxDigestId newId, String transactionDigest) {
}
@Override
@@ -246,7 +217,14 @@ public class MusicMixin implements MusicInterface {
}
@Override
- public MusicRangeInformationRow getMusicRangeInformation(DatabasePartition partition){
+ public List<UUID> getPartitionIndexes() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException {
+ // TODO Auto-generated method stub
return null;
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
index 23056e7..5943b34 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
@@ -573,10 +573,9 @@ NEW.field refers to the new value
String op = rs.getString("OP");
OperationType opType = toOpEnum(op);
String tbl = rs.getString("TABLENAME");
- String keydataStr = rs.getString("KEYDATA");
+ JSONObject keydataStr = new JSONObject(new JSONTokener(rs.getString("KEYDATA")));
String newRowStr = rs.getString("NEWROWDATA");
JSONObject newRow = new JSONObject(new JSONTokener(newRowStr));
- String musicKey;
TableInfo ti = getTableInfo(tbl);
if (!ti.hasKey()) {
//create music key
@@ -586,26 +585,27 @@ NEW.field refers to the new value
// the actual columns, otherwise performance when doing range queries are going
// to be even worse (see the else bracket down)
//
- musicKey = msm.generateUniqueKey();
+ String musicKey = msm.generateUniqueKey();
/*} else {
//get key from data
musicKey = msm.getMusicKeyFromRowWithoutPrimaryIndexes(tbl,newRow);
}*/
newRow.put(msm.getMusicDefaultPrimaryKeyName(), musicKey);
+ keydataStr.put(msm.getMusicDefaultPrimaryKeyName(), musicKey);
}
- else {
+ /*else {
//Use the keys
musicKey = msm.getMusicKeyFromRow(tbl, newRow);
if(musicKey.isEmpty()) {
logger.error(EELFLoggerDelegate.errorLogger,"Primary key is invalid: ["+tbl+","+op+"]");
throw new NoSuchFieldException("Invalid operation enum");
}
- }
+ }*/
Range range = new Range(tbl);
if(!transactionDigests.containsKey(range)) {
transactionDigests.put(range, new StagingTable());
}
- transactionDigests.get(range).addOperation(musicKey, opType, newRow.toString());
+ transactionDigests.get(range).addOperation(opType, newRow.toString(), keydataStr.toString());
rows.add(ix);
}
rs.getStatement().close();
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
index cfa8771..86088f9 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
@@ -20,7 +20,6 @@
package org.onap.music.mdbc.mixins;
import java.io.IOException;
-import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
@@ -44,7 +43,7 @@ import com.datastax.driver.core.utils.Bytes;
* @author Robert P. Eby
*/
public class Utils {
- private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Utils.class);
+ public static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Utils.class);
/**
* Transforms and JsonObject into an array of objects
@@ -168,7 +167,7 @@ public class Utils {
Properties pr = null;
try {
pr = new Properties();
- pr.load(Utils.class.getResourceAsStream("/mdbc_driver.properties"));
+ pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
}
catch (IOException e) {
logger.error(EELFLoggerDelegate.errorLogger, "Could not load property file > " + e.getMessage());
@@ -196,7 +195,7 @@ public class Utils {
Properties pr = null;
try {
pr = new Properties();
- pr.load(Utils.class.getResourceAsStream("/mdbc_driver.properties"));
+ pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
}
catch (IOException e) {
logger.error("Could not load property file > " + e.getMessage());
@@ -215,25 +214,4 @@ public class Utils {
}
}
}
-
- public static Properties getMdbcProperties() {
- Properties prop = new Properties();
- InputStream input = null;
- try {
- input = Utils.class.getClassLoader().getResourceAsStream("/mdbc.properties");
- prop.load(input);
- } catch (Exception e) {
- logger.warn(EELFLoggerDelegate.applicationLogger, "Could load mdbc.properties."
- + "Proceeding with defaults " + e.getMessage());
- } finally {
- if (input != null) {
- try {
- input.close();
- } catch (IOException e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
- }
- }
- }
- return prop;
- }
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
index 61c7bf1..69f2c31 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
@@ -22,11 +22,9 @@ package org.onap.music.mdbc.tables;
import java.util.UUID;
public final class MriReference {
- public final String table;
public final UUID index;
- public MriReference(String table, UUID index) {
- this.table = table;
+ public MriReference(UUID index) {
this.index= index;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
index 6b67e5c..94011d7 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
@@ -22,20 +22,41 @@ package org.onap.music.mdbc.tables;
import java.util.List;
import java.util.UUID;
+import org.onap.music.mdbc.DatabasePartition;
+
public final class MusicRangeInformationRow {
- public final UUID index;
- public final PartitionInformation partition;
- public final List<MusicTxDigestId> redoLog;
- public final String ownerId;
- public final String metricProcessId;
-
- public MusicRangeInformationRow(UUID index, List<MusicTxDigestId> redoLog, PartitionInformation partition,
- String ownerId, String metricProcessId) {
- this.index = index;
+ private final DatabasePartition dbPartition;
+ //private final UUID partitionIndex;
+ private final List<MusicTxDigestId> redoLog;
+ private final String ownerId;
+ private final String metricProcessId;
+
+ public MusicRangeInformationRow (DatabasePartition dbPartition, List<MusicTxDigestId> redoLog,
+ String ownerId, String metricProcessId) {
+ this.dbPartition = dbPartition;
this.redoLog = redoLog;
- this.partition = partition;
this.ownerId = ownerId;
this.metricProcessId = metricProcessId;
}
+ /*public UUID getPartitionIndex() {
+ return dbPartition.getMusicRangeInformationIndex();
+ } */
+
+ public DatabasePartition getDBPartition() {
+ return this.dbPartition;
+ }
+
+ public List<MusicTxDigestId> getRedoLog() {
+ return redoLog;
+ }
+
+ public String getOwnerId() {
+ return ownerId;
+ }
+
+ public String getMetricProcessId() {
+ return metricProcessId;
+ }
+
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
new file mode 100644
index 0000000..0a5bd60
--- /dev/null
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
@@ -0,0 +1,220 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.tables;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.json.JSONObject;
+import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.DatabasePartition;
+import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.MdbcServerLogic;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.StateManager;
+import org.onap.music.mdbc.configurations.NodeConfiguration;
+import org.onap.music.mdbc.mixins.CassandraMixin;
+import org.onap.music.mdbc.mixins.MusicInterface;
+
+import com.datastax.driver.core.Row;
+
+public class MusicTxDigest {
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicTxDigest.class);
+
+ //private MdbcServerLogic mdbcServer;
+ //private NodeConfiguration config;
+ private StateManager stateManager;
+
+ public MusicTxDigest(StateManager stateManager) {
+ this.stateManager = stateManager;
+ }
+
+ /**
+ * Parse the transaction digest into individual events
+ * @param digest - base 64 encoded, serialized digest
+ */
+ public void replayTxDigest(HashMap<Range,StagingTable> digest) {
+ for (Map.Entry<Range,StagingTable> entry: digest.entrySet()) {
+ Range r = entry.getKey();
+ StagingTable st = entry.getValue();
+ ArrayList<Operation> opList = st.getOperationList();
+
+ for (Operation op: opList) {
+ replayOperation(r, op);
+ }
+ }
+ }
+
+ /**
+ * Replays operation into local database
+ * @param r
+ * @param op
+ */
+ private void replayOperation(Range r, Operation op) {
+ logger.info("Operation: " + op.getOperationType() + "->" + op.getNewVal());
+ JSONObject jsonOp = op.getNewVal();
+ JSONObject key = op.getKey();
+
+ ArrayList<String> cols = new ArrayList<String>();
+ ArrayList<Object> vals = new ArrayList<Object>();
+ Iterator<String> colIterator = jsonOp.keys();
+ while(colIterator.hasNext()) {
+ String col = colIterator.next();
+ //FIXME: should not explicitly refer to cassandramixin
+ if (col.equals(CassandraMixin.MDBC_PRIMARYKEY_NAME)) {
+ //reserved name
+ continue;
+ }
+ cols.add(col);
+ vals.add(jsonOp.get(col));
+ }
+
+ //build the queries
+ StringBuilder sql = new StringBuilder();
+ String sep = "";
+ switch (op.getOperationType()) {
+ case INSERT:
+ sql.append(op.getOperationType() + " INTO ");
+ sql.append(r.table + " (") ;
+ sep = "";
+ for (String col: cols) {
+ sql.append(sep + col);
+ sep = ", ";
+ }
+ sql.append(") VALUES (");
+ sep = "";
+ for (Object val: vals) {
+ sql.append(sep + "\"" + val + "\"");
+ sep = ", ";
+ }
+ sql.append(");");
+ logger.info(sql.toString());
+ break;
+ case UPDATE:
+ sql.append(op.getOperationType() + " ");
+ sql.append(r.table + " SET ");
+ sep="";
+ for (int i=0; i<cols.size(); i++) {
+ sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\"");
+ sep = ", ";
+ }
+ sql.append(" WHERE ");
+ sql.append(getPrimaryKeyConditional(op.getKey()));
+ sql.append(";");
+ logger.info(sql.toString());
+ break;
+ case DELETE:
+ sql.append(op.getOperationType() + " FROM ");
+ sql.append(r.table + " WHERE ");
+ sql.append(getPrimaryKeyConditional(op.getKey()));
+ sql.append(";");
+ logger.info(sql.toString());
+ break;
+ case SELECT:
+ //no update happened, do nothing
+ break;
+ }
+ }
+
+ private String getPrimaryKeyConditional(JSONObject primaryKeys) {
+ StringBuilder keyCondStmt = new StringBuilder();
+ String and = "";
+ for (String key: primaryKeys.keySet()) {
+ Object val = primaryKeys.get(key);
+ keyCondStmt.append(and + key + "=\"" + val + "\"");
+ and = " AND ";
+ }
+ return keyCondStmt.toString();
+ }
+
+ /**
+ * Runs the body of the background daemon
+ * @param daemonSleepTimeS time, in seconds, between updates
+ * @throws InterruptedException
+ */
+ public void backgroundDaemon(int daemonSleepTimeS) throws InterruptedException {
+ MusicInterface mi = stateManager.getMusicInterface();
+ while (true) {
+ //update
+ logger.info(String.format("[%s] Background MusicTxDigest daemon updating local db",
+ new Timestamp(System.currentTimeMillis())));
+
+ //1) get all other partitions from musicrangeinformation
+ List<UUID> partitions = mi.getPartitionIndexes();
+ //2) for each partition I don't own
+ DatabasePartition myPartition = stateManager.getRanges();
+ for (UUID partition: partitions) {
+ if (!partition.equals(myPartition.getMusicRangeInformationIndex())){
+ try {
+ replayDigestForPartition(mi, partition);
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to update for partition : " + partition + ". " + e.getMessage());
+ continue;
+ }
+ }
+ }
+ Thread.sleep(TimeUnit.SECONDS.toMillis(daemonSleepTimeS));
+ }
+ }
+
+ public void replayDigestForPartition(MusicInterface mi, UUID partitionId) throws MDBCServiceException {
+ List<MusicTxDigestId> redoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
+ for (MusicTxDigestId txId: redoLogTxIds) {
+ HashMap<Range, StagingTable> digest = mi.getTxDigest(txId);
+ replayTxDigest(digest);
+ }
+ //todo, keep track of where I am in pointer
+ }
+
+ /**
+ * Start the background daemon defined by this object
+ * Spawns a new thread and runs "backgroundDaemon"
+ * @param daemonSleepTimeS time, in seconds, between updates run by daemon
+ */
+ public void startBackgroundDaemon(int daemonSleepTimeS) {
+ class MusicTxBackgroundDaemon implements Runnable {
+ public void run() {
+ while (true) {
+ try {
+ logger.info("MusicTxDigest background daemon started");
+ backgroundDaemon(daemonSleepTimeS);
+ } catch (InterruptedException e) {
+ logger.error("MusicTxDigest background daemon stopped " + e.getMessage());
+ }
+ }
+ }
+ }
+ Thread t = new Thread(new MusicTxBackgroundDaemon());
+ t.start();
+
+ }
+
+
+}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
index 33952e0..fda34e2 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
@@ -22,13 +22,13 @@ package org.onap.music.mdbc.tables;
import java.util.UUID;
public final class MusicTxDigestId {
- public final UUID tablePrimaryKey;
+ public final UUID txId;
public MusicTxDigestId(UUID primaryKey) {
- this.tablePrimaryKey= primaryKey;
+ this.txId= primaryKey;
}
public boolean isEmpty() {
- return (this.tablePrimaryKey==null);
+ return (this.txId==null);
}
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
index 0c68575..0870be9 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
@@ -30,10 +30,12 @@ public final class Operation implements Serializable{
final OperationType TYPE;
final String NEW_VAL;
+ final String KEY;
- public Operation(OperationType type, String newVal) {
+ public Operation(OperationType type, String newVal, String key) {
TYPE = type;
NEW_VAL = newVal;
+ KEY = key;
}
public JSONObject getNewVal(){
@@ -41,6 +43,11 @@ public final class Operation implements Serializable{
return newRow;
}
+ public JSONObject getKey() {
+ JSONObject key = new JSONObject(new JSONTokener(KEY));
+ return key;
+ }
+
public OperationType getOperationType() {
return this.TYPE;
}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java
deleted file mode 100755
index 6724860..0000000
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/PartitionInformation.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * ============LICENSE_START====================================================
- * org.onap.music.mdbc
- * =============================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END======================================================
- */
-package org.onap.music.mdbc.tables;
-
-import org.onap.music.mdbc.Range;
-
-import java.util.List;
-
-public class PartitionInformation {
- public final List<Range> ranges;
-
- public PartitionInformation(List<Range> ranges) {
- this.ranges=ranges;
- }
-}
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
index d080c54..fcff5ff 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
@@ -20,6 +20,7 @@
package org.onap.music.mdbc.tables;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
@@ -36,32 +37,18 @@ public class StagingTable implements Serializable{
private static final long serialVersionUID = 7583182634761771943L;
private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StagingTable.class);
//primary key -> Operation
- private HashMap<String,Deque<Operation>> operations;
+ private ArrayList<Operation> operations;
public StagingTable() {
- operations = new HashMap<>();
+ operations = new ArrayList<Operation>();
}
- synchronized public void addOperation(String key, OperationType type, String newVal) {
- if(!operations.containsKey(key)) {
- operations.put(key, new LinkedList<>());
- }
- operations.get(key).add(new Operation(type,newVal));
+ synchronized public void addOperation(OperationType type, String newVal, String key) {
+ operations.add(new Operation(type,newVal, key));
}
- synchronized public Deque<Pair<String,Operation>> getIterableSnapshot() throws NoSuchFieldException{
- Deque<Pair<String,Operation>> response=new LinkedList<Pair<String,Operation>>();
- //\TODO: check if we can just return the last change to a given key
- Set<String> keys = operations.keySet();
- for(String key : keys) {
- Deque<Operation> ops = operations.get(key);
- if(ops.isEmpty()) {
- logger.error(EELFLoggerDelegate.errorLogger, "Invalid state of the Operation data structure when creating snapshot");
- throw new NoSuchFieldException("Invalid state of the operation data structure");
- }
- response.add(Pair.of(key,ops.getLast()));
- }
- return response;
+ synchronized public ArrayList<Operation> getOperationList() {
+ return operations;
}
synchronized public void clean() {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
index d515539..0277902 100755
--- a/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
+++ b/mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
@@ -40,12 +40,6 @@ public class CreatePartition {
private String mriIndex;
@Parameter(names = { "-m", "--mri-table-name" }, required = true,
description = "Mri Table name")
- private String mriTable;
- @Parameter(names = { "-r", "--music-tx-digest-table-name" }, required = true,
- description = "Music Transaction Digest Table name")
- private String mtxdTable;
- @Parameter(names = { "-h", "-help", "--help" }, help = true,
- description = "Print the help message")
private boolean help = false;
NodeConfiguration config;
@@ -54,7 +48,7 @@ public class CreatePartition {
}
public void convert(){
- config = new NodeConfiguration(tables, UUID.fromString(mriIndex),mriTable,"test","", mtxdTable);
+ config = new NodeConfiguration(tables, UUID.fromString(mriIndex),"test","");
}
public void saveToFile(){
diff --git a/mdbc-server/src/main/resources/mdbc.properties b/mdbc-server/src/main/resources/mdbc.properties
index 3e207aa..d3feee2 100755
--- a/mdbc-server/src/main/resources/mdbc.properties
+++ b/mdbc-server/src/main/resources/mdbc.properties
@@ -8,5 +8,8 @@ MIXINS= \
org.onap.music.mdbc.mixins.CassandraMixin \
org.onap.music.mdbc.mixins.Cassandra2Mixin
-critical.tables= \
- TEST \ No newline at end of file
+DEFAULT_DRIVERS=\
+ org.h2.Driver \
+ com.mysql.jdbc.Driver
+
+txdaemonsleeps=15 \ No newline at end of file
diff --git a/mdbc-server/src/main/resources/mdbc_driver.properties b/mdbc-server/src/main/resources/mdbc_driver.properties
deleted file mode 100755
index 487feb3..0000000
--- a/mdbc-server/src/main/resources/mdbc_driver.properties
+++ /dev/null
@@ -1,13 +0,0 @@
-#
-# A list of all Mixins that should be checked by MDBC
-#
-MIXINS= \
- org.onap.music.mdbc.mixins.H2Mixin \
- org.onap.music.mdbc.mixins.H2ServerMixin \
- org.onap.music.mdbc.mixins.MySQLMixin \
- org.onap.music.mdbc.mixins.CassandraMixin \
- org.onap.music.mdbc.mixins.Cassandra2Mixin
-
-DEFAULT_DRIVERS=\
- org.h2.Driver \
- com.mysql.jdbc.Driver \ No newline at end of file
diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties
index 201651e..83dcb7c 100755
--- a/mdbc-server/src/main/resources/music.properties
+++ b/mdbc-server/src/main/resources/music.properties
@@ -1,8 +1,8 @@
cassandra.host =\
-135.197.226.108
+ localhost
cassandra.user =\
cassandra
cassandra.password =\
cassandra
zookeeper.host =\
-135.197.226.108
+ localhost
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/DatabaseOperationsTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/DatabaseOperationsTest.java
deleted file mode 100755
index 07c1451..0000000
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/DatabaseOperationsTest.java
+++ /dev/null
@@ -1,480 +0,0 @@
-/*
- * ============LICENSE_START====================================================
- * org.onap.music.mdbc
- * =============================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END======================================================
- */
-package org.onap.music.mdbc;
-
-import com.datastax.driver.core.*;
-import com.datastax.driver.core.exceptions.QueryExecutionException;
-import com.datastax.driver.core.exceptions.SyntaxError;
-import org.apache.commons.lang3.tuple.Pair;
-//import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.Ignore;
-import org.onap.music.datastore.CassaDataStore;
-import org.onap.music.datastore.PreparedQueryObject;
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.exceptions.MusicLockingException;
-import org.onap.music.exceptions.MusicQueryException;
-import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.main.MusicCore;
-import org.onap.music.main.MusicUtil;
-import org.onap.music.main.ResultType;
-import org.onap.music.main.ReturnType;
-import org.onap.music.mdbc.tables.*;
-
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.*;
-
-@Ignore
-public class DatabaseOperationsTest {
-
- final private String keyspace="metricmusictest";
- final private String mriTableName = "musicrangeinformation";
- final private String mtdTableName = "musictxdigest";
-
- //Properties used to connect to music
- private static Cluster cluster;
- private static Session session;
- private static String cassaHost = "localhost";
-
- @BeforeClass
- public static void init() throws MusicServiceException {
- try {
- // EmbeddedCassandraServerHelper.startEmbeddedCassandra();
- } catch (Exception e) {
- System.out.println(e);
- }
-
- cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build();
- cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000);
- session = cluster.connect();
-
- assertNotNull("Invalid configuration for cassandra", cluster);
- session = cluster.connect();
- assertNotNull("Invalid configuration for cassandra", session);
-// TestUtils.populateMusicUtilsWithProperties(prop);
- CassaDataStore store = new CassaDataStore(cluster, session);
- assertNotNull("Invalid configuration for music", store);
- MusicCore.mDstoreHandle = store;
-
- }
-
- @AfterClass
- public static void close() throws MusicServiceException, MusicQueryException {
-
- //TODO: shutdown cassandra
-
- }
-
- @Before
- public void setUp() throws Exception {
- // System.out.println("TEST 1: Getting ready for testing connection to Cassandra");
- //Create keyspace
-
-
- createKeyspace();
- useKeyspace();
- }
-
- @After
- public void tearDown() {
- deleteKeyspace();
- }
-
- private void createKeyspace() {
- String queryOp = "CREATE KEYSPACE " +
- keyspace +
- " WITH REPLICATION " +
- "= {'class':'SimpleStrategy', 'replication_factor':1}; ";
- ResultSet res=null;
- try {
- res = session.execute(queryOp);
- }
- catch(QueryExecutionException e){
- fail("Failure executing creation of keyspace with error: " + e.getMessage());
- } catch(SyntaxError e){
- fail("Failure executing creation of keyspace with syntax error: " + e.getMessage());
- }
- assertTrue("Keyspace "+keyspace+" is already being used, please change it to avoid loosing data",res.wasApplied());
- }
-
- private void useKeyspace(){
- String queryBuilder = "USE " +
- keyspace +
- "; ";
- ResultSet res = session.execute(queryBuilder);
- assertTrue("Keyspace "+keyspace+" is already being used, please change it to avoid loosing data",res.wasApplied());
- }
-
- private void deleteKeyspace(){
- String queryBuilder = "DROP KEYSPACE " +
- keyspace +
- ";";
- ResultSet res = session.execute(queryBuilder);
- assertTrue("Keyspace "+keyspace+" doesn't exist and it should",res.wasApplied());
- }
-
- private void CreateMTD(){
- try {
- DatabaseOperations.createMusicTxDigest(keyspace, mtdTableName);
- } catch (MDBCServiceException e) {
- fail("Execution of creating music tx digest failed");
- }
- }
-
- @Test
- public void createMusicTxDigest() {
- HashSet<String> expectedColumns = new HashSet<>(
- Arrays.asList("txid","transactiondigest")
- );
- HashMap<String,DataType> expectedTypes = new HashMap<>();
- expectedTypes.put("txid",DataType.uuid());
- expectedTypes.put("transactiondigest",DataType.text());
- CreateMTD();
- //check structure of table
- CassaDataStore ds=null;
- try {
- ds = MusicCore.getDSHandle();
- } catch (MusicServiceException e) {
- fail("Getting DS handle fail with error " + e.getErrorMessage());
- }
- TableMetadata table = ds.returnColumnMetadata(keyspace,mtdTableName);
- assertNotNull("Error obtaining metadata of table, there may be an error with its creation", table);
- List<ColumnMetadata> columnsMeta = table.getColumns();
- checkDataTypeForTable(columnsMeta,expectedColumns,expectedTypes);
- }
-
- @Test
- public void createMusicRangeInformationTable() {
- HashSet<String> expectedColumns = new HashSet<>(
- Arrays.asList("rangeid","keys","txredolog","ownerid","metricprocessid")
- );
- HashMap<String,DataType> expectedTypes = new HashMap<>();
- expectedTypes.put("rangeid",DataType.uuid());
- expectedTypes.put("keys",DataType.set(DataType.text()));
- ProtocolVersion currentVer = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
- assertNotNull("Protocol version for cluster is invalid", currentVer);
- CodecRegistry registry = cluster.getConfiguration().getCodecRegistry();
- assertNotNull("Codec registry for cluster is invalid", registry);
- expectedTypes.put("txredolog",DataType.list(TupleType.of(currentVer,registry,DataType.text(),DataType.uuid())));
- expectedTypes.put("ownerid",DataType.text());
- expectedTypes.put("metricprocessid",DataType.text());
- try {
- DatabaseOperations.createMusicRangeInformationTable(keyspace,mriTableName);
- } catch (MDBCServiceException e) {
- fail("Execution of creating music tx digest failed");
- }
- //check structure of table
- CassaDataStore ds=null;
- try {
- ds = MusicCore.getDSHandle();
- } catch (MusicServiceException e) {
- fail("Getting DS handle fail with error " + e.getErrorMessage());
- }
- TableMetadata table = ds.returnColumnMetadata(keyspace,mriTableName);
- assertNotNull("Error obtaining metadata of table, there may be an error with its creation", table);
- List<ColumnMetadata> columnsMeta = table.getColumns();
- checkDataTypeForTable(columnsMeta,expectedColumns,expectedTypes);
- }
-
- private void checkDataTypeForTable(List<ColumnMetadata> columnsMeta, HashSet<String> expectedColumns,
- HashMap<String,DataType> expectedTypes){
- for(ColumnMetadata cMeta : columnsMeta){
- String columnName = cMeta.getName();
- DataType type = cMeta.getType();
- assertTrue("Invalid column name: "+columnName,expectedColumns.contains(columnName));
- assertTrue("Fix the contents of expectedtypes for column: "+columnName,
- expectedTypes.containsKey(columnName));
- assertEquals("Invalid type for column: "+columnName,
- expectedTypes.get(columnName),type);
- }
- }
-
- private void createMRI(){
- try {
- DatabaseOperations.createMusicRangeInformationTable(keyspace,mriTableName);
- } catch (MDBCServiceException e) {
- fail("Execution of creating music tx digest failed");
- }
- }
-
- @Test
- public void createEmptyMriRow() {
- //Assume mri creation is working
- createMRI();
- List<Range> ranges = new ArrayList<>();
- ranges.add(new Range("table1"));
- ranges.add(new Range("table2"));
- final String lockId = null;
- String processId = "tcp://test:1234";
- UUID newRowId=null;
- try {
- newRowId = DatabaseOperations.createEmptyMriRow(keyspace,mriTableName,processId,
- lockId, ranges);
- } catch (MDBCServiceException e) {
- fail("Adding a new empty mri row failed");
- }
- getRowFromMriAndCompare(newRowId,ranges,lockId,processId);
- }
-
- private String getLock(String table, MriReference mriIndex){
- String fullyQualifiedMriKey = keyspace+"."+ mriIndex.table+"."+mriIndex.index.toString();
- String lockId;
- lockId = MusicCore.createLockReference(fullyQualifiedMriKey);
- //\TODO Handle better failures to acquire locks
- ReturnType lockReturn=null;
- try {
- lockReturn = MusicCore.acquireLock(fullyQualifiedMriKey,lockId);
- } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
- fail(e.getMessage());
- }
- assertEquals(lockReturn.getResult(),ResultType.SUCCESS);
- return lockId;
- }
-
- private void releaseLock(MriReference mriIndex, String lock){
- String fullyQualifiedMriKey = keyspace+"."+ mriIndex.table+"."+mriIndex.index.toString();
- try {
- MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey,lock);
- } catch (MusicLockingException e) {
- fail(e.getMessage());
- }
- }
-
- private List<Range> getTestRanges(){
- List<Range> ranges = new ArrayList<>();
- ranges.add(new Range("table1"));
- ranges.add(new Range("table2"));
- return ranges;
- }
-
- private String getTestProcessId(){
- return "tcp://test:1234";
- }
-
- private UUID CreateRowWithLockAndCheck(UUID newId, String lockId){
-
- List<Range> ranges = getTestRanges();
- String processId = getTestProcessId();
- UUID newRowId=null;
- try {
- newRowId = DatabaseOperations.createEmptyMriRow(keyspace,mriTableName,newId, processId, lockId, ranges);
- } catch (MDBCServiceException e) {
- fail("Adding a new empty mri row failed");
- }
- getRowFromMriAndCompare(newRowId,ranges,lockId,processId);
- return newRowId;
- }
-
- @Test
- public void createEmptyMriRowWithLock() {
- createMRI();
- //Assume mri creation is working
- UUID newId = DatabaseOperations.generateUniqueKey();
- MriReference mriIndex = new MriReference(mriTableName,newId);
- String lockId = getLock(mriTableName,mriIndex);
- assertTrue("Error obtaining lock",!lockId.isEmpty());
- UUID newRowId = CreateRowWithLockAndCheck(newId,lockId);
- assertEquals(newRowId,newId);
- releaseLock(mriIndex,lockId);
- }
-
- private void getRowFromMriAndCompare(UUID newRowId, List<Range> ranges, String lockId, String processId){
- lockId=(lockId==null)?"":lockId;
- ResultSet res=null;
- String queryOp = "SELECT * FROM " +
- keyspace + "." + mriTableName +
- " WHERE rangeid = " +
- newRowId +
- ";";
- try {
- res = session.execute(queryOp);
- }
- catch(QueryExecutionException e){
- fail("Failure executing retrieval of row in MRU error: " + e.getMessage());
- } catch(SyntaxError e){
- fail("Failure executing retrieval of row with syntax error: " + e.getMessage());
- }
- assertFalse(res.isExhausted());
- Row response = res.one();
- UUID id = response.get("rangeid",UUID.class);
- assertEquals(id,newRowId);
- Set<String> keys = response.getSet("keys",String.class);
- for(Range r : ranges){
- assertTrue("Table was not found in retrieved keys",keys.contains(r.table));
- }
- List<TupleValue> redo = response.getList("txredolog",TupleValue.class);
- assertTrue(redo.isEmpty());
- String ownerId = response.getString("ownerid");
- assertEquals(ownerId,lockId);
- String mpid= response.getString("metricprocessid");
- assertEquals(mpid,processId);
- }
-
- @Test
- public void getMriRow() {
- createMRI();
- //Assume mri creation is working
- UUID newId = DatabaseOperations.generateUniqueKey();
- MriReference mriIndex = new MriReference(mriTableName,newId);
- String lockId = getLock(mriTableName,mriIndex);
- assertTrue("Error obtaining lock",!lockId.isEmpty());
- UUID newRowId = CreateRowWithLockAndCheck(newId,lockId);
- MusicRangeInformationRow mriRow=null;
- try {
- mriRow = DatabaseOperations.getMriRow(keyspace, mriTableName, newRowId, lockId);
- } catch (MDBCServiceException e) {
- fail(e.getErrorMessage());
- }
- final List<Range> ranges = getTestRanges();
- String processId = getTestProcessId();
- assertEquals("invalid process id", mriRow.metricProcessId,processId);
- assertEquals("invalid index", mriRow.index,newRowId);
- assertEquals("invalid lock id",mriRow.ownerId,lockId);
- assertTrue("redo log is not empty", mriRow.redoLog.isEmpty());
- List<Range> readRange = mriRow.partition.ranges;
- List<Range> range = ranges;
- for(Range r: range){
- boolean found = false;
- for(Range rr : readRange) {
- if(r.equals(rr)) {
- found = true;
- }
-
- }
- assertTrue("ranges are incorrect", found);
- }
- }
-
- @Test
- public void getTransactionDigest() {
- CreateMTD();
- Range inputRange = new Range("table1");
- StagingTable inputStaging = new StagingTable();
- inputStaging.addOperation("key1", OperationType.INSERT,"1");
- HashMap<Range, StagingTable> input= new HashMap<>();
- input.put(inputRange, inputStaging);
- MusicTxDigestId newId = new MusicTxDigestId(DatabaseOperations.generateUniqueKey());
- try {
- DatabaseOperations.createTxDigestRow(keyspace,mtdTableName,newId,MDBCUtils.toString(input));
- } catch (MDBCServiceException e) {
- fail("Adding a new mtd row failed");
- } catch (IOException e) {
- fail("Fail compressing input staging tables");
- }
- HashMap<Range, StagingTable> results=null;
- try {
- results = DatabaseOperations.getTransactionDigest(keyspace,mtdTableName,newId);
- } catch (MDBCServiceException e) {
- fail("Adding a new mtd row failed with error: "+e.getErrorMessage());
- }
- assertTrue(results.containsKey(inputRange));
- StagingTable newStaging = results.get(inputRange);
- Deque<Pair<String,Operation>> opers=null;
- Deque<Pair<String,Operation>> initialOpers=null;
- try {
- opers=newStaging.getIterableSnapshot();
- initialOpers=inputStaging.getIterableSnapshot();
- } catch (NoSuchFieldException e) {
- fail(e.getMessage());
- }
- assertEquals("Operations are not equal",opers.size(),initialOpers.size());
- while(!opers.isEmpty()){
- Pair<String,Operation> recvOper = opers.getFirst();
- Pair<String,Operation> originalOper = initialOpers.getFirst();
- assertEquals(recvOper.getKey(),originalOper.getKey());
- assertEquals(recvOper.getValue(),originalOper.getValue());
- opers.removeFirst();
- initialOpers.removeFirst();
- }
- }
-
- @Test
- public void createNamespace() {
- deleteKeyspace();
- try {
- DatabaseOperations.createNamespace(keyspace,1);
- } catch (MDBCServiceException e) {
- fail(e.getErrorMessage());
- }
- String describeOp = "USE "+keyspace+";";
- ResultSet res=null;
- try {
- res = session.execute(describeOp);
- }
- catch(QueryExecutionException e){
- fail("Failure executing retrieval of row in MRU error: " + e.getMessage());
- } catch(SyntaxError e){
- fail("Failure executing retrieval of row with syntax error: " + e.getMessage());
- }
- assertTrue("Error with keyspace: "+keyspace, res.wasApplied());
- }
-
- private void getRowFromMtdAndCompare(MusicTxDigestId newId, String transactionDigest){
- ResultSet res=null;
- String queryOp = "SELECT * FROM " +
- keyspace + "." + mtdTableName+
- " WHERE txid = " +
- newId.tablePrimaryKey +
- ";";
- try {
- res = session.execute(queryOp);
- }
- catch(QueryExecutionException e){
- fail("Failure executing retrieval of row in MTD error: " + e.getMessage());
- } catch(SyntaxError e){
- fail("Failure executing retrieval of row in MTD with syntax error: " + e.getMessage());
- }
- assertFalse(res.isExhausted());
- Row response = res.one();
- UUID id = response.getUUID("txId");
- assertEquals(id,newId.tablePrimaryKey);
- String digest = response.getString("transactiondigest");
- assertEquals(digest,transactionDigest);
- }
-
- @Test
- public void createTxDigestRow(){
- CreateMTD();
- MusicTxDigestId newId = new MusicTxDigestId(DatabaseOperations.generateUniqueKey());
- String transactionDigest = "newdigest";
- try {
- DatabaseOperations.createTxDigestRow(keyspace,mtdTableName,newId,transactionDigest);
- } catch (MDBCServiceException e) {
- fail("Adding a new empty mtd row failed");
- }
- getRowFromMtdAndCompare(newId,transactionDigest);
-
- }
-
-}
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java
index a02578e..676d760 100755
--- a/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java
@@ -37,7 +37,8 @@ public class MDBCUtilsTest {
@Test
public void toStringTest1() {
StagingTable table = new StagingTable();
- table.addOperation("test",OperationType.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString());
+ table.addOperation(OperationType.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString(),
+ (new JSONObject(new String[]{"test_key", "test_value"})).toString());
String output=null;
try {
output = MDBCUtils.toString(table);
@@ -53,7 +54,8 @@ public class MDBCUtilsTest {
public void toStringTest2() {
HashMap<String,StagingTable> mapToSerialize = new HashMap<>();
StagingTable table = new StagingTable();
- table.addOperation("test",OperationType.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString());
+ table.addOperation(OperationType.INSERT,(new JSONObject(new String[]{"test3", "Test4"}).toString()),
+ (new JSONObject(new String[]{"test_key", "test_value"})).toString());
mapToSerialize.put("table",table);
String output=null;
try {
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java
new file mode 100644
index 0000000..eab38d3
--- /dev/null
+++ b/mdbc-server/src/test/java/org/onap/music/mdbc/MusicTxDigestTest.java
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.onap.music.mdbc.tables.MusicTxDigest;
+import org.onap.music.mdbc.tables.StagingTable;
+
+public class MusicTxDigestTest {
+
+ @Test
+ public void test() throws Exception {
+ MusicTxDigest txDigest = new MusicTxDigest(null);
+ String t1 = "rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAABc3IAGW9yZy5vbmFwLm11c2ljLm1kYmMuUmFuZ2UWWoOV+3nB2AIAAUwABXRhYmxldAASTGphdmEvbGFuZy9TdHJpbmc7eHB0AAdwZXJzb25zc3IAJ29yZy5vbmFwLm11c2ljLm1kYmMudGFibGVzLlN0YWdpbmdUYWJsZWk84G3L4tunAgABTAAKb3BlcmF0aW9uc3QAFUxqYXZhL3V0aWwvQXJyYXlMaXN0O3hwc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAV3BAAAAAVzcgAkb3JnLm9uYXAubXVzaWMubWRiYy50YWJsZXMuT3BlcmF0aW9u7yJhSJSWe0ACAANMAANLRVlxAH4AA0wAB05FV19WQUxxAH4AA0wABFRZUEV0ACpMb3JnL29uYXAvbXVzaWMvbWRiYy90YWJsZXMvT3BlcmF0aW9uVHlwZTt4cHQAJHsiUGVyc29uSUQiOjEsIkxhc3ROYW1lIjoiTWFydGluZXoifXQAWXsiQWRkcmVzcyI6IktBQ0IiLCJQZXJzb25JRCI6MSwiRmlyc3ROYW1lIjoiSnVhbiIsIkNpdHkiOiJBVExBTlRBIiwiTGFzdE5hbWUiOiJNYXJ0aW5leiJ9fnIAKG9yZy5vbmFwLm11c2ljLm1kYmMudGFibGVzLk9wZXJhdGlvblR5cGUAAAAAAAAAABIAAHhyAA5qYXZhLmxhbmcuRW51bQAAAAAAAAAAEgAAeHB0AAZJTlNFUlRzcQB+AAt0ACR7IlBlcnNvbklEIjoxLCJMYXN0TmFtZSI6Ik1hcnRpbmV6In10AFl7IkFkZHJlc3MiOiJLQUNCIiwiUGVyc29uSUQiOjEsIkZpcnN0TmFtZSI6Ikp1YW4iLCJDaXR5IjoiQVRMQU5UQSIsIkxhc3ROYW1lIjoiTWFydGluZXoifX5xAH4AEHQABkRFTEVURXNxAH4AC3QAIXsiUGVyc29uSUQiOjIsIkxhc3ROYW1lIjoiU21pdGgifXQAWXsiQWRkcmVzcyI6IkdOT0MiLCJQZXJzb25JRCI6MiwiRmlyc3ROYW1lIjoiSk9ITiIsIkNpdHkiOiJCRURNSU5TVEVSIiwiTGFzdE5hbWUiOiJTbWl0aCJ9cQB+ABJzcQB+AAt0ACF7IlBlcnNvbklEIjoyLCJMYXN0TmFtZSI6IlNtaXRoIn10AFl7IkFkZHJlc3MiOiJHTk9DIiwiUGVyc29uSUQiOjIsIkZpcnN0TmFtZSI6IkpPU0giLCJDaXR5IjoiQkVETUlOU1RFUiIsIkxhc3ROYW1lIjoiU21pdGgifX5xAH4AEHQABlVQREFURXNxAH4AC3QAIXsiUGVyc29uSUQiOjIsIkxhc3ROYW1lIjoiU21pdGgifXQAWXsiQWRkcmVzcyI6IkdOT0MiLCJQZXJzb25JRCI6MiwiRmlyc3ROYW1lIjoiSk9ITiIsIkNpdHkiOiJCRURNSU5TVEVSIiwiTGFzdE5hbWUiOiJTbWl0aCJ9cQB+AB94eA==";
+ HashMap<Range, StagingTable> digest = (HashMap<Range, StagingTable>) MDBCUtils.fromString(t1);
+ txDigest.replayTxDigest(digest);
+ }
+
+}