diff options
author | Rich Tabedzki <richard.tabedzki@att.com> | 2018-03-02 04:42:45 +0000 |
---|---|---|
committer | Timoney, Dan (dt5972) <dt5972@att.com> | 2018-03-02 13:28:59 -0500 |
commit | e69c6e3320f27d31d6dcd86a1ab40960758b59cb (patch) | |
tree | 241c597b8c092ebc17454b74a7fd10ee9fd0bbee /dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java | |
parent | 3cc57351f6b8e438ac6d6ed98cb098a22c62608f (diff) |
Updated master database detection algorithm
Changes made:
* updated algorithm in DBResourceManager
* Updated CachedDataSource, JdbcDBCachedDataSource
* added new unit tests
Change-Id: I4f6bbeb3839f55d183d7e762743fbc9171b63b1a
Issue-ID: CCSDK-192
Signed-off-by: Rich Tabedzki <richard.tabedzki@att.com>
Diffstat (limited to 'dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java')
-rwxr-xr-x | dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java | 279 |
1 files changed, 117 insertions, 162 deletions
diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java index b31645d4..27c14d2a 100755 --- a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java +++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java @@ -20,39 +20,39 @@ package org.onap.ccsdk.sli.core.dblib; -import org.apache.tomcat.jdbc.pool.PoolExhaustedException; -import org.onap.ccsdk.sli.core.dblib.config.DbConfigPool; -import org.onap.ccsdk.sli.core.dblib.factory.DBConfigFactory; -import org.onap.ccsdk.sli.core.dblib.pm.PollingWorker; -import org.onap.ccsdk.sli.core.dblib.pm.SQLExecutionMonitor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.sql.DataSource; -import javax.sql.rowset.CachedRowSet; import java.io.PrintWriter; import java.sql.Connection; import java.sql.SQLDataException; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.SQLNonTransientConnectionException; import java.sql.SQLSyntaxErrorException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.Observable; -import java.util.PriorityQueue; import java.util.Properties; -import java.util.Queue; import java.util.Set; +import java.util.SortedSet; import java.util.TimerTask; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ConcurrentSkipListSet; + +import javax.sql.DataSource; +import javax.sql.rowset.CachedRowSet; +import org.apache.tomcat.jdbc.pool.PoolExhaustedException; +import org.onap.ccsdk.sli.core.dblib.config.DbConfigPool; import org.onap.ccsdk.sli.core.dblib.config.JDBCConfiguration; +import org.onap.ccsdk.sli.core.dblib.factory.DBConfigFactory; +import org.onap.ccsdk.sli.core.dblib.pm.PollingWorker; +import org.onap.ccsdk.sli.core.dblib.pm.SQLExecutionMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @version $Revision: 1.15 $ @@ -68,31 +68,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb transient protected long retryInterval = 10000L; transient boolean recoveryMode = true; - protected final AtomicBoolean dsSelector = new AtomicBoolean(); - - Queue<CachedDataSource> dsQueue = new PriorityQueue<>(4, new Comparator<CachedDataSource>() { - @Override - public int compare(CachedDataSource left, CachedDataSource right) { - try { - if (left == null) { - return 1; - } - if (right == null) { - return -1; - } - - if (!left.isSlave()) { - return -1; - } - if (!right.isSlave()) { - return 1; - } - } catch (Throwable e) { - LOGGER.warn("", e); - } - return 0; - } -}); + SortedSet<CachedDataSource> dsQueue = new ConcurrentSkipListSet<CachedDataSource>(new DataSourceComparator()); protected final Set<CachedDataSource> broken = Collections.synchronizedSet(new HashSet<CachedDataSource>()); protected final Object monitor = new Object(); protected final Properties configProps; @@ -174,7 +150,6 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb cachedDS[i].addObserver(DBResourceManager.this); } -// CachedDataSource[] cachedDS = factory.initDBResourceManager(dbConfig, DBResourceManager.this, semaphore); DataSourceTester[] tester = new DataSourceTester[config.length]; for(int i=0; i<tester.length; i++){ @@ -184,7 +159,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb // the timeout param is set is seconds. long timeout = ((dbConfig.getTimeout() <= 0) ? 60L : dbConfig.getTimeout()); - LOGGER.debug("Timeout set to " +timeout+" seconds"); + LOGGER.debug("Timeout set to {} seconds", timeout); timeout *= 1000; @@ -195,11 +170,39 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb LOGGER.warn("DBResourceManager.initWorker", exc); } finally { startTime = System.currentTimeMillis() - startTime; - LOGGER.info("Completed wait with "+ dsQueue.size() + " active datasource(s) in " + startTime + " ms"); + LOGGER.info("Completed wait with {} active datasource(s) in {} ms", dsQueue.size(), startTime); } } + private final class DataSourceComparator implements Comparator<CachedDataSource> { + @Override + public int compare(CachedDataSource left, CachedDataSource right) { + if(LOGGER.isTraceEnabled()) + LOGGER.trace("----------SORTING-------- () : ()", left.getDbConnectionName(), right.getDbConnectionName()); + try { + if(left == right) { + return 0; + } + if(left == null){ + return 1; + } + if(right == null){ + return -1; + } + + if(!left.isSlave()) + return -1; + if(!right.isSlave()) + return 1; + + } catch (Throwable e) { + LOGGER.warn("", e); + } + return -1; + } + } + class DataSourceTester extends Thread { private final CachedDataSource ds; @@ -224,7 +227,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb } } if(!slave) { - LOGGER.info(String.format("Adding MASTER (%s) to active queue", ds.getDbConnectionName())); + LOGGER.info("Adding MASTER {} to active queue", ds.getDbConnectionName()); try { synchronized (semaphoreQ) { semaphoreQ.notifyAll(); @@ -245,7 +248,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb } catch(Exception exc) { LOGGER.warn("", exc); } - LOGGER.info(String.format("Thread DataSourceTester terminated %s for %s", this.getName(), ds.getDbConnectionName())); + LOGGER.info("Thread DataSourceTester terminated {} for {}", this.getName(), ds.getDbConnectionName()); } } @@ -300,7 +303,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb if(monitor.getParent() instanceof CachedDataSource) { CachedDataSource dataSource = (CachedDataSource)monitor.getParent(); - if(dataSource == dsQueue.peek()) + if(dataSource == dsQueue.first()) { if(recoveryMode && dsQueue.size() > 1){ handleGetConnectionException(dataSource, new Exception(data.toString())); @@ -312,7 +315,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb public void testForceRecovery() { - CachedDataSource active = this.dsQueue.peek(); + CachedDataSource active = this.dsQueue.first(); handleGetConnectionException(active, new Exception("test")); } @@ -386,51 +389,45 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb private CachedRowSet requestDataWithRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException { Throwable lastException = null; - CachedDataSource active = null; // test if there are any connection pools available - LinkedList<CachedDataSource> sources = new LinkedList<>(this.dsQueue); - if(sources.isEmpty()){ + if(this.dsQueue.isEmpty()){ LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available."); throw new DBLibException("No active DB connection pools are available in RequestDataWithRecovery call."); } - if(preferredDS != null && !sources.peek().getDbConnectionName().equals(preferredDS)) { - Collections.reverse(sources); - } - // loop through available data sources to retrieve data. - while(!sources.isEmpty()) + for(int i=0; i< 2; i++) { - active = sources.peek(); + CachedDataSource active = this.dsQueue.first(); long time = System.currentTimeMillis(); try { if(!active.isFabric()) { + if(this.dsQueue.size() > 1 && active.isSlave()) { CachedDataSource master = findMaster(); if(master != null) { active = master; - master = null; } } - sources.remove(active); + } + return active.getData(statement, arguments); } catch(SQLDataException | SQLSyntaxErrorException | SQLIntegrityConstraintViolationException exc){ throw exc; } catch(Throwable exc){ - lastException = exc; - String message = exc.getMessage(); - if(message == null) { - if(exc.getCause() != null) { - message = exc.getCause().getMessage(); + if(exc instanceof SQLException) { + SQLException sqlExc = (SQLException)exc; + int code = sqlExc.getErrorCode(); + String state = sqlExc.getSQLState(); + LOGGER.debug("SQLException code: {} state: {}", code, state); + if("07001".equals(sqlExc.getSQLState())) { + throw sqlExc; } - if(message == null) - message = exc.getClass().getName(); } - LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message); + lastException = exc; + LOGGER.error("Generated alarm: "+active.getDbConnectionName(), exc); handleGetConnectionException(active, exc); - if(sources.contains(active)) - sources.remove(active); } finally { if(LOGGER.isDebugEnabled()){ time = System.currentTimeMillis() - time; @@ -462,14 +459,17 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available."); throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call."); } - CachedDataSource active = this.dsQueue.peek(); + CachedDataSource active = this.dsQueue.first(); long time = System.currentTimeMillis(); try { if(!active.isFabric()) { + if(this.dsQueue.size() > 1 && active.isSlave()) { CachedDataSource master = findMaster(); - if(master != null) + if(master != null) { active = master; } + } + } return active.getData(statement, arguments); // } catch(SQLDataException exc){ // throw exc; @@ -508,17 +508,21 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb return writeDataNoRecovery(statement, newList, preferredDS); } - CachedDataSource findMaster() throws PoolExhaustedException { - CachedDataSource master = null; - CachedDataSource[] dss = this.dsQueue.toArray(new CachedDataSource[0]); - for(int i=0; i<dss.length; i++) { - if(!dss[i].isSlave()) { - master = dss[i]; - if(i != 0) { - dsQueue.remove(master); - dsQueue.add(master); + synchronized CachedDataSource findMaster() throws SQLException { + final CachedDataSource[] clone = this.dsQueue.toArray(new CachedDataSource[0]); + + for(final CachedDataSource dss : clone) { + if(!dss.isSlave()) { + final CachedDataSource first = this.dsQueue.first(); + if(first != dss) { + if(LOGGER.isDebugEnabled()) + LOGGER.debug("----------REODRERING--------"); + dsQueue.clear(); + if(!dsQueue.addAll(Arrays.asList(clone))) { + LOGGER.error("Failed adding datasources"); + } } - return master; + return dss; } } LOGGER.warn("MASTER not found."); @@ -534,17 +538,19 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb boolean initialRequest = true; boolean retryAllowed = true; - CachedDataSource active = this.dsQueue.peek(); + CachedDataSource active = this.dsQueue.first(); long time = System.currentTimeMillis(); while(initialRequest) { initialRequest = false; try { if(!active.isFabric()) { + if(this.dsQueue.size() > 1 && active.isSlave()) { CachedDataSource master = findMaster(); if(master != null) { active = master; } } + } return active.writeData(statement, arguments); } catch(Throwable exc){ @@ -557,8 +563,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb // handle read-only exception if(sqlExc.getErrorCode() == 1290 && "HY000".equals(sqlExc.getSQLState())) { LOGGER.warn("retrying due to: " + sqlExc.getMessage()); - dsQueue.remove(active); - dsQueue.add(active); + this.findMaster(); if(retryAllowed){ retryAllowed = false; initialRequest = true; @@ -604,10 +609,15 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb } try { - active = dsQueue.peek(); - CachedDataSource tmpActive = findMaster(); - if(tmpActive != null) { - active = tmpActive; + active = dsQueue.first(); + + if(!active.isFabric()) { + if(this.dsQueue.size() > 1 && active.isSlave()) { + CachedDataSource master = findMaster(); + if(master != null) { + active = master; + } + } } return new DBLibConnection(active.getConnection(), active); } catch(javax.sql.rowset.spi.SyncFactoryException exc){ @@ -616,6 +626,8 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb lastException = exc; } catch(PoolExhaustedException exc) { throw new NoAvailableConnectionsException(exc); + } catch(SQLNonTransientConnectionException exc){ + throw new NoAvailableConnectionsException(exc); } catch(Exception exc){ lastException = exc; if(recoveryMode){ @@ -667,10 +679,14 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb try { - active = dsQueue.peek(); - CachedDataSource tmpActive = findMaster(); - if(tmpActive != null) { - active = tmpActive; + active = dsQueue.first(); + if(!active.isFabric()) { + if(this.dsQueue.size() > 1 && active.isSlave()) { + CachedDataSource master = findMaster(); + if(master != null) { + active = master; + } + } } return active.getConnection(username, password); } catch(Throwable exc){ @@ -691,7 +707,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb throw new DBLibException("No connections available in DBResourceManager in GetConnection call."); } - private void handleGetConnectionException(CachedDataSource source, Throwable exc) { + private void handleGetConnectionException(final CachedDataSource source, Throwable exc) { try { if(!source.canTakeOffLine()) { @@ -715,7 +731,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb { if(!dsQueue.isEmpty()) { - LOGGER.warn("DB DataSource <" + dsQueue.peek().getDbConnectionName() + "> became active"); + LOGGER.warn("DB DataSource <" + dsQueue.first().getDbConnectionName() + "> became active"); } } } catch (Exception e) { @@ -749,28 +765,28 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb @Override public PrintWriter getLogWriter() throws SQLException { - return this.dsQueue.peek().getLogWriter(); + return this.dsQueue.first().getLogWriter(); } @Override public int getLoginTimeout() throws SQLException { - return this.dsQueue.peek().getLoginTimeout(); + return this.dsQueue.first().getLoginTimeout(); } @Override public void setLogWriter(PrintWriter out) throws SQLException { - this.dsQueue.peek().setLogWriter(out); + this.dsQueue.first().setLogWriter(out); } @Override public void setLoginTimeout(int seconds) throws SQLException { - this.dsQueue.peek().setLoginTimeout(seconds); + this.dsQueue.first().setLoginTimeout(seconds); } public void displayState(){ if(LOGGER.isDebugEnabled()){ LOGGER.debug("POOLS : Active = "+dsQueue.size() + ";\t Broken = "+broken.size()); - CachedDataSource current = dsQueue.peek(); + CachedDataSource current = dsQueue.first(); if(current != null) { LOGGER.debug("POOL : Active name = \'"+current.getDbConnectionName()+ "\'"); } @@ -811,7 +827,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb buffer.append("<td>in recovery</td>"); } if (dsQueue.contains(list.get(i))) { - if (dsQueue.peek() == list.get(i)) + if (dsQueue.first() == list.get(i)) buffer.append("<td>active</td>"); else buffer.append("<td>standby</td>"); @@ -827,7 +843,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb buffer.append("in recovery"); } else if (dsQueue.contains(list.get(i))) { - if (dsQueue.peek() == list.get(i)) + if (dsQueue.first() == list.get(i)) buffer.append("active"); else buffer.append("standby"); @@ -859,7 +875,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb } public void test(){ - CachedDataSource obj = dsQueue.peek(); + CachedDataSource obj = dsQueue.first(); Exception ption = new Exception(); try { for(int i=0; i<5; i++) @@ -871,77 +887,16 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb } } - public String getPreferredDSName(){ - if(isActive()){ - return getPreferredDataSourceName(dsSelector); - } - return ""; - } - - public String getPreferredDataSourceName(AtomicBoolean flipper) { - - LinkedList<CachedDataSource> snapshot = new LinkedList<>(dsQueue); - if(snapshot.size() > 1){ - CachedDataSource first = snapshot.getFirst(); - CachedDataSource last = snapshot.getLast(); - - int delta = first.getMonitor().getProcessedConnectionsCount() - last.getMonitor().getProcessedConnectionsCount(); - if(delta < 0) { - flipper.set(false); - } else if(delta > 0) { - flipper.set(true); - } else { - // check the last value and return !last - flipper.getAndSet(!flipper.get()); - } - - if (flipper.get()) - Collections.reverse(snapshot); - } - return snapshot.peek().getDbConnectionName(); - } - @Override public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { return null; } - public String getMasterName() { - if(isActive()){ - return getMasterDataSourceName(dsSelector); - } - return ""; - } - - - private String getMasterDataSourceName(AtomicBoolean flipper) { - - LinkedList<CachedDataSource> snapshot = new LinkedList<>(dsQueue); - if(snapshot.size() > 1){ - CachedDataSource first = snapshot.getFirst(); - CachedDataSource last = snapshot.getLast(); - - int delta = first.getMonitor().getProcessedConnectionsCount() - last.getMonitor().getProcessedConnectionsCount(); - if(delta < 0) { - flipper.set(false); - } else if(delta > 0) { - flipper.set(true); - } else { - // check the last value and return !last - flipper.getAndSet(!flipper.get()); - } - - if (flipper.get()) - Collections.reverse(snapshot); - } - return snapshot.peek().getDbConnectionName(); - } - class RemindTask extends TimerTask { @Override public void run() { - CachedDataSource ds = dsQueue.peek(); + CachedDataSource ds = dsQueue.first(); if(ds != null) ds.getPoolInfo(false); } |