From 1359bbd68acb27e47e32b5dff8b7f35354584366 Mon Sep 17 00:00:00 2001 From: st782s Date: Fri, 8 Feb 2019 17:30:33 -0500 Subject: Fix Eventual consistency Issue-ID: MUSIC-319 Change-Id: I7433e7d71c44ffe623e560863e7e1fd9c0f247ce Signed-off-by: st782s --- .../onap/music/mdbc/examples/MdbcTestClient.java | 310 ++++++++++----------- .../org/onap/music/mdbc/mixins/MusicMixin.java | 47 +++- mdbc-server/src/main/resources/music.properties | 6 +- 3 files changed, 191 insertions(+), 172 deletions(-) mode change 100755 => 100644 mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java (limited to 'mdbc-server/src/main') diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java old mode 100755 new mode 100644 index 35293ef..6a17d4c --- a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java @@ -1,155 +1,155 @@ -/* - * ============LICENSE_START==================================================== - * org.onap.music.mdbc - * ============================================================================= - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END====================================================== - */ -package org.onap.music.mdbc.examples; - -import java.sql.*; -import org.apache.calcite.avatica.remote.Driver; - -public class MdbcTestClient { - - public static class Hr { - public final Employee[] emps = { - new Employee(100, "Bill"), - new Employee(200, "Eric"), - new Employee(150, "Sebastian"), - }; - } - - public static class Employee { - public final int empid; - public final String name; - - public Employee(int empid, String name) { - this.empid = empid; - this.name = name; - } - } - - public static void main(String[] args){ - try { - Class.forName("org.apache.calcite.avatica.remote.Driver"); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - System.exit(1); - } - Connection connection; - try { - connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf"); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - try { - connection.setAutoCommit(false); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - - final String sql = "CREATE TABLE IF NOT EXISTS Persons (\n" + - " PersonID int,\n" + - " LastName varchar(255),\n" + - " FirstName varchar(255),\n" + - " Address varchar(255),\n" + - " City varchar(255),\n" + - " PRIMARY KEY (PersonID,LastName)" + - ");"; - Statement stmt; - try { - stmt = connection.createStatement(); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - boolean execute; - try { - execute = stmt.execute(sql); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - if (execute) { - try { - connection.commit(); - } catch (SQLException e) { - e.printStackTrace(); - } - } - - try { - stmt.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - - final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');"; - final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;"; - final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');"; - final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';"; - final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';"; - - - Statement insertStmt; - try { - insertStmt = connection.createStatement(); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - try { - execute = insertStmt.execute(insertSQL); - execute = insertStmt.execute(insertSQL1); - execute = insertStmt.execute(insertSQL2); - execute = insertStmt.execute(insertSQL3); - execute = insertStmt.execute(insertSQL4); - - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - try { - connection.commit(); - } catch (SQLException e) { - e.printStackTrace(); - return; - } - - try { - stmt.close(); - insertStmt.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - - try { - connection.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - - - } -} +/* + * ============LICENSE_START==================================================== + * org.onap.music.mdbc + * ============================================================================= + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END====================================================== + */ +package org.onap.music.mdbc.examples; + +import java.sql.*; +import org.apache.calcite.avatica.remote.Driver; + +public class MdbcTestClient { + + public static class Hr { + public final Employee[] emps = { + new Employee(100, "Bill"), + new Employee(200, "Eric"), + new Employee(150, "Sebastian"), + }; + } + + public static class Employee { + public final int empid; + public final String name; + + public Employee(int empid, String name) { + this.empid = empid; + this.name = name; + } + } + + public static void main(String[] args){ + try { + Class.forName("org.apache.calcite.avatica.remote.Driver"); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + System.exit(1); + } + Connection connection; + try { + connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf"); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + connection.setAutoCommit(false); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + + final String sql = "CREATE TABLE IF NOT EXISTS Persons (\n" + + " PersonID int,\n" + + " LastName varchar(255),\n" + + " FirstName varchar(255),\n" + + " Address varchar(255),\n" + + " City varchar(255),\n" + + " PRIMARY KEY (PersonID,LastName)" + + ");"; + Statement stmt; + try { + stmt = connection.createStatement(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + boolean execute = true; +// try { +// execute = stmt.execute(sql); +// } catch (SQLException e) { +// e.printStackTrace(); +// return; +// } + + if (execute) { + try { + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + try { + stmt.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');"; + final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;"; + final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');"; + final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';"; + final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';"; + + + Statement insertStmt; + try { + insertStmt = connection.createStatement(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + execute = insertStmt.execute(insertSQL); + execute = insertStmt.execute(insertSQL1); + execute = insertStmt.execute(insertSQL2); + execute = insertStmt.execute(insertSQL3); + execute = insertStmt.execute(insertSQL4); + + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + return; + } + + try { + stmt.close(); + insertStmt.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + try { + connection.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + + + } +} diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java index 963647c..0210cd1 100644 --- a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java +++ b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java @@ -998,7 +998,7 @@ public class MusicMixin implements MusicInterface { * @return a ResultSet containing the rows returned from the query */ protected ResultSet executeMusicRead(String cql) throws MDBCServiceException { - logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql); + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC read:"+ cql); PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); ResultSet results = null; @@ -1010,6 +1010,23 @@ public class MusicMixin implements MusicInterface { } return results; } + + /** + * This method executes a read query in Music + * @param pQueryObject the PreparedQueryObject to be sent to Cassandra + * @return a ResultSet containing the rows returned from the query + */ + protected ResultSet executeMusicRead(PreparedQueryObject pQueryObject) throws MDBCServiceException { + logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC read:"+ pQueryObject.getQuery()); + ResultSet results = null; + try { + results = MusicCore.get(pQueryObject); + } catch (MusicServiceException e) { + logger.error("Error executing music get operation for query: ["+pQueryObject.getQuery()+"]"); + throw new MDBCServiceException("Error executing get: "+e.getMessage(), e); + } + return results; + } /** * Returns the default primary key name that this mixin uses @@ -1924,23 +1941,23 @@ public class MusicMixin implements MusicInterface { HashMap changes; String cql; LinkedHashMap> ecDigestInformation = new LinkedHashMap>(); - String musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName); + UUID musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName); PreparedQueryObject pQueryObject = new PreparedQueryObject(); - if (musicevetxdigestNodeinfoTimeID != null && !musicevetxdigestNodeinfoTimeID.isEmpty() ) { + if (musicevetxdigestNodeinfoTimeID != null) { // this will fetch only few records based on the time-stamp condition. - cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ?;", music_ns, this.musicEventualTxDigestTableName); + cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName); pQueryObject.appendQueryString(cql); pQueryObject.addValue(musicevetxdigestNodeinfoTimeID); } else { // This is going to Fetch all the Transactiondigest records from the musicevetxdigest table. - cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName); + cql = String.format("SELECT * FROM %s.%s LIMIT 10;", music_ns, this.musicEventualTxDigestTableName); pQueryObject.appendQueryString(cql); } // I need to get a ResultSet of all the records and give each row to the below HashMap. - ResultSet rs = executeMusicRead(pQueryObject.getQuery()); + ResultSet rs = executeMusicRead(pQueryObject); while (!rs.isExhausted()) { Row row = rs.one(); String digest = row.getString("transactiondigest"); @@ -2561,13 +2578,16 @@ public class MusicMixin implements MusicInterface { @Override public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException{ - String cql = String.format("UPDATE %s.%s SET txtimeid = (%s), txupdatedatetime = now() WHERE nodename = ?;", music_ns, this.musicEventualTxDigestTableName, txTimeID); + String cql = String.format("UPDATE %s.%s SET txtimeid = %s, txupdatedatetime = now() WHERE nodename = ?;", music_ns, this.musicNodeInfoTableName, txTimeID); PreparedQueryObject pQueryObject = new PreparedQueryObject(); pQueryObject.appendQueryString(cql); pQueryObject.addValue(nodeName); - - executeMusicWriteQuery(pQueryObject.getQuery()); - logger.info("Successfully updated nodeinfo table with txtimeid value: " + txTimeID + " against the node:" + nodeName); + + ReturnType rt = MusicCore.eventualPut(pQueryObject); + if(rt.getResult().getResult().toLowerCase().equals("failure")) { + logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage()); + } + else logger.info("Successfully updated nodeinfo table with txtimeid value: " + txTimeID + " against the node:" + nodeName); } @@ -2618,7 +2638,7 @@ public class MusicMixin implements MusicInterface { } } - public String getTxTimeIdFromNodeInfo(String nodeName) throws MDBCServiceException { + public UUID getTxTimeIdFromNodeInfo(String nodeName) throws MDBCServiceException { // expecting NodeName from base-0.json file: which is : NJNode //String nodeName = MdbcServer.stateManager.getMdbcServerName(); // this retrieves the NJNode row from Cassandra's NodeInfo table so that I can retrieve TimeStamp for further processing. @@ -2632,13 +2652,12 @@ public class MusicMixin implements MusicInterface { } catch (MDBCServiceException e) { logger.error("Get operation error: Failure to get row from nodeinfo with nodename:"+nodeName); // TODO check underlying exception if no data and return empty string - return ""; + return null; //throw new MDBCServiceException("error:Failure to retrive nodeinfo details information", e); } - String txtimeid = newRow.getString("txtimeid"); + return newRow.getUUID("txtimeid"); - return txtimeid; } diff --git a/mdbc-server/src/main/resources/music.properties b/mdbc-server/src/main/resources/music.properties index 21f3e92..83dcb7c 100755 --- a/mdbc-server/src/main/resources/music.properties +++ b/mdbc-server/src/main/resources/music.properties @@ -1,8 +1,8 @@ cassandra.host =\ - 192.168.1.19 + localhost cassandra.user =\ - metric + cassandra cassandra.password =\ - metriccluster + cassandra zookeeper.host =\ localhost -- cgit 1.2.3-korg