/*
* ============LICENSE_START====================================================
* org.onap.music.mdbc
* =============================================================================
* Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* =============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END======================================================
*/
package org.onap.music.mdbc.mixins;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.*;
import java.util.Map.Entry;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.delete.Delete;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.update.Update;
import org.apache.commons.lang3.tuple.Pair;
//import org.apache.zookeeper.KeeperException.UnimplementedException;
import org.json.JSONObject;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.Configuration;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
import org.onap.music.mdbc.TableInfo;
import org.onap.music.mdbc.mixins.MySQLMixin.StagingTableUpdateRunnable;
import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.Operation;
import org.onap.music.mdbc.query.SQLOperation;
import org.onap.music.mdbc.tables.StagingTable;
import org.postgresql.util.PGInterval;
import org.postgresql.util.PGobject;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
/**
 * This class provides the methods that MDBC needs in order to mirror data to/from a
 * PostgreSQL database instance. It installs row-level INSERT/UPDATE/DELETE triggers that
 * capture changed rows as JSON (via the row_to_json() function) into a transaction log
 * table in the "audit" schema; the rows logged for this connection are later drained
 * into the MUSIC transaction digest.
 *
 * @author Robert P. Eby
 */
public class PostgresMixin implements DBInterface {
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(PostgresMixin.class);
public static final String MIXIN_NAME = "postgres";
public static final String TRANS_TBL_SCHEMA = "audit";
public static final String TRANS_TBL = "mdbc_translog";
private final MusicInterface mi;
private final String connId;
private final String dbName;
private final String schema;
private final Connection jdbcConn;
private final Map<String, TableInfo> tables;
private PreparedStatement deleteStagingStatement;
private boolean useAsyncStagingUpdate = false;
private Object stagingHandlerLock = new Object();
private AsyncUpdateHandler stagingHandler = null;
private StagingTable currentStaging = null;
public PostgresMixin() {
this.mi = null;
this.connId = "";
this.dbName = null;
this.schema = null;
this.jdbcConn = null;
this.tables = null;
this.deleteStagingStatement = null;
}
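// Prepares the statement that purges already-processed rows from the staging table; the parameters are the
// smallest and largest staging index copied in the current batch, plus this connection's id.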
private void initializeDeleteStatement() throws SQLException {
deleteStagingStatement = jdbcConn.prepareStatement("DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL
+ " WHERE (ix BETWEEN ? AND ? ) AND " + "connection_id = ?;");
}
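/**
 * Build a Postgres mixin bound to an existing JDBC connection: determine the connection id, database name and
 * current schema, then (re)create the audit schema, transaction log table and trigger functions used to
 * capture row changes.
 */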
public PostgresMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException {
this.mi = mi;
this.connId = generateConnID(conn);
this.dbName = getDBName(conn);
this.schema = getSchema(conn);
this.jdbcConn = conn;
this.tables = new HashMap<>();
useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
Configuration.ASYNC_STAGING_TABLE_UPDATE));
initializePostgresTriggersStructures();
initializeDeleteStatement();
}
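// Runnable used by the asynchronous staging handler: it drains the staging table rows logged for this
// connection into the given transaction digest.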
class StagingTableUpdateRunnable implements Runnable {
private PostgresMixin mixin;
private StagingTable staging;
StagingTableUpdateRunnable(PostgresMixin mixin, StagingTable staging) {
this.mixin = mixin;
this.staging = staging;
}
@Override
public void run() {
try {
this.mixin.updateStagingTable(staging);
} catch (NoSuchFieldException | MDBCServiceException e) {
this.mixin.logger.error("Error when updating the staging table");
}
}
}
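// Creates the audit schema and the mdbc_translog staging table (plus an index on connection_id), retrying a
// few times on the transient "tuple concurrently updated" / duplicate-key errors that can occur when several
// connections initialize these objects concurrently.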
private void createTriggerTable() throws SQLException {
final String createSchemaSQL = "CREATE SCHEMA IF NOT EXISTS " + TRANS_TBL_SCHEMA + ";";
final String revokeCreatePrivilegesSQL = "REVOKE CREATE ON schema " + TRANS_TBL_SCHEMA + " FROM public;";
final String createTableSQL = "CREATE TABLE IF NOT EXISTS " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " ("
+ "ix serial," + "op TEXT NOT NULL CHECK (op IN ('I','D','U'))," + "schema_name text NOT NULL,"
+ "table_name text NOT NULL," + "original_data json," + "new_data json," + "connection_id text,"
+ "PRIMARY KEY (connection_id,ix)" + ") WITH (fillfactor=100);";
final String revokeSQL = "REVOKE INSERT,UPDATE,DELETE,TRUNCATE,REFERENCES,TRIGGER ON " + TRANS_TBL_SCHEMA + "."
+ TRANS_TBL + " FROM public;";
final String grantSelectSQL = "GRANT SELECT ON " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " TO public;";
final String createIndexSQL = "CREATE INDEX IF NOT EXISTS logged_actions_connection_id_idx" + " ON "
+ TRANS_TBL_SCHEMA + "." + TRANS_TBL + " (connection_id);";
Map<String, String> sqlStatements = new LinkedHashMap<String, String>() {
{
put("create_schema", createSchemaSQL);
put("revoke_privileges", revokeCreatePrivilegesSQL);
put("create_table", createTableSQL);
put("revoke_sql", revokeSQL);
put("grant_select", grantSelectSQL);
put("create_index", createIndexSQL);
}
};
for (Entry<String, String> query : sqlStatements.entrySet()) {
int retryCount = 0;
boolean ready = false;
while (retryCount < 3 && !ready) {
try {
Statement statement = jdbcConn.createStatement();
statement.executeUpdate(query.getValue());
if (!jdbcConn.getAutoCommit()) {
jdbcConn.commit();
}
statement.close();
ready = true;
} catch (SQLException e) {
if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated") || e.getMessage()
.toLowerCase().startsWith("error: duplicate key value violates unique constraint")) {
logger.warn("Error creating schema, retrying. for " + query.getKey(), e);
try {
Thread.sleep(100);
} catch (InterruptedException e1) {
}
} else {
logger.error("Error executing " + query.getKey(), e);
throw e;
}
}
retryCount++;
}
}
}
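// The next three helpers return the plpgsql fragments that log UPDATE, INSERT and DELETE rows (serialized as
// JSON via row_to_json()) into the staging table, using pg_backend_pid() as the connection id.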
private String updateTriggerSection() {
return "IF (TG_OP = 'UPDATE') THEN\n" + "v_old_data := row_to_json(OLD);\n"
+ "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + "." + TRANS_TBL
+ " (op,schema_name,table_name,original_data,new_data,connection_id)\n"
+ "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,v_new_data,pg_backend_pid());\n"
+ "RETURN NEW; ";
}
private String insertTriggerSection() {
// \TODO add additional conditional on change "IF NEW IS DISTINCT FROM OLD THEN"
return "IF (TG_OP = 'INSERT') THEN\n" + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA
+ "." + TRANS_TBL + " (op,schema_name,table_name,new_data,connection_id)\n"
+ "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_new_data,pg_backend_pid());\n"
+ "RETURN NEW; ";
}
private String deleteTriggerSection() {
return "IF (TG_OP = 'DELETE') THEN\n" + "v_old_data := row_to_json(OLD);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA
+ "." + TRANS_TBL + " (op,schema_name,table_name,original_data,connection_id)\n"
+ "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,pg_backend_pid());\n"
+ "RETURN OLD; ";
}
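// Maps an SQLOperation to the fully qualified name of the corresponding audit trigger function.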
private String functionName(SQLOperation type) {
final String functionName = (type.equals(SQLOperation.UPDATE)) ? "if_updated_func"
: (type.equals(SQLOperation.INSERT)) ? "if_inserted_func" : "if_deleted_func";
return "audit." + functionName + "()";
}
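// Creates (or replaces) the plpgsql trigger function for the given operation type, retrying on the transient
// "tuple concurrently updated" error.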
private void createTriggerFunctions(SQLOperation type) throws SQLException {
StringBuilder functionSQL =
new StringBuilder("CREATE OR REPLACE FUNCTION " + functionName(type) + " RETURNS TRIGGER AS $body$\n"
+ "DECLARE\n" + "v_old_data json;\n" + "v_new_data json;\n" + "BEGIN\n");
switch (type) {
case UPDATE:
functionSQL.append(updateTriggerSection());
break;
case INSERT:
functionSQL.append(insertTriggerSection());
break;
case DELETE:
functionSQL.append(deleteTriggerSection());
break;
default:
throw new IllegalArgumentException("Invalid operation type for creation of trigger functions");
}
functionSQL.append("ELSE\n"
+ "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - Other action occurred: %, at %',TG_OP,now();\n"
+ "RETURN NULL;\n" + "END IF;\n" + "EXCEPTION\n" + "WHEN data_exception THEN\n"
+ "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ "RETURN NULL;\n" + "WHEN unique_violation THEN\n"
+ "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ "RETURN NULL;\n" + "WHEN OTHERS THEN\n"
+ "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+ "RETURN NULL;\n" + "END;\n" + "$body$\n" + "LANGUAGE plpgsql\n" + "SECURITY DEFINER\n"
+ "SET search_path = pg_catalog, audit;");
int retryCount = 0;
boolean ready = false;
while (retryCount < 3 && !ready) {
try {
executeSQLWrite(functionSQL.toString());
ready = true;
} catch (SQLException e) {
if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated")) {
logger.warn("Error creating schema, retrying. ", e);
try {
Thread.sleep(200);
} catch (InterruptedException e1) {
}
} else {
logger.error("Error executing creation of trigger function", e);
throw e;
}
}
retryCount++;
}
}
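// Sets up everything the capture mechanism needs: the audit schema/table plus the INSERT, UPDATE and DELETE
// trigger functions.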
private void initializePostgresTriggersStructures() throws SQLException {
try {
createTriggerTable();
} catch (SQLException e) {
logger.error("Error creating the trigger tables in postgres", e);
throw e;
}
try {
createTriggerFunctions(SQLOperation.INSERT);
createTriggerFunctions(SQLOperation.UPDATE);
createTriggerFunctions(SQLOperation.DELETE);
} catch (SQLException e) {
logger.error("Error creating the trigger functions in postgres", e);
throw e;
}
}
// This is used to generate a unique connId for this connection to the DB.
private String generateConnID(Connection conn) {
String rv = Integer.toString((int) System.currentTimeMillis()); // random-ish
try {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT pg_backend_pid() AS IX");
if (rs.next()) {
rv = rs.getString("IX");
}
stmt.close();
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger, "generateConnID: problem generating a connection ID!");
}
return rv;
}
/**
* Get the name of this DBInterface mixin object.
*
* @return the name
*/
@Override
public String getMixinName() {
return MIXIN_NAME;
}
@Override
public void close() {
// nothing yet
}
/**
* Determines the database name associated with the connection. This is the private/internal method that
* actually determines the name.
*
* @param conn the JDBC connection to inspect
* @return the database name
*/
private String getDBName(Connection conn) {
String dbname = "mdbc"; // default name
try {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT current_database();");
if (rs.next()) {
dbname = rs.getString("current_database");
}
stmt.close();
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
}
return dbname;
}
/**
* Determines the schema associated with the connection. This is the private/internal method that actually
* determines the schema.
*
* @param conn the JDBC connection to inspect
* @return the current schema
*/
private String getSchema(Connection conn) {
String schema = "public"; // default name
try {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT current_schema();");
if (rs.next()) {
schema = rs.getString("current_schema");
}
stmt.close();
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
}
return schema;
}
@Override
public String getSchema() {
return schema;
}
@Override
public String getDatabaseName() {
return this.dbName;
}
/**
* Get the set of tables in the database, represented as Range objects.
*
* @return the set of ranges
*/
@Override
public Set<Range> getSQLRangeSet() {
Set<String> set = new TreeSet<>();
String sql =
"SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE' AND table_schema=current_schema();";
try {
Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
String s = rs.getString("table_name");
set.add(s);
}
stmt.close();
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
}
logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
Set<Range> rangeSet = new HashSet<>();
for (String table : set) {
rangeSet.add(new Range(table));
}
return rangeSet;
}
/**
* Return a TableInfo object for the specified table. This method first looks in a cache of previously constructed
* TableInfo objects for the table. If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the
* column names, types, and indexes of the table. It creates a new TableInfo object with the results.
*
* @param tableName the table to look up
* @return a TableInfo object containing the info we need, or null if the table does not exist
*/
@Override
public TableInfo getTableInfo(String tableName) {
// \TODO: invalidate "tables" when a table schema is modified (uncommon), but needs to be handled
TableInfo ti = tables.get(tableName);
if (ti == null) {
try {
String tbl, localSchema;
final String[] split = tableName.split("\\.");
if (split.length == 2) {
localSchema = split[0];
tbl = split[1];
} else {
tbl = tableName;
localSchema = this.schema;
}
String sql;
if (schema == null) {
sql = "select column_name, data_type from information_schema.columns where table_schema=current_schema() and table_name='"
+ tbl + "';";
} else {
sql = "select column_name, data_type from information_schema.columns where table_schema='"
+ localSchema + "' and table_name='" + tbl + "';";
}
ResultSet rs = executeSQLRead(sql);
if (rs != null) {
ti = new TableInfo();
while (rs.next()) {
String name = rs.getString("column_name");
String type = rs.getString("data_type");
ti.columns.add(name);
ti.coltype.add(mapDatatypeNameToType(type));
}
rs.getStatement().close();
} else {
logger.error(EELFLoggerDelegate.errorLogger,
"Cannot retrieve table info for table " + tableName + " from POSTGRES.");
return null;
}
final String keysql =
"SELECT a.attname as column_name FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid"
+ " AND a.attnum = ANY(i.indkey) WHERE i.indrelid = '" + tbl + "'::regclass "
+ " AND i.indisprimary;";
ResultSet rs2 = executeSQLRead(keysql);
Set<String> keycols = new HashSet<>();
if (rs2 != null) {
while (rs2.next()) {
String name = rs2.getString("column_name");
keycols.add(name);
}
rs2.getStatement().close();
} else {
logger.error(EELFLoggerDelegate.errorLogger,
"Cannot retrieve table info for table " + tableName + " from MySQL.");
}
for (String col : ti.columns) {
if (keycols.contains(col)) {
ti.iskey.add(true);
} else {
ti.iskey.add(false);
}
}
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger,
"Cannot retrieve table info for table " + tableName + " from MySQL: " + e);
return null;
}
tables.put(tableName, ti);
}
return ti;
}
// Map Postgres data type names to the java.sql.Types equivalent
private int mapDatatypeNameToType(String nm) {
switch (nm) {
case "character":
return Types.CHAR;
case "national character":
return Types.NCHAR;
case "character varying":
return Types.VARCHAR;
case "national character varying":
return Types.NVARCHAR;
case "text":
return Types.VARCHAR;
case "bytea":
return Types.BINARY;
case "smallint":
return Types.SMALLINT;
case "integer":
return Types.INTEGER;
case "bigint":
return Types.BIGINT;
case "smallserial":
return Types.SMALLINT;
case "serial":
return Types.INTEGER;
case "bigserial":
return Types.BIGINT;
case "real":
return Types.REAL;
case "double precision":
return Types.DOUBLE;
case "numeric":
return Types.NUMERIC;
case "decimal":
return Types.DECIMAL;
case "date":
return Types.DATE;
case "time with time zone":
return Types.TIME_WITH_TIMEZONE;
case "time without time zone":
return Types.TIME;
case "timestamp without time zone":
return Types.TIMESTAMP;
case "timestamp with time zone":
return Types.TIMESTAMP_WITH_TIMEZONE;
case "boolean":
return Types.BIT;
case "bit":
return Types.BIT;
case "oid":
return Types.BIGINT;
case "xml":
return Types.SQLXML;
case "array":
return Types.ARRAY;
case "tinyint":
return Types.TINYINT;
case "uuid":
case "money":
case "interval":
case "bit varying":
case "box":
case "point":
case "lseg":
case "path":
case "polygon":
case "circle":
case "json":
case "inet":
case "cidr":
case "macaddr":
case "tsvector":
case "tsquery":
return Types.OTHER;
default:
logger.error(EELFLoggerDelegate.errorLogger, "unrecognized and/or unsupported data type " + nm);
return Types.VARCHAR;
}
}
@Override
public void createSQLTriggers(String tableName) {
// Don't create triggers for the table the triggers write into!!!
if (tableName.equals(TRANS_TBL) || tableName.equals(TRANS_TBL_SCHEMA + "." + TRANS_TBL))
return;
try {
// No SELECT trigger
executeSQLWrite(generateTrigger(tableName, SQLOperation.INSERT));
executeSQLWrite(generateTrigger(tableName, SQLOperation.DELETE));
executeSQLWrite(generateTrigger(tableName, SQLOperation.UPDATE));
} catch (SQLException e) {
if (e.getMessage().trim().endsWith("already exists")) {
// only warn if trigger already exists
logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers: " + e);
} else {
logger.error(EELFLoggerDelegate.errorLogger, "createSQLTriggers: " + e);
}
}
}
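// Builds the CREATE TRIGGER statement that fires the audit function AFTER the given operation on the table.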
private String generateTrigger(String tableName, SQLOperation op) {
StringBuilder triggerSql = new StringBuilder("CREATE TRIGGER ").append(getTriggerName(tableName, op))
.append(" AFTER " + op + " ON ").append(tableName).append(" FOR EACH ROW EXECUTE PROCEDURE ")
.append(functionName(op)).append(';');
return triggerSql.toString();
}
private String getTriggerName(String tableName, SQLOperation op) {
switch (op) {
case DELETE:
return "D_" + tableName;
case UPDATE:
return "U_" + tableName;
case INSERT:
return "I_" + tableName;
default:
throw new IllegalArgumentException("Invalid option in trigger operation type");
}
}
private String[] getTriggerNames(String tableName) {
return new String[] {getTriggerName(tableName, SQLOperation.INSERT),
getTriggerName(tableName, SQLOperation.DELETE), getTriggerName(tableName, SQLOperation.UPDATE),};
}
@Override
public void dropSQLTriggers(String tableName) {
try {
for (String name : getTriggerNames(tableName)) {
logger.debug(EELFLoggerDelegate.applicationLogger, "REMOVE trigger " + name + " from postgres");
executeSQLWrite("DROP TRIGGER IF EXISTS " + name + ";");
}
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger, "dropSQLTriggers: " + e);
}
}
@Override
public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) {
throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres");
}
@Override
public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) {
throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres");
}
/**
* This method executes a read query in the SQL database. Methods that call this method should be sure to call
* resultset.getStatement().close() when done in order to free up resources.
*
* @param sql the query to run
* @return a ResultSet containing the rows returned from the query
*/
@Override
public ResultSet executeSQLRead(String sql) {
logger.debug(EELFLoggerDelegate.applicationLogger, "Executing sql read in postgres");
logger.debug("Executing SQL read:" + sql);
ResultSet rs;
try {
Statement stmt = jdbcConn.createStatement();
rs = stmt.executeQuery(sql);
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger, "executeSQLRead" + e);
return null;
}
return rs;
}
/**
* This method executes a write query in the sql database.
*
* @param sql the SQL to be sent to Postgres
* @throws SQLException if an underlying JDBC method throws an exception
*/
protected void executeSQLWrite(String sql) throws SQLException {
logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:" + sql);
Statement stmt = jdbcConn.createStatement();
stmt.execute(sql);
stmt.close();
}
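// Hook invoked before a commit: if asynchronous staging updates are enabled, wait until all pending updates
// have been drained into the transaction digest.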
@Override
public void preCommitHook() {
synchronized (stagingHandlerLock) {
// \TODO check if this can potentially block forever in certain scenarios
if (stagingHandler != null) {
stagingHandler.waitForAllPendingUpdates();
}
}
}
/**
* Code to be run within the DB driver before a SQL statement is executed. This is where tables can be synchronized
* before a SELECT, for those databases that do not support SELECT triggers.
*
* @param sql the SQL statement that is about to be executed
*/
@Override
public void preStatementHook(final String sql) {
if (sql == null) {
return;
}
// \TODO: check if anything needs to be executed here for postgres
}
/**
* Code to be run within the DB driver after a SQL statement has been executed. This is where remote statement
* actions can be copied back to Cassandra/MUSIC.
*
* @param sql the SQL statement that was executed
*/
@Override
public void postStatementHook(final String sql, StagingTable transactionDigest) {
if (sql != null) {
String[] parts = sql.trim().split(" ");
String cmd = parts[0].toLowerCase();
if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
if (useAsyncStagingUpdate) {
synchronized (stagingHandlerLock) {
if (stagingHandler == null || currentStaging != transactionDigest) {
Runnable newRunnable =
new PostgresMixin.StagingTableUpdateRunnable(this, transactionDigest);
currentStaging = transactionDigest;
stagingHandler = new AsyncUpdateHandler(newRunnable);
}
// else we can keep using the current staging Handler
}
stagingHandler.processNewUpdate();
} else {
try {
this.updateStagingTable(transactionDigest);
} catch (NoSuchFieldException | MDBCServiceException e) {
// TODO Auto-generated catch block
this.logger.error("Error updating the staging table");
}
}
}
}
}
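// Maps the single-letter operation code stored in the staging table ('I', 'D', 'U', 'S') to an SQLOperation.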
private SQLOperation toOpEnum(String operation) throws NoSuchFieldException {
switch (operation.toLowerCase()) {
case "i":
return SQLOperation.INSERT;
case "d":
return SQLOperation.DELETE;
case "u":
return SQLOperation.UPDATE;
case "s":
return SQLOperation.SELECT;
default:
logger.error(EELFLoggerDelegate.errorLogger, "Invalid operation selected: [" + operation + "]");
throw new NoSuchFieldException("Invalid operation enum");
}
}
/**
* Copy the rows logged in the staging (transaction log) table for this connection into the transaction digest.
*
* @param transactionDigests the transaction digest to which the logged operations are added
* @throws NoSuchFieldException if an unrecognized operation code is found in the log
* @throws MDBCServiceException if an operation cannot be added to the digest
*/
private void updateStagingTable(StagingTable transactionDigests) throws NoSuchFieldException, MDBCServiceException {
String selectSql = "select ix, op, schema_name, table_name, original_data,new_data FROM " + TRANS_TBL_SCHEMA
+ "." + TRANS_TBL + " where connection_id = '" + this.connId + "';";
Integer biggestIx = Integer.MIN_VALUE;
Integer smallestIx = Integer.MAX_VALUE;
try {
ResultSet rs = executeSQLRead(selectSql);
Set<Integer> rows = new TreeSet<>();
while (rs.next()) {
int ix = rs.getInt("ix");
biggestIx = Integer.max(biggestIx, ix);
smallestIx = Integer.min(smallestIx, ix);
String op = rs.getString("op");
SQLOperation opType = toOpEnum(op);
String schema = rs.getString("schema_name");
String tbl = rs.getString("table_name");
String original = rs.getString("original_data");
String newData = rs.getString("new_data");
Range range = new Range(schema + "." + tbl);
transactionDigests.addOperation(range, opType, newData, original);
rows.add(ix);
}
rs.getStatement().close();
if (rows.size() > 0) {
logger.debug("Staging delete: Executing with vals [" + smallestIx + "," + biggestIx + "," + this.connId
+ "]");
this.deleteStagingStatement.setInt(1, smallestIx);
this.deleteStagingStatement.setInt(2, biggestIx);
this.deleteStagingStatement.setString(3, this.connId);
this.deleteStagingStatement.execute();
}
} catch (SQLException e) {
logger.warn("Exception in postStatementHook: " + e);
e.printStackTrace();
}
}
/**
* Update MUSIC with data from the SQL table
*
* @param tableName - name of table to update in music
*/
@Override
public void synchronizeData(String tableName) {}
/**
* Return a list of "reserved" names, that should not be used by MySQL client/MUSIC These are reserved for mdbc
*/
@Override
public List<String> getReservedTblNames() {
ArrayList<String> rsvdTables = new ArrayList<>();
rsvdTables.add(TRANS_TBL_SCHEMA + "." + TRANS_TBL);
rsvdTables.add(TRANS_TBL);
// Add others here as necessary
return rsvdTables;
}
@Override
public String getPrimaryKey(String sql, String tableName) {
return null;
}
/**
* Parse the transaction digest into individual events
*
* @param transaction - the staging table (transaction digest) containing the operations to replay
*/
@Override
public void replayTransaction(StagingTable transaction)
throws SQLException, MDBCServiceException {
boolean autocommit = jdbcConn.getAutoCommit();
jdbcConn.setAutoCommit(false);
Statement jdbcStmt = jdbcConn.createStatement();
final ArrayList<Operation> opList = transaction.getOperationList();
for (Operation op : opList) {
try {
replayOperationIntoDB(jdbcStmt, op);
} catch (SQLException | MDBCServiceException e) {
// rollback transaction
logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "."
+ "Rolling back the entire digest replay.");
jdbcConn.rollback();
throw e;
}
}
clearReplayedOperations(jdbcStmt);
jdbcConn.commit();
jdbcStmt.close();
jdbcConn.setAutoCommit(autocommit);
}
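// Setting session_replication_role to 'replica' makes Postgres skip ordinary (origin) triggers for this
// session, which also suppresses foreign key enforcement while a digest is being replayed.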
@Override
public void disableForeignKeyChecks() throws SQLException {
Statement disable = jdbcConn.createStatement();
disable.execute("SET session_replication_role = 'replica';");
disable.closeOnCompletion();
}
@Override
public void enableForeignKeyChecks() throws SQLException {
Statement enable = jdbcConn.createStatement();
enable.execute("SET session_replication_role = 'origin';");
enable.closeOnCompletion();
}
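// Applying a transaction digest is simply a full replay of its operations against the local database.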
@Override
public void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException {
replayTransaction(txDigest);
}
/**
* Replays operation into database, usually from txDigest
*
* @param jdbcStmt: Statement used to perform the replay
* @param op: operation to be replayed
* @throws SQLException
* @throws MDBCServiceException
*/
private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException {
logger.debug("Replaying Operation: " + op.getOperationType() + "->" + op.getVal());
JSONObject newVal = op.getVal();
JSONObject oldVal = null;
try {
oldVal = op.getKey();
} catch (MDBCServiceException e) {
// Ignore exception, in postgres the structure of the operation is different
}
TableInfo ti = getTableInfo(op.getTable());
final List<String> keyColumns = ti.getKeyColumns();
// build and replay the queries
String sql = constructSQL(op, keyColumns, newVal, oldVal);
if (sql == null)
return;
try {
logger.debug("Replaying operation: " + sql);
int updated = jdbcStmt.executeUpdate(sql);
if (updated == 0) {
// This applies only for replaying transactions involving Eventually Consistent tables
logger.warn(
"Error Replaying operation: " + sql + "; Replacing insert/replace/viceversa and replaying ");
buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal);
}
} catch (SQLException sqlE) {
// This applies for replaying transactions involving Eventually Consistent tables
// or transactions that replay on top of existing keys
logger.warn(
"Error Replaying operation: " + sql + ";" + "Replacing insert/replace/viceversa and replaying ");
buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal);
}
}
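// Builds and executes the "inverse" statement (INSERT instead of UPDATE, or vice versa) when the direct
// replay did not modify any rows, e.g. because the row already exists or is missing.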
protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, List<String> keyColumns,
JSONObject newVals, JSONObject oldVals) throws SQLException, MDBCServiceException {
String sqlInverse = constructSQLInverse(op, keyColumns, newVals, oldVals);
if (sqlInverse == null)
return;
logger.debug("Replaying operation: " + sqlInverse);
jdbcStmt.executeUpdate(sqlInverse);
}
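// For an INSERT that failed, fall back to an UPDATE of the existing row; for an UPDATE that matched nothing,
// fall back to an INSERT. Other operation types have no inverse.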
protected String constructSQLInverse(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
throws MDBCServiceException {
String sqlInverse = null;
switch (op.getOperationType()) {
case INSERT:
sqlInverse = constructUpdate(op.getTable(), keyColumns, newVals, oldVals);
break;
case UPDATE:
sqlInverse = constructInsert(op.getTable(), newVals);
break;
default:
break;
}
return sqlInverse;
}
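// Translates a digest operation into the SQL statement that re-applies it locally; SELECTs produce no SQL.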
protected String constructSQL(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
throws MDBCServiceException {
String sql = null;
switch (op.getOperationType()) {
case INSERT:
sql = constructInsert(op.getTable(), newVals);
break;
case UPDATE:
sql = constructUpdate(op.getTable(), keyColumns, newVals, oldVals);
break;
case DELETE:
sql = constructDelete(op.getTable(), keyColumns, oldVals);
break;
case SELECT:
// no update happened, do nothing
break;
default:
logger.error(op.getOperationType() + "not implemented for replay");
}
return sql;
}
private String constructDelete(String tableName, List<String> keyColumns, JSONObject oldVals)
throws MDBCServiceException {
if (oldVals == null) {
throw new MDBCServiceException("Trying to update row with an empty old val exception");
}
StringBuilder sql = new StringBuilder();
sql.append("DELETE FROM ");
sql.append(tableName + " WHERE ");
sql.append(getPrimaryKeyConditional(keyColumns, oldVals));
sql.append(";");
return sql.toString();
}
private String constructInsert(String tableName, JSONObject newVals) {
StringBuilder keys = new StringBuilder();
StringBuilder vals = new StringBuilder();
String sep = "";
for (String col : newVals.keySet()) {
keys.append(sep + col);
vals.append(sep + "'" + newVals.get(col) + "'");
sep = ", ";
}
StringBuilder sql = new StringBuilder();
sql.append("INSERT INTO ").append(tableName + " (").append(keys).append(") VALUES (").append(vals).append(");");
return sql.toString();
}
private String constructUpdate(String tableName, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
throws MDBCServiceException {
if (oldVals == null) {
throw new MDBCServiceException("Trying to update row with an empty old val exception");
}
StringBuilder sql = new StringBuilder();
sql.append("UPDATE ").append(tableName).append(" SET ");
String sep = "";
for (String key : newVals.keySet()) {
sql.append(sep).append(key).append("='").append(newVals.get(key)).append("'");
sep = ", ";
}
sql.append(" WHERE ");
sql.append(getPrimaryKeyConditional(keyColumns, oldVals));
sql.append(";");
return sql.toString();
}
/**
* Create an SQL string for AND'ing all of the primary keys
*
* @param keyColumns list with the name of the columns that are key
* @param vals json with the contents of the old row
* @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3
*/
private String getPrimaryKeyConditional(List<String> keyColumns, JSONObject vals) {
StringBuilder keyCondStmt = new StringBuilder();
String and = "";
for (String key : keyColumns) {
// We cannot use the default primary key for the sql table and operations
if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
Object val = vals.get(key);
keyCondStmt.append(and + key + "='" + val + "'");
and = " AND ";
}
}
return keyCondStmt.toString();
}
/**
* Cleans out the transaction table, removing the replayed operations
*
* @param jdbcStmt
* @throws SQLException
*/
private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
logger.info("Clearing replayed operations");
String sql =
"DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " WHERE CONNECTION_ID = '" + this.connId + "';";
jdbcStmt.executeUpdate(sql);
}
@Override
public Connection getSQLConnection() {
return jdbcConn;
}
@Override
public Set<String> getSQLTableSet() {
Set<String> set = new TreeSet<>();
String sql =
"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=CURRENT_SCHEMA() AND TABLE_TYPE='BASE TABLE'";
try {
Statement stmt = jdbcConn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
String s = rs.getString("TABLE_NAME");
set.add(s);
}
stmt.close();
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
System.out.println("getSQLTableSet: " + e);
e.printStackTrace();
}
logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
System.out.println("getSQLTableSet returning: " + set);
return set;
}
@Override
public void updateCheckpointLocations(Range r, Pair<MriReference, MusicTxDigestId> playbackPointer) {
throw new org.apache.commons.lang.NotImplementedException();
}
@Override
public Map<Range, Pair<MriReference, MusicTxDigestId>> getCheckpointLocations() {
throw new org.apache.commons.lang.NotImplementedException();
}
@Override
public void initTables() {
throw new org.apache.commons.lang.NotImplementedException();
}
}