aboutsummaryrefslogtreecommitdiffstats
path: root/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java')
-rwxr-xr-x[-rw-r--r--]dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java169
1 files changed, 134 insertions, 35 deletions
diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java
index 7a27a20c..a2eb0f9c 100644..100755
--- a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java
+++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,11 +20,8 @@
package org.onap.ccsdk.sli.core.dblib;
-import com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException;
import org.apache.tomcat.jdbc.pool.PoolExhaustedException;
import org.onap.ccsdk.sli.core.dblib.config.DbConfigPool;
-import org.onap.ccsdk.sli.core.dblib.factory.AbstractDBResourceManagerFactory;
-import org.onap.ccsdk.sli.core.dblib.factory.AbstractResourceManagerFactory;
import org.onap.ccsdk.sli.core.dblib.factory.DBConfigFactory;
import org.onap.ccsdk.sli.core.dblib.pm.PollingWorker;
import org.onap.ccsdk.sli.core.dblib.pm.SQLExecutionMonitor;
@@ -52,8 +49,21 @@ import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.sql.DataSource;
+import javax.sql.rowset.CachedRowSet;
+
+import org.apache.tomcat.jdbc.pool.PoolExhaustedException;
+import org.onap.ccsdk.sli.core.dblib.config.DbConfigPool;
+import org.onap.ccsdk.sli.core.dblib.config.JDBCConfiguration;
+import org.onap.ccsdk.sli.core.dblib.factory.DBConfigFactory;
+import org.onap.ccsdk.sli.core.dblib.pm.PollingWorker;
+import org.onap.ccsdk.sli.core.dblib.pm.SQLExecutionMonitor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @version $Revision: 1.15 $
@@ -72,11 +82,18 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb
protected final AtomicBoolean dsSelector = new AtomicBoolean();
// Queue<CachedDataSource> dsQueue = new ConcurrentLinkedQueue<CachedDataSource>();
- Queue<CachedDataSource> dsQueue = new PriorityQueue<>(4, new Comparator<CachedDataSource>() {
+ Queue<CachedDataSource> dsQueue = new PriorityQueue<CachedDataSource>(4, new Comparator<CachedDataSource>() {
@Override
public int compare(CachedDataSource left, CachedDataSource right) {
try {
- if (!left.isSlave()) {
+ if(left == null){
+ return 1;
+ }
+ if(right == null){
+ return -1;
+ }
+
+ if(!left.isSlave()) {
return -1;
}
if (!right.isSlave()) {
@@ -144,31 +161,108 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb
}
private void config(Properties configProps) throws Exception {
-
+ final ConcurrentLinkedQueue<CachedDataSource> semaphore = new ConcurrentLinkedQueue<CachedDataSource>();
final DbConfigPool dbConfig = DBConfigFactory.createConfig(configProps);
- final AbstractResourceManagerFactory factory =
- AbstractDBResourceManagerFactory.getFactory(dbConfig.getType());
- LOGGER.info("Default DB config is : {}", dbConfig.getType());
- LOGGER.info("Using factory : {}", factory.getClass().getName());
- final CachedDataSource[] cachedDS = factory.initDBResourceManager(dbConfig, this);
- if (cachedDS == null || cachedDS.length == 0) {
- LOGGER.error("Initialization of CachedDataSources failed. No instance was created.");
- throw new Exception("Failed to initialize DB Library. No data source was created.");
+ long startTime = System.currentTimeMillis();
+
+ try {
+ JDBCConfiguration[] config = dbConfig.getJDBCbSourceArray();
+ CachedDataSource[] cachedDS = new CachedDataSource[config.length];
+ if (cachedDS == null || cachedDS.length == 0) {
+ LOGGER.error("Initialization of CachedDataSources failed. No instance was created.");
+ throw new Exception("Failed to initialize DB Library. No data source was created.");
+ }
+
+ for(int i = 0; i < config.length; i++) {
+ cachedDS[i] = CachedDataSourceFactory.createDataSource(config[i]);
+ if(cachedDS[i] == null)
+ continue;
+ semaphore.add(cachedDS[i]);
+ cachedDS[i].setInterval(monitoringInterval);
+ cachedDS[i].setInitialDelay(monitoringInitialDelay);
+ cachedDS[i].setExpectedCompletionTime(expectedCompletionTime);
+ cachedDS[i].setUnprocessedFailoverThreshold(unprocessedFailoverThreshold);
+ cachedDS[i].addObserver(DBResourceManager.this);
+ }
+
+// CachedDataSource[] cachedDS = factory.initDBResourceManager(dbConfig, DBResourceManager.this, semaphore);
+ DataSourceTester[] tester = new DataSourceTester[config.length];
+
+ for(int i=0; i<tester.length; i++){
+ tester[i] = new DataSourceTester(cachedDS[i], DBResourceManager.this, semaphore);
+ tester[i].start();
+ }
+
+ // the timeout param is set is seconds.
+ long timeout = ((dbConfig.getTimeout() <= 0) ? 60L : dbConfig.getTimeout());
+ LOGGER.debug("Timeout set to " +timeout+" seconds");
+ timeout *= 1000;
+
+
+ synchronized (semaphore) {
+ semaphore.wait(timeout);
+ }
+ } catch(Exception exc){
+ LOGGER.warn("DBResourceManager.initWorker", exc);
+ } finally {
+ startTime = System.currentTimeMillis() - startTime;
+ LOGGER.info("Completed wait with "+ dsQueue.size() + " active datasource(s) in " + startTime + " ms");
+ }
+ }
+
+
+ class DataSourceTester extends Thread {
+
+ private final CachedDataSource ds;
+ private final DBResourceManager manager;
+ private final ConcurrentLinkedQueue<CachedDataSource> semaphoreQ;
+
+ public DataSourceTester(CachedDataSource ds, DBResourceManager manager, ConcurrentLinkedQueue<CachedDataSource> semaphore) {
+ this.ds = ds;
+ this.manager = manager;
+ this.semaphoreQ = semaphore;
}
- for (final CachedDataSource ds : cachedDS) {
- if(ds != null && ds.isInitialized()){
- setDataSource(ds);
- ds.setInterval(monitoringInterval);
- ds.setInitialDelay(monitoringInitialDelay);
- ds.setExpectedCompletionTime(expectedCompletionTime);
- ds.setUnprocessedFailoverThreshold(unprocessedFailoverThreshold);
- ds.addObserver(this);
+ @Override
+ public void run() {
+ manager.setDataSource(ds);
+ boolean slave = true;
+ if(ds != null) {
+ try {
+ slave = ds.isSlave();
+ } catch (Exception exc) {
+ LOGGER.warn("", exc);
+ }
}
+ if(!slave) {
+ LOGGER.info(String.format("Adding MASTER (%s) to active queue", ds.getDbConnectionName()));
+ try {
+ synchronized (semaphoreQ) {
+ semaphoreQ.notifyAll();
+ }
+ } catch(Exception exc) {
+ LOGGER.warn("", exc);
+ }
}
+ try {
+ synchronized (semaphoreQ) {
+ semaphoreQ.remove(ds);
+ }
+ if(semaphoreQ.isEmpty()) {
+ synchronized (semaphoreQ) {
+ semaphoreQ.notifyAll();
+ }
+ }
+ } catch(Exception exc) {
+ LOGGER.warn("", exc);
+ }
+ LOGGER.info(String.format("Thread DataSourceTester terminated %s for %s", this.getName(), ds.getDbConnectionName()));
+ }
+
}
+
private long getLongFromProperties(Properties props, String property, long defaultValue)
{
String value = null;
@@ -306,7 +400,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb
CachedDataSource active = null;
// test if there are any connection pools available
- LinkedList<CachedDataSource> sources = new LinkedList<>(this.dsQueue);
+ LinkedList<CachedDataSource> sources = new LinkedList<CachedDataSource>(this.dsQueue);
if(sources.isEmpty()){
LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
throw new DBLibException("No active DB connection pools are available in RequestDataWithRecovery call.");
@@ -423,8 +517,8 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb
return writeDataNoRecovery(statement, newList, preferredDS);
}
- CachedDataSource findMaster() throws PoolExhaustedException, MySQLNonTransientConnectionException {
- CachedDataSource master;
+ CachedDataSource findMaster() throws PoolExhaustedException {
+ CachedDataSource master = null;
CachedDataSource[] dss = this.dsQueue.toArray(new CachedDataSource[0]);
for(int i=0; i<dss.length; i++) {
if(!dss[i].isSlave()) {
@@ -496,7 +590,12 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb
return true;
}
- private void setDataSource(CachedDataSource dataSource) {
+ public void setDataSource(CachedDataSource dataSource) {
+ if(this.dsQueue.contains(dataSource))
+ return;
+ if(this.broken.contains(dataSource))
+ return;
+
if(dataSource.testConnection(true)){
this.dsQueue.add(dataSource);
} else {
@@ -525,16 +624,12 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb
lastException = exc;
} catch(PoolExhaustedException exc) {
throw new NoAvailableConnectionsException(exc);
- } catch(MySQLNonTransientConnectionException exc){
- throw new NoAvailableConnectionsException(exc);
} catch(Exception exc){
lastException = exc;
if(recoveryMode){
handleGetConnectionException(active, exc);
} else {
- if(exc instanceof MySQLNonTransientConnectionException) {
- throw new NoAvailableConnectionsException(exc);
- } if(exc instanceof SQLException) {
+ if(exc instanceof SQLException) {
throw (SQLException)exc;
} else {
DBLibException excptn = new DBLibException(exc.getMessage());
@@ -700,7 +795,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb
public String getDBStatus(boolean htmlFormat) {
StringBuilder buffer = new StringBuilder();
- ArrayList<CachedDataSource> list = new ArrayList<>();
+ ArrayList<CachedDataSource> list = new ArrayList<CachedDataSource>();
list.addAll(dsQueue);
list.addAll(broken);
if (htmlFormat)
@@ -785,7 +880,7 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb
public String getPreferredDataSourceName(AtomicBoolean flipper) {
- LinkedList<CachedDataSource> snapshot = new LinkedList<>(dsQueue);
+ LinkedList<CachedDataSource> snapshot = new LinkedList<CachedDataSource>(dsQueue);
if(snapshot.size() > 1){
CachedDataSource first = snapshot.getFirst();
CachedDataSource last = snapshot.getLast();
@@ -849,4 +944,8 @@ public class DBResourceManager implements DataSource, DataAccessor, DBResourceOb
ds.getPoolInfo(false);
}
}
+
+ public int poolSize() {
+ return dsQueue.size();
+ }
}