summaryrefslogtreecommitdiffstats
path: root/dblib/provider/src
diff options
context:
space:
mode:
authorRich Tabedzki <richard.tabedzki@att.com>2018-01-05 20:26:58 +0000
committerRich Tabedzki <richard.tabedzki@att.com>2018-01-06 17:11:58 +0000
commitd3cd4dd5d5f300684be256eb1bf70d750a9a957e (patch)
treec347d5c84fe597cf2232de529a8219f4f0a7b16f /dblib/provider/src
parent50e04450a21eb584fb843fbe009b29798b94a413 (diff)
Generalization of CCSDK core/utils framework
Changes made: * Created generalized version of core/utils/dblib as core/utils/common * Deprecated core/utils/dblib package Change-Id: I0992c43910278fbe254674d1e39d7e4fcad0a592 Issue-ID: CCSDK-168 Signed-off-by: Rich Tabedzki <richard.tabedzki@att.com>
Diffstat (limited to 'dblib/provider/src')
-rw-r--r--dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBLIBResourceProvider.java18
-rwxr-xr-xdblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java1759
2 files changed, 889 insertions, 888 deletions
diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBLIBResourceProvider.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBLIBResourceProvider.java
index 411f04705..ddfb73316 100644
--- a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBLIBResourceProvider.java
+++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBLIBResourceProvider.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,11 +27,11 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Vector;
-import org.onap.ccsdk.sli.core.utils.dblib.DblibDefaultFileResolver;
-import org.onap.ccsdk.sli.core.utils.dblib.DblibEnvVarFileResolver;
import org.onap.ccsdk.sli.core.utils.JREFileResolver;
import org.onap.ccsdk.sli.core.utils.KarafRootFileResolver;
import org.onap.ccsdk.sli.core.utils.PropertiesFileResolver;
+import org.onap.ccsdk.sli.core.utils.common.CoreDefaultFileResolver;
+import org.onap.ccsdk.sli.core.utils.common.SdncConfigEnvVarFileResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +58,7 @@ public class DBLIBResourceProvider {
/**
* A prioritized list of strategies for resolving dblib properties files.
*/
- private Vector<PropertiesFileResolver> dblibPropertiesFileResolvers = new Vector();
+ private Vector<PropertiesFileResolver> dblibPropertiesFileResolvers = new Vector<>();
/**
* The configuration properties for the db connection.
@@ -69,14 +69,14 @@ public class DBLIBResourceProvider {
* Set up the prioritized list of strategies for resolving dblib properties files.
*/
public DBLIBResourceProvider() {
- dblibPropertiesFileResolvers.add(new DblibEnvVarFileResolver(
+ dblibPropertiesFileResolvers.add(new SdncConfigEnvVarFileResolver(
"Using property file (1) from environment variable"
));
- dblibPropertiesFileResolvers.add(new DblibDefaultFileResolver(
- "Using property file (1) from default directory"
+ dblibPropertiesFileResolvers.add(new CoreDefaultFileResolver(
+ "Using property file (2) from default directory"
));
dblibPropertiesFileResolvers.add(new JREFileResolver(
- "Using property file (2) from JRE argument", DBLIBResourceProvider.class
+ "Using property file (3) from JRE argument", DBLIBResourceProvider.class
));
dblibPropertiesFileResolvers.add(new KarafRootFileResolver(
"Using property file (4) from karaf root", this));
diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java
index 61fb000fb..b31645d41 100755
--- a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java
+++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/DBResourceManager.java
@@ -62,891 +62,892 @@ import org.onap.ccsdk.sli.core.dblib.config.JDBCConfiguration;
* Rich Tabedzki
*/
public class DBResourceManager implements DataSource, DataAccessor, DBResourceObserver, DbLibService {
- private static Logger LOGGER = LoggerFactory.getLogger(DBResourceManager.class);
-
- transient boolean terminating = false;
- transient protected long retryInterval = 10000L;
- transient boolean recoveryMode = true;
-
- protected final AtomicBoolean dsSelector = new AtomicBoolean();
-
-// Queue<CachedDataSource> dsQueue = new ConcurrentLinkedQueue<CachedDataSource>();
- Queue<CachedDataSource> dsQueue = new PriorityQueue<>(4, new Comparator<CachedDataSource>() {
- @Override
- public int compare(CachedDataSource left, CachedDataSource right) {
- try {
- if (left == null) {
- return 1;
- }
- if (right == null) {
- return -1;
- }
-
- if (!left.isSlave()) {
- return -1;
- }
- if (!right.isSlave()) {
- return 1;
- }
- } catch (Throwable e) {
- LOGGER.warn("", e);
- }
- return 0;
- }
+ private static Logger LOGGER = LoggerFactory.getLogger(DBResourceManager.class);
+
+ transient boolean terminating = false;
+ transient protected long retryInterval = 10000L;
+ transient boolean recoveryMode = true;
+
+ protected final AtomicBoolean dsSelector = new AtomicBoolean();
+
+ Queue<CachedDataSource> dsQueue = new PriorityQueue<>(4, new Comparator<CachedDataSource>() {
+ @Override
+ public int compare(CachedDataSource left, CachedDataSource right) {
+ try {
+ if (left == null) {
+ return 1;
+ }
+ if (right == null) {
+ return -1;
+ }
+
+ if (!left.isSlave()) {
+ return -1;
+ }
+ if (!right.isSlave()) {
+ return 1;
+ }
+ } catch (Throwable e) {
+ LOGGER.warn("", e);
+ }
+ return 0;
+ }
});
- protected final Set<CachedDataSource> broken = Collections.synchronizedSet(new HashSet<CachedDataSource>());
- protected final Object monitor = new Object();
- protected final Properties configProps;
- protected final Thread worker;
-
- protected final long terminationTimeOut;
- protected final boolean monitorDbResponse;
- protected final long monitoringInterval;
- protected final long monitoringInitialDelay;
- protected final long expectedCompletionTime;
- protected final long unprocessedFailoverThreshold;
-
- public DBResourceManager(final DBLIBResourceProvider configuration) {
- this(configuration.getProperties());
- }
-
- public DBResourceManager(final Properties properties) {
- this.configProps = properties;
-
- // get retry interval value
- retryInterval = getLongFromProperties(properties, "org.onap.dblib.connection.retry", 10000L);
-
- // get recovery mode flag
- recoveryMode = getBooleanFromProperties(properties, "org.onap.dblib.connection.recovery", true);
- if(!recoveryMode)
- {
- recoveryMode = false;
- LOGGER.info("Recovery Mode disabled");
- }
- // get time out value for thread cleanup
- terminationTimeOut = getLongFromProperties(properties, "org.onap.dblib.termination.timeout", 300000L);
- // get properties for monitoring
- monitorDbResponse = getBooleanFromProperties(properties, "org.onap.dblib.connection.monitor", false);
- monitoringInterval = getLongFromProperties(properties, "org.onap.dblib.connection.monitor.interval", 1000L);
- monitoringInitialDelay = getLongFromProperties(properties, "org.onap.dblib.connection.monitor.startdelay", 5000L);
- expectedCompletionTime = getLongFromProperties(properties, "org.onap.dblib.connection.monitor.expectedcompletiontime", 5000L);
- unprocessedFailoverThreshold = getLongFromProperties(properties, "org.onap.dblib.connection.monitor.unprocessedfailoverthreshold", 3L);
-
- // initialize performance monitor
- PollingWorker.createInistance(properties);
-
- // initialize recovery thread
- worker = new RecoveryMgr();
- worker.setName("DBResourcemanagerWatchThread");
- worker.setDaemon(true);
- worker.start();
-
- try {
- this.config(properties);
- } catch (final Exception e) {
- // TODO: config throws <code>Exception</code> which is poor practice. Eliminate this in a separate patch.
- LOGGER.error("Fatal Exception encountered while configuring DBResourceManager", e);
- }
- }
-
- private void config(Properties configProps) throws Exception {
- final ConcurrentLinkedQueue<CachedDataSource> semaphore = new ConcurrentLinkedQueue<>();
- final DbConfigPool dbConfig = DBConfigFactory.createConfig(configProps);
-
- long startTime = System.currentTimeMillis();
-
- try {
- JDBCConfiguration[] config = dbConfig.getJDBCbSourceArray();
- CachedDataSource[] cachedDS = new CachedDataSource[config.length];
- if (cachedDS == null || cachedDS.length == 0) {
- LOGGER.error("Initialization of CachedDataSources failed. No instance was created.");
- throw new Exception("Failed to initialize DB Library. No data source was created.");
- }
-
- for(int i = 0; i < config.length; i++) {
- cachedDS[i] = CachedDataSourceFactory.createDataSource(config[i]);
- if(cachedDS[i] == null)
- continue;
- semaphore.add(cachedDS[i]);
- cachedDS[i].setInterval(monitoringInterval);
- cachedDS[i].setInitialDelay(monitoringInitialDelay);
- cachedDS[i].setExpectedCompletionTime(expectedCompletionTime);
- cachedDS[i].setUnprocessedFailoverThreshold(unprocessedFailoverThreshold);
- cachedDS[i].addObserver(DBResourceManager.this);
- }
-
-// CachedDataSource[] cachedDS = factory.initDBResourceManager(dbConfig, DBResourceManager.this, semaphore);
- DataSourceTester[] tester = new DataSourceTester[config.length];
-
- for(int i=0; i<tester.length; i++){
- tester[i] = new DataSourceTester(cachedDS[i], DBResourceManager.this, semaphore);
- tester[i].start();
- }
-
- // the timeout param is set is seconds.
- long timeout = ((dbConfig.getTimeout() <= 0) ? 60L : dbConfig.getTimeout());
- LOGGER.debug("Timeout set to " +timeout+" seconds");
- timeout *= 1000;
-
-
- synchronized (semaphore) {
- semaphore.wait(timeout);
- }
- } catch(Exception exc){
- LOGGER.warn("DBResourceManager.initWorker", exc);
- } finally {
- startTime = System.currentTimeMillis() - startTime;
- LOGGER.info("Completed wait with "+ dsQueue.size() + " active datasource(s) in " + startTime + " ms");
- }
- }
-
-
- class DataSourceTester extends Thread {
-
- private final CachedDataSource ds;
- private final DBResourceManager manager;
- private final ConcurrentLinkedQueue<CachedDataSource> semaphoreQ;
-
- public DataSourceTester(CachedDataSource ds, DBResourceManager manager, ConcurrentLinkedQueue<CachedDataSource> semaphore) {
- this.ds = ds;
- this.manager = manager;
- this.semaphoreQ = semaphore;
- }
-
- @Override
- public void run() {
- manager.setDataSource(ds);
- boolean slave = true;
- if(ds != null) {
- try {
- slave = ds.isSlave();
- } catch (Exception exc) {
- LOGGER.warn("", exc);
- }
- }
- if(!slave) {
- LOGGER.info(String.format("Adding MASTER (%s) to active queue", ds.getDbConnectionName()));
- try {
- synchronized (semaphoreQ) {
- semaphoreQ.notifyAll();
- }
- } catch(Exception exc) {
- LOGGER.warn("", exc);
- }
- }
- try {
- synchronized (semaphoreQ) {
- semaphoreQ.remove(ds);
- }
- if(semaphoreQ.isEmpty()) {
- synchronized (semaphoreQ) {
- semaphoreQ.notifyAll();
- }
- }
- } catch(Exception exc) {
- LOGGER.warn("", exc);
- }
- LOGGER.info(String.format("Thread DataSourceTester terminated %s for %s", this.getName(), ds.getDbConnectionName()));
- }
-
- }
-
-
- private long getLongFromProperties(Properties props, String property, long defaultValue)
- {
- String value = null;
- long tmpLongValue = defaultValue;
- try {
- value = props.getProperty(property);
- if(value != null)
- tmpLongValue = Long.parseLong(value);
-
- } catch(NumberFormatException exc) {
- if(LOGGER.isWarnEnabled()){
- LOGGER.warn("'"+property+"'=" + value+" is invalid. It should be a numeric value");
- }
- } catch(Exception exc) {
- }
- return tmpLongValue;
-
- }
-
- private boolean getBooleanFromProperties(Properties props, String property, boolean defaultValue)
- {
- boolean tmpValue = defaultValue;
- String value = null;
-
- try {
- value = props.getProperty(property);
- if(value != null)
- tmpValue = Boolean.parseBoolean(value);
-
- } catch(NumberFormatException exc) {
- if(LOGGER.isWarnEnabled()){
- LOGGER.warn("'"+property+"'=" + value+" is invalid. It should be a boolean value");
- }
- } catch(Exception exc) {
- }
- return tmpValue;
-
- }
-
-
- @Override
- public void update(Observable observable, Object data) {
- // if observable is active and there is a standby available, switch
- if(observable instanceof SQLExecutionMonitor)
- {
- SQLExecutionMonitor monitor = (SQLExecutionMonitor)observable;
- if(monitor.getParent() instanceof CachedDataSource)
- {
- CachedDataSource dataSource = (CachedDataSource)monitor.getParent();
- if(dataSource == dsQueue.peek())
- {
- if(recoveryMode && dsQueue.size() > 1){
- handleGetConnectionException(dataSource, new Exception(data.toString()));
- }
- }
- }
- }
- }
-
- public void testForceRecovery()
- {
- CachedDataSource active = this.dsQueue.peek();
- handleGetConnectionException(active, new Exception("test"));
- }
-
- class RecoveryMgr extends Thread {
-
- @Override
- public void run() {
- while(!terminating)
- {
- try {
- Thread.sleep(retryInterval);
- } catch (InterruptedException e1) { }
- CachedDataSource brokenSource = null;
- try {
- if (!broken.isEmpty()) {
- CachedDataSource[] sourceArray = broken.toArray(new CachedDataSource[0]);
- for (int i = 0; i < sourceArray.length; i++)
- {
- brokenSource = sourceArray[i];
- if (brokenSource instanceof TerminatingCachedDataSource)
- break;
- if (resetConnectionPool(brokenSource)) {
- broken.remove(brokenSource);
- brokenSource.blockImmediateOffLine();
- dsQueue.add(brokenSource);
- LOGGER.info("DataSource <"
- + brokenSource.getDbConnectionName()
- + "> recovered.");
- }
- brokenSource = null;
- }
- }
- } catch (Exception exc) {
- LOGGER.warn(exc.getMessage());
- if(brokenSource != null){
- try {
- if(!broken.contains(brokenSource))
- broken.add(brokenSource);
- brokenSource = null;
- } catch (Exception e1) { }
- }
- }
- }
- LOGGER.info("DBResourceManager.RecoveryMgr <"+this.toString() +"> terminated." );
- }
-
- private boolean resetConnectionPool(CachedDataSource dataSource){
- try {
- return dataSource.testConnection();
- } catch (Exception exc) {
- LOGGER.info("DataSource <" + dataSource.getDbConnectionName() + "> resetCache failed with error: "+ exc.getMessage());
- return false;
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.onap.ccsdk.sli.resource.dblib.DbLibService#getData(java.lang.String, java.util.ArrayList, java.lang.String)
- */
- @Override
- public CachedRowSet getData(String statement, ArrayList<String> arguments, String preferredDS) throws SQLException {
- ArrayList<Object> newList= new ArrayList<>();
- if(arguments != null && !arguments.isEmpty()) {
- newList.addAll(arguments);
- }
- if(recoveryMode)
- return requestDataWithRecovery(statement, newList, preferredDS);
- else
- return requestDataNoRecovery(statement, newList, preferredDS);
- }
-
- private CachedRowSet requestDataWithRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
- Throwable lastException = null;
- CachedDataSource active = null;
-
- // test if there are any connection pools available
- LinkedList<CachedDataSource> sources = new LinkedList<>(this.dsQueue);
- if(sources.isEmpty()){
- LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
- throw new DBLibException("No active DB connection pools are available in RequestDataWithRecovery call.");
- }
- if(preferredDS != null && !sources.peek().getDbConnectionName().equals(preferredDS)) {
- Collections.reverse(sources);
- }
-
-
- // loop through available data sources to retrieve data.
- while(!sources.isEmpty())
- {
- active = sources.peek();
-
- long time = System.currentTimeMillis();
- try {
- if(!active.isFabric()) {
- CachedDataSource master = findMaster();
- if(master != null) {
- active = master;
- master = null;
- }
- }
- sources.remove(active);
- return active.getData(statement, arguments);
- } catch(SQLDataException | SQLSyntaxErrorException | SQLIntegrityConstraintViolationException exc){
- throw exc;
- } catch(Throwable exc){
- lastException = exc;
- String message = exc.getMessage();
- if(message == null) {
- if(exc.getCause() != null) {
- message = exc.getCause().getMessage();
- }
- if(message == null)
- message = exc.getClass().getName();
- }
- LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
- handleGetConnectionException(active, exc);
- } finally {
- if(LOGGER.isDebugEnabled()){
- time = System.currentTimeMillis() - time;
- LOGGER.debug("getData processing time : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
- }
- }
- }
- if(lastException instanceof SQLException){
- throw (SQLException)lastException;
- }
- // repackage the exception
- // you are here because either you run out of available data sources
- // or the last exception was not of SQLException type.
- // repackage the exception
- if(lastException == null) {
- throw new DBLibException("The operation timed out while waiting to acquire a new connection." );
- } else {
- SQLException exception = new DBLibException(lastException.getMessage());
- exception.setStackTrace(lastException.getStackTrace());
- if(lastException.getCause() instanceof SQLException) {
- throw (SQLException)lastException.getCause();
- }
- throw exception;
- }
- }
-
- private CachedRowSet requestDataNoRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
- if(dsQueue.isEmpty()){
- LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
- throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call.");
- }
- CachedDataSource active = this.dsQueue.peek();
- long time = System.currentTimeMillis();
- try {
- if(!active.isFabric()) {
- CachedDataSource master = findMaster();
- if(master != null)
- active = master;
- }
- return active.getData(statement, arguments);
-// } catch(SQLDataException exc){
-// throw exc;
- } catch(Throwable exc){
- String message = exc.getMessage();
- if(message == null)
- message = exc.getClass().getName();
- LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
- if(exc instanceof SQLException)
- throw (SQLException)exc;
- else {
- DBLibException excptn = new DBLibException(exc.getMessage());
- excptn.setStackTrace(exc.getStackTrace());
- throw excptn;
- }
- } finally {
- if(LOGGER.isDebugEnabled()){
- time = System.currentTimeMillis() - time;
- LOGGER.debug(">> getData : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
- }
- }
- }
-
-
- /* (non-Javadoc)
- * @see org.onap.ccsdk.sli.resource.dblib.DbLibService#writeData(java.lang.String, java.util.ArrayList, java.lang.String)
- */
- @Override
- public boolean writeData(String statement, ArrayList<String> arguments, String preferredDS) throws SQLException
+ protected final Set<CachedDataSource> broken = Collections.synchronizedSet(new HashSet<CachedDataSource>());
+ protected final Object monitor = new Object();
+ protected final Properties configProps;
+ protected final Thread worker;
+
+ protected final long terminationTimeOut;
+ protected final boolean monitorDbResponse;
+ protected final long monitoringInterval;
+ protected final long monitoringInitialDelay;
+ protected final long expectedCompletionTime;
+ protected final long unprocessedFailoverThreshold;
+
+ public DBResourceManager(final DBLIBResourceProvider configuration) {
+ this(configuration.getProperties());
+ }
+
+ public DBResourceManager(final Properties properties) {
+ this.configProps = properties;
+
+ // get retry interval value
+ retryInterval = getLongFromProperties(properties, "org.onap.dblib.connection.retry", 10000L);
+
+ // get recovery mode flag
+ recoveryMode = getBooleanFromProperties(properties, "org.onap.dblib.connection.recovery", true);
+ if(!recoveryMode)
+ {
+ recoveryMode = false;
+ LOGGER.info("Recovery Mode disabled");
+ }
+ // get time out value for thread cleanup
+ terminationTimeOut = getLongFromProperties(properties, "org.onap.dblib.termination.timeout", 300000L);
+ // get properties for monitoring
+ monitorDbResponse = getBooleanFromProperties(properties, "org.onap.dblib.connection.monitor", false);
+ monitoringInterval = getLongFromProperties(properties, "org.onap.dblib.connection.monitor.interval", 1000L);
+ monitoringInitialDelay = getLongFromProperties(properties, "org.onap.dblib.connection.monitor.startdelay", 5000L);
+ expectedCompletionTime = getLongFromProperties(properties, "org.onap.dblib.connection.monitor.expectedcompletiontime", 5000L);
+ unprocessedFailoverThreshold = getLongFromProperties(properties, "org.onap.dblib.connection.monitor.unprocessedfailoverthreshold", 3L);
+
+ // initialize performance monitor
+ PollingWorker.createInistance(properties);
+
+ // initialize recovery thread
+ worker = new RecoveryMgr();
+ worker.setName("DBResourcemanagerWatchThread");
+ worker.setDaemon(true);
+ worker.start();
+
+ try {
+ this.config(properties);
+ } catch (final Exception e) {
+ // TODO: config throws <code>Exception</code> which is poor practice. Eliminate this in a separate patch.
+ LOGGER.error("Fatal Exception encountered while configuring DBResourceManager", e);
+ }
+ }
+
+ private void config(Properties configProps) throws Exception {
+ final ConcurrentLinkedQueue<CachedDataSource> semaphore = new ConcurrentLinkedQueue<>();
+ final DbConfigPool dbConfig = DBConfigFactory.createConfig(configProps);
+
+ long startTime = System.currentTimeMillis();
+
+ try {
+ JDBCConfiguration[] config = dbConfig.getJDBCbSourceArray();
+ CachedDataSource[] cachedDS = new CachedDataSource[config.length];
+ if (cachedDS == null || cachedDS.length == 0) {
+ LOGGER.error("Initialization of CachedDataSources failed. No instance was created.");
+ throw new Exception("Failed to initialize DB Library. No data source was created.");
+ }
+
+ for(int i = 0; i < config.length; i++) {
+ cachedDS[i] = CachedDataSourceFactory.createDataSource(config[i]);
+ if(cachedDS[i] == null)
+ continue;
+ semaphore.add(cachedDS[i]);
+ cachedDS[i].setInterval(monitoringInterval);
+ cachedDS[i].setInitialDelay(monitoringInitialDelay);
+ cachedDS[i].setExpectedCompletionTime(expectedCompletionTime);
+ cachedDS[i].setUnprocessedFailoverThreshold(unprocessedFailoverThreshold);
+ cachedDS[i].addObserver(DBResourceManager.this);
+ }
+
+// CachedDataSource[] cachedDS = factory.initDBResourceManager(dbConfig, DBResourceManager.this, semaphore);
+ DataSourceTester[] tester = new DataSourceTester[config.length];
+
+ for(int i=0; i<tester.length; i++){
+ tester[i] = new DataSourceTester(cachedDS[i], DBResourceManager.this, semaphore);
+ tester[i].start();
+ }
+
+ // the timeout param is set is seconds.
+ long timeout = ((dbConfig.getTimeout() <= 0) ? 60L : dbConfig.getTimeout());
+ LOGGER.debug("Timeout set to " +timeout+" seconds");
+ timeout *= 1000;
+
+
+ synchronized (semaphore) {
+ semaphore.wait(timeout);
+ }
+ } catch(Exception exc){
+ LOGGER.warn("DBResourceManager.initWorker", exc);
+ } finally {
+ startTime = System.currentTimeMillis() - startTime;
+ LOGGER.info("Completed wait with "+ dsQueue.size() + " active datasource(s) in " + startTime + " ms");
+ }
+ }
+
+
+ class DataSourceTester extends Thread {
+
+ private final CachedDataSource ds;
+ private final DBResourceManager manager;
+ private final ConcurrentLinkedQueue<CachedDataSource> semaphoreQ;
+
+ public DataSourceTester(CachedDataSource ds, DBResourceManager manager, ConcurrentLinkedQueue<CachedDataSource> semaphore) {
+ this.ds = ds;
+ this.manager = manager;
+ this.semaphoreQ = semaphore;
+ }
+
+ @Override
+ public void run() {
+ manager.setDataSource(ds);
+ boolean slave = true;
+ if(ds != null) {
+ try {
+ slave = ds.isSlave();
+ } catch (Exception exc) {
+ LOGGER.warn("", exc);
+ }
+ }
+ if(!slave) {
+ LOGGER.info(String.format("Adding MASTER (%s) to active queue", ds.getDbConnectionName()));
+ try {
+ synchronized (semaphoreQ) {
+ semaphoreQ.notifyAll();
+ }
+ } catch(Exception exc) {
+ LOGGER.warn("", exc);
+ }
+ }
+ try {
+ synchronized (semaphoreQ) {
+ semaphoreQ.remove(ds);
+ }
+ if(semaphoreQ.isEmpty()) {
+ synchronized (semaphoreQ) {
+ semaphoreQ.notifyAll();
+ }
+ }
+ } catch(Exception exc) {
+ LOGGER.warn("", exc);
+ }
+ LOGGER.info(String.format("Thread DataSourceTester terminated %s for %s", this.getName(), ds.getDbConnectionName()));
+ }
+
+ }
+
+
+ private long getLongFromProperties(Properties props, String property, long defaultValue)
+ {
+ String value = null;
+ long tmpLongValue = defaultValue;
+ try {
+ value = props.getProperty(property);
+ if(value != null)
+ tmpLongValue = Long.parseLong(value);
+
+ } catch(NumberFormatException exc) {
+ if(LOGGER.isWarnEnabled()){
+ LOGGER.warn("'"+property+"'=" + value+" is invalid. It should be a numeric value");
+ }
+ } catch(Exception exc) {
+ }
+ return tmpLongValue;
+
+ }
+
+ private boolean getBooleanFromProperties(Properties props, String property, boolean defaultValue)
+ {
+ boolean tmpValue = defaultValue;
+ String value = null;
+
+ try {
+ value = props.getProperty(property);
+ if(value != null)
+ tmpValue = Boolean.parseBoolean(value);
+
+ } catch(NumberFormatException exc) {
+ if(LOGGER.isWarnEnabled()){
+ LOGGER.warn("'"+property+"'=" + value+" is invalid. It should be a boolean value");
+ }
+ } catch(Exception exc) {
+ }
+ return tmpValue;
+
+ }
+
+
+ @Override
+ public void update(Observable observable, Object data) {
+ // if observable is active and there is a standby available, switch
+ if(observable instanceof SQLExecutionMonitor)
{
- ArrayList<Object> newList= new ArrayList<>();
- if(arguments != null && !arguments.isEmpty()) {
- newList.addAll(arguments);
- }
-
- return writeDataNoRecovery(statement, newList, preferredDS);
- }
-
- CachedDataSource findMaster() throws PoolExhaustedException {
- CachedDataSource master = null;
- CachedDataSource[] dss = this.dsQueue.toArray(new CachedDataSource[0]);
- for(int i=0; i<dss.length; i++) {
- if(!dss[i].isSlave()) {
- master = dss[i];
- if(i != 0) {
- dsQueue.remove(master);
- dsQueue.add(master);
- }
- return master;
- }
- }
- LOGGER.warn("MASTER not found.");
- return null;
- }
-
-
- private boolean writeDataNoRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
- if(dsQueue.isEmpty()){
- LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
- throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call.");
- }
-
- boolean initialRequest = true;
- boolean retryAllowed = true;
- CachedDataSource active = this.dsQueue.peek();
- long time = System.currentTimeMillis();
- while(initialRequest) {
- initialRequest = false;
- try {
- if(!active.isFabric()) {
- CachedDataSource master = findMaster();
- if(master != null) {
- active = master;
- }
- }
-
- return active.writeData(statement, arguments);
- } catch(Throwable exc){
- String message = exc.getMessage();
- if(message == null)
- message = exc.getClass().getName();
- LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
- if(exc instanceof SQLException) {
- SQLException sqlExc = SQLException.class.cast(exc);
- // handle read-only exception
- if(sqlExc.getErrorCode() == 1290 && "HY000".equals(sqlExc.getSQLState())) {
- LOGGER.warn("retrying due to: " + sqlExc.getMessage());
- dsQueue.remove(active);
- dsQueue.add(active);
- if(retryAllowed){
- retryAllowed = false;
- initialRequest = true;
- continue;
- }
- }
- throw (SQLException)exc;
- } else {
- DBLibException excptn = new DBLibException(exc.getMessage());
- excptn.setStackTrace(exc.getStackTrace());
- throw excptn;
- }
- } finally {
- if(LOGGER.isDebugEnabled()){
- time = System.currentTimeMillis() - time;
- LOGGER.debug("writeData processing time : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
- }
- }
- }
- return true;
- }
-
- public void setDataSource(CachedDataSource dataSource) {
- if(this.dsQueue.contains(dataSource))
- return;
- if(this.broken.contains(dataSource))
- return;
-
- if(dataSource.testConnection(true)){
- this.dsQueue.add(dataSource);
- } else {
- this.broken.add(dataSource);
- }
- }
-
- @Override
- public Connection getConnection() throws SQLException {
- Throwable lastException = null;
- CachedDataSource active = null;
-
- if(dsQueue.isEmpty()){
- throw new DBLibException("No active DB connection pools are available in GetConnection call.");
- }
-
- try {
- active = dsQueue.peek();
- CachedDataSource tmpActive = findMaster();
- if(tmpActive != null) {
- active = tmpActive;
- }
- return new DBLibConnection(active.getConnection(), active);
- } catch(javax.sql.rowset.spi.SyncFactoryException exc){
- LOGGER.debug("Free memory (bytes): " + Runtime.getRuntime().freeMemory());
- LOGGER.warn("CLASSPATH issue. Allowing retry", exc);
- lastException = exc;
- } catch(PoolExhaustedException exc) {
- throw new NoAvailableConnectionsException(exc);
- } catch(Exception exc){
- lastException = exc;
- if(recoveryMode){
- handleGetConnectionException(active, exc);
- } else {
- if(exc instanceof SQLException) {
- throw (SQLException)exc;
- } else {
- DBLibException excptn = new DBLibException(exc.getMessage());
- excptn.setStackTrace(exc.getStackTrace());
- throw excptn;
- }
- }
- } catch (Throwable trwb) {
- DBLibException excptn = new DBLibException(trwb.getMessage());
- excptn.setStackTrace(trwb.getStackTrace());
- throw excptn;
- } finally {
- if(LOGGER.isDebugEnabled()){
- displayState();
- }
- }
-
- if(lastException instanceof SQLException){
- throw (SQLException)lastException;
- }
- // repackage the exception
- if(lastException == null) {
- throw new DBLibException("The operation timed out while waiting to acquire a new connection." );
- } else {
- SQLException exception = new DBLibException(lastException.getMessage());
- exception.setStackTrace(lastException.getStackTrace());
- if(lastException.getCause() instanceof SQLException) {
-// exception.setNextException((SQLException)lastException.getCause());
- throw (SQLException)lastException.getCause();
- }
- throw exception;
- }
- }
-
- @Override
- public Connection getConnection(String username, String password)
- throws SQLException {
- CachedDataSource active = null;
-
- if(dsQueue.isEmpty()){
- throw new DBLibException("No active DB connection pools are available in GetConnection call.");
- }
-
-
- try {
- active = dsQueue.peek();
- CachedDataSource tmpActive = findMaster();
- if(tmpActive != null) {
- active = tmpActive;
- }
- return active.getConnection(username, password);
- } catch(Throwable exc){
- if(recoveryMode){
- handleGetConnectionException(active, exc);
- } else {
- if(exc instanceof SQLException)
- throw (SQLException)exc;
- else {
- DBLibException excptn = new DBLibException(exc.getMessage());
- excptn.setStackTrace(exc.getStackTrace());
- throw excptn;
- }
- }
-
- }
-
- throw new DBLibException("No connections available in DBResourceManager in GetConnection call.");
- }
-
- private void handleGetConnectionException(CachedDataSource source, Throwable exc) {
- try {
- if(!source.canTakeOffLine())
- {
- LOGGER.error("Could not switch due to blocking");
- return;
- }
-
- boolean removed = dsQueue.remove(source);
- if(!broken.contains(source))
- {
- if(broken.add(source))
- {
- LOGGER.warn("DB Recovery: DataSource <" + source.getDbConnectionName() + "> put in the recovery mode. Reason : " + exc.getMessage());
- } else {
- LOGGER.warn("Error putting DataSource <" +source.getDbConnectionName()+ "> in recovery mode.");
- }
- } else {
- LOGGER.info("DB Recovery: DataSource <" + source.getDbConnectionName() + "> already in recovery queue");
- }
- if(removed)
- {
- if(!dsQueue.isEmpty())
- {
- LOGGER.warn("DB DataSource <" + dsQueue.peek().getDbConnectionName() + "> became active");
- }
- }
- } catch (Exception e) {
- LOGGER.error("", e);
- }
- }
-
- public void cleanUp() {
- for(Iterator<CachedDataSource> it=dsQueue.iterator();it.hasNext();){
- CachedDataSource cds = it.next();
- it.remove();
- cds.cleanUp();
- }
-
- try {
- this.terminating = true;
- if(broken != null)
- {
- try {
- broken.add( new TerminatingCachedDataSource(null));
- } catch(Exception exc){
- LOGGER.error("Waiting for Worker to stop", exc);
- }
- }
- worker.join(terminationTimeOut);
- LOGGER.info("DBResourceManager.RecoveryMgr <"+worker.toString() +"> termination was successful: " + worker.getState());
- } catch(Exception exc){
- LOGGER.error("Waiting for Worker thread to terminate ", exc);
- }
- }
-
- @Override
- public PrintWriter getLogWriter() throws SQLException {
- return this.dsQueue.peek().getLogWriter();
- }
-
- @Override
- public int getLoginTimeout() throws SQLException {
- return this.dsQueue.peek().getLoginTimeout();
- }
-
- @Override
- public void setLogWriter(PrintWriter out) throws SQLException {
- this.dsQueue.peek().setLogWriter(out);
- }
-
- @Override
- public void setLoginTimeout(int seconds) throws SQLException {
- this.dsQueue.peek().setLoginTimeout(seconds);
- }
-
- public void displayState(){
- if(LOGGER.isDebugEnabled()){
- LOGGER.debug("POOLS : Active = "+dsQueue.size() + ";\t Broken = "+broken.size());
- CachedDataSource current = dsQueue.peek();
- if(current != null) {
- LOGGER.debug("POOL : Active name = \'"+current.getDbConnectionName()+ "\'");
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.onap.ccsdk.sli.resource.dblib.DbLibService#isActive()
- */
- @Override
- public boolean isActive() {
- return this.dsQueue.size()>0;
- }
-
- public String getActiveStatus(){
- return "Connected: " + dsQueue.size()+"\tIn-recovery: "+broken.size();
- }
-
- public String getDBStatus(boolean htmlFormat) {
- StringBuilder buffer = new StringBuilder();
-
- ArrayList<CachedDataSource> list = new ArrayList<>();
- list.addAll(dsQueue);
- list.addAll(broken);
- if (htmlFormat)
- {
- buffer.append("<tr class=\"headerRow\"><th id=\"header1\">")
- .append("Name:").append("</th>");
- for (int i = 0; i < list.size(); i++) {
- buffer.append("<th id=\"header").append(2 + i).append("\">");
- buffer.append(list.get(i).getDbConnectionName()).append("</th>");
- }
- buffer.append("</tr>");
-
- buffer.append("<tr><td>State:</td>");
- for (int i = 0; i < list.size(); i++) {
- if (broken.contains(list.get(i))) {
- buffer.append("<td>in recovery</td>");
- }
- if (dsQueue.contains(list.get(i))) {
- if (dsQueue.peek() == list.get(i))
- buffer.append("<td>active</td>");
- else
- buffer.append("<td>standby</td>");
- }
- }
- buffer.append("</tr>");
-
- } else {
- for (int i = 0; i < list.size(); i++) {
- buffer.append("Name: ").append(list.get(i).getDbConnectionName());
- buffer.append("\tState: ");
- if (broken.contains(list.get(i))) {
- buffer.append("in recovery");
- } else
- if (dsQueue.contains(list.get(i))) {
- if (dsQueue.peek() == list.get(i))
- buffer.append("active");
- else
- buffer.append("standby");
- }
-
- buffer.append("\n");
-
- }
- }
- return buffer.toString();
- }
-
- @Override
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- return false;
- }
-
- @Override
- public <T> T unwrap(Class<T> iface) throws SQLException {
- return null;
- }
-
- /**
- * @return the monitorDbResponse
- */
- @Override
- public final boolean isMonitorDbResponse() {
- return recoveryMode && monitorDbResponse;
- }
-
- public void test(){
- CachedDataSource obj = dsQueue.peek();
- Exception ption = new Exception();
- try {
- for(int i=0; i<5; i++)
- {
- handleGetConnectionException(obj, ption);
- }
- } catch(Throwable exc){
- LOGGER.warn("", exc);
- }
- }
-
- public String getPreferredDSName(){
- if(isActive()){
- return getPreferredDataSourceName(dsSelector);
- }
- return "";
- }
-
- public String getPreferredDataSourceName(AtomicBoolean flipper) {
-
- LinkedList<CachedDataSource> snapshot = new LinkedList<>(dsQueue);
- if(snapshot.size() > 1){
- CachedDataSource first = snapshot.getFirst();
- CachedDataSource last = snapshot.getLast();
-
- int delta = first.getMonitor().getProcessedConnectionsCount() - last.getMonitor().getProcessedConnectionsCount();
- if(delta < 0) {
- flipper.set(false);
- } else if(delta > 0) {
- flipper.set(true);
- } else {
- // check the last value and return !last
- flipper.getAndSet(!flipper.get());
- }
-
- if (flipper.get())
- Collections.reverse(snapshot);
- }
- return snapshot.peek().getDbConnectionName();
- }
-
- @Override
- public java.util.logging.Logger getParentLogger()
- throws SQLFeatureNotSupportedException {
- return null;
- }
-
- public String getMasterName() {
- if(isActive()){
- return getMasterDataSourceName(dsSelector);
- }
- return "";
- }
-
-
- private String getMasterDataSourceName(AtomicBoolean flipper) {
-
- LinkedList<CachedDataSource> snapshot = new LinkedList<>(dsQueue);
- if(snapshot.size() > 1){
- CachedDataSource first = snapshot.getFirst();
- CachedDataSource last = snapshot.getLast();
-
- int delta = first.getMonitor().getProcessedConnectionsCount() - last.getMonitor().getProcessedConnectionsCount();
- if(delta < 0) {
- flipper.set(false);
- } else if(delta > 0) {
- flipper.set(true);
- } else {
- // check the last value and return !last
- flipper.getAndSet(!flipper.get());
- }
-
- if (flipper.get())
- Collections.reverse(snapshot);
- }
- return snapshot.peek().getDbConnectionName();
- }
+ SQLExecutionMonitor monitor = (SQLExecutionMonitor)observable;
+ if(monitor.getParent() instanceof CachedDataSource)
+ {
+ CachedDataSource dataSource = (CachedDataSource)monitor.getParent();
+ if(dataSource == dsQueue.peek())
+ {
+ if(recoveryMode && dsQueue.size() > 1){
+ handleGetConnectionException(dataSource, new Exception(data.toString()));
+ }
+ }
+ }
+ }
+ }
+
+ public void testForceRecovery()
+ {
+ CachedDataSource active = this.dsQueue.peek();
+ handleGetConnectionException(active, new Exception("test"));
+ }
+
+ class RecoveryMgr extends Thread {
+
+ @Override
+ public void run() {
+ while(!terminating)
+ {
+ try {
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException e1) { }
+ CachedDataSource brokenSource = null;
+ try {
+ if (!broken.isEmpty()) {
+ CachedDataSource[] sourceArray = broken.toArray(new CachedDataSource[0]);
+ for (int i = 0; i < sourceArray.length; i++)
+ {
+ brokenSource = sourceArray[i];
+ if (brokenSource instanceof TerminatingCachedDataSource)
+ break;
+ if (resetConnectionPool(brokenSource)) {
+ broken.remove(brokenSource);
+ brokenSource.blockImmediateOffLine();
+ dsQueue.add(brokenSource);
+ LOGGER.info("DataSource <"
+ + brokenSource.getDbConnectionName()
+ + "> recovered.");
+ }
+ brokenSource = null;
+ }
+ }
+ } catch (Exception exc) {
+ LOGGER.warn(exc.getMessage());
+ if(brokenSource != null){
+ try {
+ if(!broken.contains(brokenSource))
+ broken.add(brokenSource);
+ brokenSource = null;
+ } catch (Exception e1) { }
+ }
+ }
+ }
+ LOGGER.info("DBResourceManager.RecoveryMgr <"+this.toString() +"> terminated." );
+ }
+
+ private boolean resetConnectionPool(CachedDataSource dataSource){
+ try {
+ return dataSource.testConnection();
+ } catch (Exception exc) {
+ LOGGER.info("DataSource <" + dataSource.getDbConnectionName() + "> resetCache failed with error: "+ exc.getMessage());
+ return false;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.onap.ccsdk.sli.resource.dblib.DbLibService#getData(java.lang.String, java.util.ArrayList, java.lang.String)
+ */
+ @Override
+ public CachedRowSet getData(String statement, ArrayList<String> arguments, String preferredDS) throws SQLException {
+ ArrayList<Object> newList= new ArrayList<>();
+ if(arguments != null && !arguments.isEmpty()) {
+ newList.addAll(arguments);
+ }
+ if(recoveryMode)
+ return requestDataWithRecovery(statement, newList, preferredDS);
+ else
+ return requestDataNoRecovery(statement, newList, preferredDS);
+ }
+
+ private CachedRowSet requestDataWithRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
+ Throwable lastException = null;
+ CachedDataSource active = null;
+
+ // test if there are any connection pools available
+ LinkedList<CachedDataSource> sources = new LinkedList<>(this.dsQueue);
+ if(sources.isEmpty()){
+ LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
+ throw new DBLibException("No active DB connection pools are available in RequestDataWithRecovery call.");
+ }
+ if(preferredDS != null && !sources.peek().getDbConnectionName().equals(preferredDS)) {
+ Collections.reverse(sources);
+ }
+
+
+ // loop through available data sources to retrieve data.
+ while(!sources.isEmpty())
+ {
+ active = sources.peek();
+
+ long time = System.currentTimeMillis();
+ try {
+ if(!active.isFabric()) {
+ CachedDataSource master = findMaster();
+ if(master != null) {
+ active = master;
+ master = null;
+ }
+ }
+ sources.remove(active);
+ return active.getData(statement, arguments);
+ } catch(SQLDataException | SQLSyntaxErrorException | SQLIntegrityConstraintViolationException exc){
+ throw exc;
+ } catch(Throwable exc){
+ lastException = exc;
+ String message = exc.getMessage();
+ if(message == null) {
+ if(exc.getCause() != null) {
+ message = exc.getCause().getMessage();
+ }
+ if(message == null)
+ message = exc.getClass().getName();
+ }
+ LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
+ handleGetConnectionException(active, exc);
+ if(sources.contains(active))
+ sources.remove(active);
+ } finally {
+ if(LOGGER.isDebugEnabled()){
+ time = System.currentTimeMillis() - time;
+ LOGGER.debug("getData processing time : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
+ }
+ }
+ }
+ if(lastException instanceof SQLException){
+ throw (SQLException)lastException;
+ }
+ // repackage the exception
+ // you are here because either you run out of available data sources
+ // or the last exception was not of SQLException type.
+ // repackage the exception
+ if(lastException == null) {
+ throw new DBLibException("The operation timed out while waiting to acquire a new connection." );
+ } else {
+ SQLException exception = new DBLibException(lastException.getMessage());
+ exception.setStackTrace(lastException.getStackTrace());
+ if(lastException.getCause() instanceof SQLException) {
+ throw (SQLException)lastException.getCause();
+ }
+ throw exception;
+ }
+ }
+
+ private CachedRowSet requestDataNoRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
+ if(dsQueue.isEmpty()){
+ LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
+ throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call.");
+ }
+ CachedDataSource active = this.dsQueue.peek();
+ long time = System.currentTimeMillis();
+ try {
+ if(!active.isFabric()) {
+ CachedDataSource master = findMaster();
+ if(master != null)
+ active = master;
+ }
+ return active.getData(statement, arguments);
+// } catch(SQLDataException exc){
+// throw exc;
+ } catch(Throwable exc){
+ String message = exc.getMessage();
+ if(message == null)
+ message = exc.getClass().getName();
+ LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
+ if(exc instanceof SQLException)
+ throw (SQLException)exc;
+ else {
+ DBLibException excptn = new DBLibException(exc.getMessage());
+ excptn.setStackTrace(exc.getStackTrace());
+ throw excptn;
+ }
+ } finally {
+ if(LOGGER.isDebugEnabled()){
+ time = System.currentTimeMillis() - time;
+ LOGGER.debug(">> getData : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
+ }
+ }
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.onap.ccsdk.sli.resource.dblib.DbLibService#writeData(java.lang.String, java.util.ArrayList, java.lang.String)
+ */
+ @Override
+ public boolean writeData(String statement, ArrayList<String> arguments, String preferredDS) throws SQLException
+ {
+ ArrayList<Object> newList= new ArrayList<>();
+ if(arguments != null && !arguments.isEmpty()) {
+ newList.addAll(arguments);
+ }
+
+ return writeDataNoRecovery(statement, newList, preferredDS);
+ }
+
+ CachedDataSource findMaster() throws PoolExhaustedException {
+ CachedDataSource master = null;
+ CachedDataSource[] dss = this.dsQueue.toArray(new CachedDataSource[0]);
+ for(int i=0; i<dss.length; i++) {
+ if(!dss[i].isSlave()) {
+ master = dss[i];
+ if(i != 0) {
+ dsQueue.remove(master);
+ dsQueue.add(master);
+ }
+ return master;
+ }
+ }
+ LOGGER.warn("MASTER not found.");
+ return null;
+ }
+
+
+ private boolean writeDataNoRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
+ if(dsQueue.isEmpty()){
+ LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
+ throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call.");
+ }
+
+ boolean initialRequest = true;
+ boolean retryAllowed = true;
+ CachedDataSource active = this.dsQueue.peek();
+ long time = System.currentTimeMillis();
+ while(initialRequest) {
+ initialRequest = false;
+ try {
+ if(!active.isFabric()) {
+ CachedDataSource master = findMaster();
+ if(master != null) {
+ active = master;
+ }
+ }
+
+ return active.writeData(statement, arguments);
+ } catch(Throwable exc){
+ String message = exc.getMessage();
+ if(message == null)
+ message = exc.getClass().getName();
+ LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
+ if(exc instanceof SQLException) {
+ SQLException sqlExc = SQLException.class.cast(exc);
+ // handle read-only exception
+ if(sqlExc.getErrorCode() == 1290 && "HY000".equals(sqlExc.getSQLState())) {
+ LOGGER.warn("retrying due to: " + sqlExc.getMessage());
+ dsQueue.remove(active);
+ dsQueue.add(active);
+ if(retryAllowed){
+ retryAllowed = false;
+ initialRequest = true;
+ continue;
+ }
+ }
+ throw (SQLException)exc;
+ } else {
+ DBLibException excptn = new DBLibException(exc.getMessage());
+ excptn.setStackTrace(exc.getStackTrace());
+ throw excptn;
+ }
+ } finally {
+ if(LOGGER.isDebugEnabled()){
+ time = System.currentTimeMillis() - time;
+ LOGGER.debug("writeData processing time : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
+ }
+ }
+ }
+ return true;
+ }
+
+ public void setDataSource(CachedDataSource dataSource) {
+ if(this.dsQueue.contains(dataSource))
+ return;
+ if(this.broken.contains(dataSource))
+ return;
+
+ if(dataSource.testConnection(true)){
+ this.dsQueue.add(dataSource);
+ } else {
+ this.broken.add(dataSource);
+ }
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ Throwable lastException = null;
+ CachedDataSource active = null;
+
+ if(dsQueue.isEmpty()){
+ throw new DBLibException("No active DB connection pools are available in GetConnection call.");
+ }
+
+ try {
+ active = dsQueue.peek();
+ CachedDataSource tmpActive = findMaster();
+ if(tmpActive != null) {
+ active = tmpActive;
+ }
+ return new DBLibConnection(active.getConnection(), active);
+ } catch(javax.sql.rowset.spi.SyncFactoryException exc){
+ LOGGER.debug("Free memory (bytes): " + Runtime.getRuntime().freeMemory());
+ LOGGER.warn("CLASSPATH issue. Allowing retry", exc);
+ lastException = exc;
+ } catch(PoolExhaustedException exc) {
+ throw new NoAvailableConnectionsException(exc);
+ } catch(Exception exc){
+ lastException = exc;
+ if(recoveryMode){
+ handleGetConnectionException(active, exc);
+ } else {
+ if(exc instanceof SQLException) {
+ throw (SQLException)exc;
+ } else {
+ DBLibException excptn = new DBLibException(exc.getMessage());
+ excptn.setStackTrace(exc.getStackTrace());
+ throw excptn;
+ }
+ }
+ } catch (Throwable trwb) {
+ DBLibException excptn = new DBLibException(trwb.getMessage());
+ excptn.setStackTrace(trwb.getStackTrace());
+ throw excptn;
+ } finally {
+ if(LOGGER.isDebugEnabled()){
+ displayState();
+ }
+ }
+
+ if(lastException instanceof SQLException){
+ throw (SQLException)lastException;
+ }
+ // repackage the exception
+ if(lastException == null) {
+ throw new DBLibException("The operation timed out while waiting to acquire a new connection." );
+ } else {
+ SQLException exception = new DBLibException(lastException.getMessage());
+ exception.setStackTrace(lastException.getStackTrace());
+ if(lastException.getCause() instanceof SQLException) {
+// exception.setNextException((SQLException)lastException.getCause());
+ throw (SQLException)lastException.getCause();
+ }
+ throw exception;
+ }
+ }
+
+ @Override
+ public Connection getConnection(String username, String password)
+ throws SQLException {
+ CachedDataSource active = null;
+
+ if(dsQueue.isEmpty()){
+ throw new DBLibException("No active DB connection pools are available in GetConnection call.");
+ }
+
+
+ try {
+ active = dsQueue.peek();
+ CachedDataSource tmpActive = findMaster();
+ if(tmpActive != null) {
+ active = tmpActive;
+ }
+ return active.getConnection(username, password);
+ } catch(Throwable exc){
+ if(recoveryMode){
+ handleGetConnectionException(active, exc);
+ } else {
+ if(exc instanceof SQLException)
+ throw (SQLException)exc;
+ else {
+ DBLibException excptn = new DBLibException(exc.getMessage());
+ excptn.setStackTrace(exc.getStackTrace());
+ throw excptn;
+ }
+ }
+
+ }
+
+ throw new DBLibException("No connections available in DBResourceManager in GetConnection call.");
+ }
+
+ private void handleGetConnectionException(CachedDataSource source, Throwable exc) {
+ try {
+ if(!source.canTakeOffLine())
+ {
+ LOGGER.error("Could not switch due to blocking");
+ return;
+ }
+
+ boolean removed = dsQueue.remove(source);
+ if(!broken.contains(source))
+ {
+ if(broken.add(source))
+ {
+ LOGGER.warn("DB Recovery: DataSource <" + source.getDbConnectionName() + "> put in the recovery mode. Reason : " + exc.getMessage());
+ } else {
+ LOGGER.warn("Error putting DataSource <" +source.getDbConnectionName()+ "> in recovery mode.");
+ }
+ } else {
+ LOGGER.info("DB Recovery: DataSource <" + source.getDbConnectionName() + "> already in recovery queue");
+ }
+ if(removed)
+ {
+ if(!dsQueue.isEmpty())
+ {
+ LOGGER.warn("DB DataSource <" + dsQueue.peek().getDbConnectionName() + "> became active");
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("", e);
+ }
+ }
+
+ public void cleanUp() {
+ for(Iterator<CachedDataSource> it=dsQueue.iterator();it.hasNext();){
+ CachedDataSource cds = it.next();
+ it.remove();
+ cds.cleanUp();
+ }
+
+ try {
+ this.terminating = true;
+ if(broken != null)
+ {
+ try {
+ broken.add( new TerminatingCachedDataSource(null));
+ } catch(Exception exc){
+ LOGGER.error("Waiting for Worker to stop", exc);
+ }
+ }
+ worker.join(terminationTimeOut);
+ LOGGER.info("DBResourceManager.RecoveryMgr <"+worker.toString() +"> termination was successful: " + worker.getState());
+ } catch(Exception exc){
+ LOGGER.error("Waiting for Worker thread to terminate ", exc);
+ }
+ }
+
+ @Override
+ public PrintWriter getLogWriter() throws SQLException {
+ return this.dsQueue.peek().getLogWriter();
+ }
+
+ @Override
+ public int getLoginTimeout() throws SQLException {
+ return this.dsQueue.peek().getLoginTimeout();
+ }
+
+ @Override
+ public void setLogWriter(PrintWriter out) throws SQLException {
+ this.dsQueue.peek().setLogWriter(out);
+ }
+
+ @Override
+ public void setLoginTimeout(int seconds) throws SQLException {
+ this.dsQueue.peek().setLoginTimeout(seconds);
+ }
+
+ public void displayState(){
+ if(LOGGER.isDebugEnabled()){
+ LOGGER.debug("POOLS : Active = "+dsQueue.size() + ";\t Broken = "+broken.size());
+ CachedDataSource current = dsQueue.peek();
+ if(current != null) {
+ LOGGER.debug("POOL : Active name = \'"+current.getDbConnectionName()+ "\'");
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.onap.ccsdk.sli.resource.dblib.DbLibService#isActive()
+ */
+ @Override
+ public boolean isActive() {
+ return this.dsQueue.size()>0;
+ }
+
+ public String getActiveStatus(){
+ return "Connected: " + dsQueue.size()+"\tIn-recovery: "+broken.size();
+ }
+
+ public String getDBStatus(boolean htmlFormat) {
+ StringBuilder buffer = new StringBuilder();
+
+ ArrayList<CachedDataSource> list = new ArrayList<>();
+ list.addAll(dsQueue);
+ list.addAll(broken);
+ if (htmlFormat)
+ {
+ buffer.append("<tr class=\"headerRow\"><th id=\"header1\">")
+ .append("Name:").append("</th>");
+ for (int i = 0; i < list.size(); i++) {
+ buffer.append("<th id=\"header").append(2 + i).append("\">");
+ buffer.append(list.get(i).getDbConnectionName()).append("</th>");
+ }
+ buffer.append("</tr>");
+
+ buffer.append("<tr><td>State:</td>");
+ for (int i = 0; i < list.size(); i++) {
+ if (broken.contains(list.get(i))) {
+ buffer.append("<td>in recovery</td>");
+ }
+ if (dsQueue.contains(list.get(i))) {
+ if (dsQueue.peek() == list.get(i))
+ buffer.append("<td>active</td>");
+ else
+ buffer.append("<td>standby</td>");
+ }
+ }
+ buffer.append("</tr>");
+
+ } else {
+ for (int i = 0; i < list.size(); i++) {
+ buffer.append("Name: ").append(list.get(i).getDbConnectionName());
+ buffer.append("\tState: ");
+ if (broken.contains(list.get(i))) {
+ buffer.append("in recovery");
+ } else
+ if (dsQueue.contains(list.get(i))) {
+ if (dsQueue.peek() == list.get(i))
+ buffer.append("active");
+ else
+ buffer.append("standby");
+ }
+
+ buffer.append("\n");
+
+ }
+ }
+ return buffer.toString();
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return null;
+ }
+
+ /**
+ * @return the monitorDbResponse
+ */
+ @Override
+ public final boolean isMonitorDbResponse() {
+ return recoveryMode && monitorDbResponse;
+ }
+
+ public void test(){
+ CachedDataSource obj = dsQueue.peek();
+ Exception ption = new Exception();
+ try {
+ for(int i=0; i<5; i++)
+ {
+ handleGetConnectionException(obj, ption);
+ }
+ } catch(Throwable exc){
+ LOGGER.warn("", exc);
+ }
+ }
+
+ public String getPreferredDSName(){
+ if(isActive()){
+ return getPreferredDataSourceName(dsSelector);
+ }
+ return "";
+ }
+
+ public String getPreferredDataSourceName(AtomicBoolean flipper) {
+
+ LinkedList<CachedDataSource> snapshot = new LinkedList<>(dsQueue);
+ if(snapshot.size() > 1){
+ CachedDataSource first = snapshot.getFirst();
+ CachedDataSource last = snapshot.getLast();
+
+ int delta = first.getMonitor().getProcessedConnectionsCount() - last.getMonitor().getProcessedConnectionsCount();
+ if(delta < 0) {
+ flipper.set(false);
+ } else if(delta > 0) {
+ flipper.set(true);
+ } else {
+ // check the last value and return !last
+ flipper.getAndSet(!flipper.get());
+ }
+
+ if (flipper.get())
+ Collections.reverse(snapshot);
+ }
+ return snapshot.peek().getDbConnectionName();
+ }
+
+ @Override
+ public java.util.logging.Logger getParentLogger()
+ throws SQLFeatureNotSupportedException {
+ return null;
+ }
+
+ public String getMasterName() {
+ if(isActive()){
+ return getMasterDataSourceName(dsSelector);
+ }
+ return "";
+ }
+
+
+ private String getMasterDataSourceName(AtomicBoolean flipper) {
+
+ LinkedList<CachedDataSource> snapshot = new LinkedList<>(dsQueue);
+ if(snapshot.size() > 1){
+ CachedDataSource first = snapshot.getFirst();
+ CachedDataSource last = snapshot.getLast();
+
+ int delta = first.getMonitor().getProcessedConnectionsCount() - last.getMonitor().getProcessedConnectionsCount();
+ if(delta < 0) {
+ flipper.set(false);
+ } else if(delta > 0) {
+ flipper.set(true);
+ } else {
+ // check the last value and return !last
+ flipper.getAndSet(!flipper.get());
+ }
+
+ if (flipper.get())
+ Collections.reverse(snapshot);
+ }
+ return snapshot.peek().getDbConnectionName();
+ }
class RemindTask extends TimerTask {
@Override
- public void run() {
- CachedDataSource ds = dsQueue.peek();
- if(ds != null)
- ds.getPoolInfo(false);
+ public void run() {
+ CachedDataSource ds = dsQueue.peek();
+ if(ds != null)
+ ds.getPoolInfo(false);
}
}
- public int poolSize() {
- return dsQueue.size();
- }
+ public int poolSize() {
+ return dsQueue.size();
+ }
}