From da0b06cd2b7363faf28e9647ee043ec78082f142 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Mon, 25 Jun 2018 11:29:41 -0400 Subject: Use connection pooling for locking Modified distributed locking to use connection pooling. Add comment for new dataSource parameter. Change-Id: I5dc33605797f95072af9b6911a468457f6fd9f3d Issue-ID: POLICY-910 Signed-off-by: Jim Hahn --- .../locking/DistributedLockingFeature.java | 54 +++++++++++++++++----- .../onap/policy/distributed/locking/Heartbeat.java | 15 ++++-- .../policy/distributed/locking/TargetLock.java | 27 +++++------ 3 files changed, 66 insertions(+), 30 deletions(-) (limited to 'feature-distributed-locking/src/main/java/org/onap') diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java index 019452a0..b30fca76 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java @@ -20,15 +20,16 @@ package org.onap.policy.distributed.locking; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Properties; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.dbcp2.BasicDataSourceFactory; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.core.lock.LockRequestFuture; import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI; @@ -60,6 +61,11 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy */ private ScheduledExecutorService scheduledExecutorService; + /** + * Data source used to connect to the DB containing locks. + */ + private BasicDataSource dataSource; + /** * UUID */ @@ -79,7 +85,7 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy @Override public Future beforeLock(String resourceId, String owner, Callback callback) { - TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps); + TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps, dataSource); return new LockRequestFuture(resourceId, owner, tLock.lock()); @@ -87,21 +93,21 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy @Override public OperResult beforeUnlock(String resourceId, String owner) { - TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps); + TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps, dataSource); return(tLock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); } @Override public OperResult beforeIsLockedBy(String resourceId, String owner) { - TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps); + TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps, dataSource); return(tLock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); } @Override public OperResult beforeIsLocked(String resourceId) { - TargetLock tLock = new TargetLock(resourceId, uuid, "dummyOwner", lockProps); + TargetLock tLock = new TargetLock(resourceId, uuid, "dummyOwner", lockProps, dataSource); return(tLock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); } @@ -111,15 +117,23 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy try { this.lockProps = new DistributedLockingProperties(SystemPersistence.manager.getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME)); + this.dataSource = makeDataSource(); } catch (PropertyException e) { logger.error("DistributedLockingFeature feature properies have not been loaded", e); throw new DistributedLockingFeatureException(e); + } catch(InterruptedException e) { + logger.error("DistributedLockingFeature failed to create data source", e); + Thread.currentThread().interrupt(); + throw new DistributedLockingFeatureException(e); + } catch(Exception e) { + logger.error("DistributedLockingFeature failed to create data source", e); + throw new DistributedLockingFeatureException(e); } long heartbeatInterval = this.lockProps.getHeartBeatIntervalProperty(); cleanLockTable(); - initHeartbeat(lockProps); + initHeartbeat(); this.scheduledExecutorService = Executors.newScheduledThreadPool(1); this.scheduledExecutorService.scheduleAtFixedRate(heartbeat, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); @@ -127,6 +141,24 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy } /** + * @return a new, pooled data source + * @throws Exception + */ + private BasicDataSource makeDataSource() throws Exception { + Properties props = new Properties(); + props.put("driverClassName", lockProps.getDbDriver()); + props.put("url", lockProps.getDbUrl()); + props.put("username", lockProps.getDbUser()); + props.put("password", lockProps.getDbPwd()); + props.put("testOnBorrow", "true"); + props.put("poolPreparedStatements", "true"); + + // additional properties are listed in the GenericObjectPool API + + return BasicDataSourceFactory.createDataSource(props); + } + + /** * This method kills the heartbeat thread and calls refreshLockTable which removes * any records from the db where the current host is the owner. */ @@ -142,9 +174,7 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy */ private void cleanLockTable() { - try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), - lockProps.getDbUser(), - lockProps.getDbPwd()); + try (Connection conn = dataSource.getConnection(); PreparedStatement statement = conn.prepareStatement("DELETE FROM pooling.locks WHERE host = ? OR expirationTime < ?"); ){ @@ -161,8 +191,8 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy /** * Initialize the static heartbeat object */ - private static void initHeartbeat(DistributedLockingProperties lockProps) { - heartbeat = new Heartbeat(uuid, lockProps); + private void initHeartbeat() { + heartbeat = new Heartbeat(uuid, lockProps, dataSource); } diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java index edd0782e..ccfb4c7d 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java @@ -21,12 +21,11 @@ package org.onap.policy.distributed.locking; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.UUID; import java.util.concurrent.CountDownLatch; - +import org.apache.commons.dbcp2.BasicDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +44,11 @@ public class Heartbeat implements Runnable{ * Properties object containing properties needed by class */ private DistributedLockingProperties lockProps; + + /** + * Data source used to connect to the DB containing locks. + */ + private BasicDataSource dataSource; /** * UUID @@ -60,9 +64,11 @@ public class Heartbeat implements Runnable{ * * @param uuid * @param lockProps + * @param dataSource */ - public Heartbeat(UUID uuid, DistributedLockingProperties lockProps) { + public Heartbeat(UUID uuid, DistributedLockingProperties lockProps, BasicDataSource dataSource) { this.lockProps = lockProps; + this.dataSource = dataSource; this.uuid = uuid; this.latch = new CountDownLatch(1); } @@ -81,8 +87,7 @@ public class Heartbeat implements Runnable{ long expirationAge = lockProps.getAgingProperty(); - try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), - lockProps.getDbPwd()); + try (Connection conn = dataSource.getConnection(); PreparedStatement statement = conn .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE host = ?");) { diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java index 4f09dc2a..d57de1f7 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java @@ -20,12 +20,11 @@ package org.onap.policy.distributed.locking; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.UUID; - +import org.apache.commons.dbcp2.BasicDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +41,11 @@ public class TargetLock { * Properties object containing properties needed by class */ private DistributedLockingProperties lockProps; + + /** + * Data source used to connect to the DB containing locks. + */ + private BasicDataSource dataSource; /** * UUID @@ -58,12 +62,14 @@ public class TargetLock { * * @param resourceId ID of the entity we want to lock * @param lockProps Properties object containing properties needed by class + * @param dataSource used to connect to the DB containing locks */ - public TargetLock (String resourceId, UUID uuid, String owner, DistributedLockingProperties lockProps) { + public TargetLock (String resourceId, UUID uuid, String owner, DistributedLockingProperties lockProps, BasicDataSource dataSource) { this.resourceId = resourceId; this.uuid = uuid; this.owner = owner; this.lockProps = lockProps; + this.dataSource = dataSource; } /** @@ -89,8 +95,7 @@ public class TargetLock { private boolean grabLock() { // try to insert a record into the table(thereby grabbing the lock) - try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), - lockProps.getDbPwd()); + try (Connection conn = dataSource.getConnection(); PreparedStatement statement = conn.prepareStatement( "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)")) { @@ -116,8 +121,7 @@ public class TargetLock { */ private boolean secondGrab() { - try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), - lockProps.getDbPwd()); + try (Connection conn = dataSource.getConnection(); PreparedStatement updateStatement = conn.prepareStatement( "UPDATE pooling.locks SET host = ?, owner = ?, expirationTime = ? WHERE expirationTime <= ? AND resourceId = ?"); @@ -161,8 +165,7 @@ public class TargetLock { */ private boolean deleteLock() { - try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), - lockProps.getDbPwd()); + try (Connection conn = dataSource.getConnection(); PreparedStatement deleteStatement = conn.prepareStatement( "DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?")) { @@ -184,8 +187,7 @@ public class TargetLock { * Is the lock active */ public boolean isActive() { - try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), - lockProps.getDbPwd()); + try (Connection conn = dataSource.getConnection(); PreparedStatement selectStatement = conn.prepareStatement( "SELECT * FROM pooling.locks WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= ?")) { @@ -215,8 +217,7 @@ public class TargetLock { */ public boolean isLocked() { - try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), - lockProps.getDbPwd()); + try (Connection conn = dataSource.getConnection(); PreparedStatement selectStatement = conn .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= ?")) { -- cgit 1.2.3-korg