diff options
Diffstat (limited to 'dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java')
-rwxr-xr-x[-rw-r--r--] | dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java | 169 |
1 files changed, 134 insertions, 35 deletions
diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java index 7a27a20c..a2eb0f9c 100644..100755 --- a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java +++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,11 +20,8 @@ package org.onap.ccsdk.sli.core.dblib; -import com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException; import org.apache.tomcat.jdbc.pool.PoolExhaustedException; import org.onap.ccsdk.sli.core.dblib.config.DbConfigPool; -import org.onap.ccsdk.sli.core.dblib.factory.AbstractDBResourceManagerFactory; -import org.onap.ccsdk.sli.core.dblib.factory.AbstractResourceManagerFactory; import org.onap.ccsdk.sli.core.dblib.factory.DBConfigFactory; import org.onap.ccsdk.sli.core.dblib.pm.PollingWorker; import org.onap.ccsdk.sli.core.dblib.pm.SQLExecutionMonitor; @@ -52,8 +49,21 @@ import java.util.Properties; import java.util.Queue; import java.util.Set; import java.util.TimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import javax.sql.DataSource; +import javax.sql.rowset.CachedRowSet; + +import org.apache.tomcat.jdbc.pool.PoolExhaustedException; +import org.onap.ccsdk.sli.core.dblib.config.DbConfigPool; +import org.onap.ccsdk.sli.core.dblib.config.JDBCConfiguration; +import org.onap.ccsdk.sli.core.dblib.factory.DBConfigFactory; +import org.onap.ccsdk.sli.core.dblib.pm.PollingWorker; +import org.onap.ccsdk.sli.core.dblib.pm.SQLExecutionMonitor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @version $Revision: 1.15 $ @@ -72,11 +82,18 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb protected final AtomicBoolean dsSelector = new AtomicBoolean(); // Queue<CachedDataSource> dsQueue = new ConcurrentLinkedQueue<CachedDataSource>(); - Queue<CachedDataSource> dsQueue = new PriorityQueue<>(4, new Comparator<CachedDataSource>() { + Queue<CachedDataSource> dsQueue = new PriorityQueue<CachedDataSource>(4, new Comparator<CachedDataSource>() { @Override public int compare(CachedDataSource left, CachedDataSource right) { try { - if (!left.isSlave()) { + if(left == null){ + return 1; + } + if(right == null){ + return -1; + } + + if(!left.isSlave()) { return -1; } if (!right.isSlave()) { @@ -144,31 +161,108 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb } private void config(Properties configProps) throws Exception { - + final ConcurrentLinkedQueue<CachedDataSource> semaphore = new ConcurrentLinkedQueue<CachedDataSource>(); final DbConfigPool dbConfig = DBConfigFactory.createConfig(configProps); - final AbstractResourceManagerFactory factory = - AbstractDBResourceManagerFactory.getFactory(dbConfig.getType()); - LOGGER.info("Default DB config is : {}", dbConfig.getType()); - LOGGER.info("Using factory : {}", factory.getClass().getName()); - final CachedDataSource[] cachedDS = factory.initDBResourceManager(dbConfig, this); - if (cachedDS == null || cachedDS.length == 0) { - LOGGER.error("Initialization of CachedDataSources failed. No instance was created."); - throw new Exception("Failed to initialize DB Library. No data source was created."); + long startTime = System.currentTimeMillis(); + + try { + JDBCConfiguration[] config = dbConfig.getJDBCbSourceArray(); + CachedDataSource[] cachedDS = new CachedDataSource[config.length]; + if (cachedDS == null || cachedDS.length == 0) { + LOGGER.error("Initialization of CachedDataSources failed. No instance was created."); + throw new Exception("Failed to initialize DB Library. No data source was created."); + } + + for(int i = 0; i < config.length; i++) { + cachedDS[i] = CachedDataSourceFactory.createDataSource(config[i]); + if(cachedDS[i] == null) + continue; + semaphore.add(cachedDS[i]); + cachedDS[i].setInterval(monitoringInterval); + cachedDS[i].setInitialDelay(monitoringInitialDelay); + cachedDS[i].setExpectedCompletionTime(expectedCompletionTime); + cachedDS[i].setUnprocessedFailoverThreshold(unprocessedFailoverThreshold); + cachedDS[i].addObserver(DBResourceManager.this); + } + +// CachedDataSource[] cachedDS = factory.initDBResourceManager(dbConfig, DBResourceManager.this, semaphore); + DataSourceTester[] tester = new DataSourceTester[config.length]; + + for(int i=0; i<tester.length; i++){ + tester[i] = new DataSourceTester(cachedDS[i], DBResourceManager.this, semaphore); + tester[i].start(); + } + + // the timeout param is set is seconds. + long timeout = ((dbConfig.getTimeout() <= 0) ? 60L : dbConfig.getTimeout()); + LOGGER.debug("Timeout set to " +timeout+" seconds"); + timeout *= 1000; + + + synchronized (semaphore) { + semaphore.wait(timeout); + } + } catch(Exception exc){ + LOGGER.warn("DBResourceManager.initWorker", exc); + } finally { + startTime = System.currentTimeMillis() - startTime; + LOGGER.info("Completed wait with "+ dsQueue.size() + " active datasource(s) in " + startTime + " ms"); + } + } + + + class DataSourceTester extends Thread { + + private final CachedDataSource ds; + private final DBResourceManager manager; + private final ConcurrentLinkedQueue<CachedDataSource> semaphoreQ; + + public DataSourceTester(CachedDataSource ds, DBResourceManager manager, ConcurrentLinkedQueue<CachedDataSource> semaphore) { + this.ds = ds; + this.manager = manager; + this.semaphoreQ = semaphore; } - for (final CachedDataSource ds : cachedDS) { - if(ds != null && ds.isInitialized()){ - setDataSource(ds); - ds.setInterval(monitoringInterval); - ds.setInitialDelay(monitoringInitialDelay); - ds.setExpectedCompletionTime(expectedCompletionTime); - ds.setUnprocessedFailoverThreshold(unprocessedFailoverThreshold); - ds.addObserver(this); + @Override + public void run() { + manager.setDataSource(ds); + boolean slave = true; + if(ds != null) { + try { + slave = ds.isSlave(); + } catch (Exception exc) { + LOGGER.warn("", exc); + } } + if(!slave) { + LOGGER.info(String.format("Adding MASTER (%s) to active queue", ds.getDbConnectionName())); + try { + synchronized (semaphoreQ) { + semaphoreQ.notifyAll(); + } + } catch(Exception exc) { + LOGGER.warn("", exc); + } } + try { + synchronized (semaphoreQ) { + semaphoreQ.remove(ds); + } + if(semaphoreQ.isEmpty()) { + synchronized (semaphoreQ) { + semaphoreQ.notifyAll(); + } + } + } catch(Exception exc) { + LOGGER.warn("", exc); + } + LOGGER.info(String.format("Thread DataSourceTester terminated %s for %s", this.getName(), ds.getDbConnectionName())); + } + } + private long getLongFromProperties(Properties props, String property, long defaultValue) { String value = null; @@ -306,7 +400,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb CachedDataSource active = null; // test if there are any connection pools available - LinkedList<CachedDataSource> sources = new LinkedList<>(this.dsQueue); + LinkedList<CachedDataSource> sources = new LinkedList<CachedDataSource>(this.dsQueue); if(sources.isEmpty()){ LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available."); throw new DBLibException("No active DB connection pools are available in RequestDataWithRecovery call."); @@ -423,8 +517,8 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb return writeDataNoRecovery(statement, newList, preferredDS); } - CachedDataSource findMaster() throws PoolExhaustedException, MySQLNonTransientConnectionException { - CachedDataSource master; + CachedDataSource findMaster() throws PoolExhaustedException { + CachedDataSource master = null; CachedDataSource[] dss = this.dsQueue.toArray(new CachedDataSource[0]); for(int i=0; i<dss.length; i++) { if(!dss[i].isSlave()) { @@ -496,7 +590,12 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb return true; } - private void setDataSource(CachedDataSource dataSource) { + public void setDataSource(CachedDataSource dataSource) { + if(this.dsQueue.contains(dataSource)) + return; + if(this.broken.contains(dataSource)) + return; + if(dataSource.testConnection(true)){ this.dsQueue.add(dataSource); } else { @@ -525,16 +624,12 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb lastException = exc; } catch(PoolExhaustedException exc) { throw new NoAvailableConnectionsException(exc); - } catch(MySQLNonTransientConnectionException exc){ - throw new NoAvailableConnectionsException(exc); } catch(Exception exc){ lastException = exc; if(recoveryMode){ handleGetConnectionException(active, exc); } else { - if(exc instanceof MySQLNonTransientConnectionException) { - throw new NoAvailableConnectionsException(exc); - } if(exc instanceof SQLException) { + if(exc instanceof SQLException) { throw (SQLException)exc; } else { DBLibException excptn = new DBLibException(exc.getMessage()); @@ -700,7 +795,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb public String getDBStatus(boolean htmlFormat) { StringBuilder buffer = new StringBuilder(); - ArrayList<CachedDataSource> list = new ArrayList<>(); + ArrayList<CachedDataSource> list = new ArrayList<CachedDataSource>(); list.addAll(dsQueue); list.addAll(broken); if (htmlFormat) @@ -785,7 +880,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb public String getPreferredDataSourceName(AtomicBoolean flipper) { - LinkedList<CachedDataSource> snapshot = new LinkedList<>(dsQueue); + LinkedList<CachedDataSource> snapshot = new LinkedList<CachedDataSource>(dsQueue); if(snapshot.size() > 1){ CachedDataSource first = snapshot.getFirst(); CachedDataSource last = snapshot.getLast(); @@ -849,4 +944,8 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb ds.getPoolInfo(false); } } + + public int poolSize() { + return dsQueue.size(); + } } |