diff options
Diffstat (limited to 'feature-distributed-locking/src/main')
5 files changed, 36 insertions, 221 deletions
diff --git a/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties b/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties index 665f8227..33e2d789 100644 --- a/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties +++ b/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties @@ -23,12 +23,3 @@ javax.persistence.jdbc.driver= org.mariadb.jdbc.Driver javax.persistence.jdbc.url=jdbc:mariadb://${{SQL_HOST}}:3306/pooling javax.persistence.jdbc.user=${{SQL_USER}} javax.persistence.jdbc.password=${{SQL_PASSWORD}} - -#This value is added to System.currentTimeMs to -#set expirationTime when a lock is obtained. -#distributed.locking.lock.aging=1000 - -#The frequency (in milliseconds) that the heartbeat -#thread refreshes locks owned by the current host -#distributed.locking.heartbeat.interval=5000 - diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java index b30fca76..03872e1e 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java @@ -24,14 +24,9 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSourceFactory; import org.onap.policy.common.utils.properties.exception.PropertyException; -import org.onap.policy.drools.core.lock.LockRequestFuture; import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI; import org.onap.policy.drools.features.PolicyEngineFeatureAPI; import org.onap.policy.drools.persistence.SystemPersistence; @@ -57,11 +52,6 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy private DistributedLockingProperties lockProps; /** - *ScheduledExecutorService for LockHeartbeat - */ - private ScheduledExecutorService scheduledExecutorService; - - /** * Data source used to connect to the DB containing locks. */ private BasicDataSource dataSource; @@ -71,43 +61,36 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy */ private static final UUID uuid = UUID.randomUUID(); - - /** - * Reference to Heartbeat - */ - private static Heartbeat heartbeat = null; - @Override public int getSequenceNumber() { return 1000; } @Override - public Future<Boolean> beforeLock(String resourceId, String owner, Callback callback) { - - TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps, dataSource); + public OperResult beforeLock(String resourceId, String owner, int holdSec) { - return new LockRequestFuture(resourceId, owner, tLock.lock()); - + TargetLock tLock = new TargetLock(resourceId, uuid, owner, dataSource); + + return(tLock.lock(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); } @Override public OperResult beforeUnlock(String resourceId, String owner) { - TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps, dataSource); + TargetLock tLock = new TargetLock(resourceId, uuid, owner, dataSource); return(tLock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); } @Override public OperResult beforeIsLockedBy(String resourceId, String owner) { - TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps, dataSource); + TargetLock tLock = new TargetLock(resourceId, uuid, owner, dataSource); return(tLock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); } @Override public OperResult beforeIsLocked(String resourceId) { - TargetLock tLock = new TargetLock(resourceId, uuid, "dummyOwner", lockProps, dataSource); + TargetLock tLock = new TargetLock(resourceId, uuid, "dummyOwner", dataSource); return(tLock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); } @@ -130,13 +113,8 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy throw new DistributedLockingFeatureException(e); } - long heartbeatInterval = this.lockProps.getHeartBeatIntervalProperty(); - cleanLockTable(); - initHeartbeat(); - this.scheduledExecutorService = Executors.newScheduledThreadPool(1); - this.scheduledExecutorService.scheduleAtFixedRate(heartbeat, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); return false; } @@ -164,7 +142,6 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy */ @Override public boolean beforeShutdown(PolicyEngine engine) { - scheduledExecutorService.shutdown(); cleanLockTable(); return false; } @@ -186,18 +163,5 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy logger.error("error in refreshLockTable()", e); } - } - - /** - * Initialize the static heartbeat object - */ - private void initHeartbeat() { - heartbeat = new Heartbeat(uuid, lockProps, dataSource); - - } - - public static Heartbeat getHeartbeat() { - return heartbeat; - } - + } } diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java index b82f4b00..1f55a4cb 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java @@ -35,8 +35,6 @@ public class DistributedLockingProperties extends PropertyConfiguration { public static final String DB_URL = "javax.persistence.jdbc.url"; public static final String DB_USER = "javax.persistence.jdbc.user"; public static final String DB_PWD = "javax.persistence.jdbc.password"; - public static final String AGING_PROPERTY = PREFIX + "lock.aging"; - public static final String HEARTBEAT_INTERVAL_PROPERTY = PREFIX + "heartbeat.interval"; /** * Properties from which this was constructed. @@ -67,18 +65,6 @@ public class DistributedLockingProperties extends PropertyConfiguration { @Property(name = DB_PWD) private String dbPwd; - /** - * Used to set expiration time for lock. - */ - @Property(name = AGING_PROPERTY, defaultValue = "300000") - private long agingProperty; - - /** - * Indicates intervals at which we refresh locks. - */ - @Property(name = HEARTBEAT_INTERVAL_PROPERTY, defaultValue = "60000") - private long heartBeatIntervalProperty; - public DistributedLockingProperties(Properties props) throws PropertyException { super(props); source = props; @@ -110,16 +96,6 @@ public class DistributedLockingProperties extends PropertyConfiguration { } - public long getAgingProperty() { - return agingProperty; - } - - - public long getHeartBeatIntervalProperty() { - return heartBeatIntervalProperty; - } - - public void setDbDriver(String dbDriver) { this.dbDriver = dbDriver; } @@ -139,14 +115,4 @@ public class DistributedLockingProperties extends PropertyConfiguration { this.dbPwd = dbPwd; } - - public void setAgingProperty(long agingProperty) { - this.agingProperty = agingProperty; - } - - - public void setHeartBeatIntervalProperty(long heartBeatIntervalProperty) { - this.heartBeatIntervalProperty = heartBeatIntervalProperty; - } - } diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java deleted file mode 100644 index ccfb4c7d..00000000 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import org.apache.commons.dbcp2.BasicDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * This runnable class scans the locks table for all locks owned by this host. - * It refreshes the expiration time of each lock using the locking.distributed.aging - * property - * - */ -public class Heartbeat implements Runnable{ - - private static final Logger logger = LoggerFactory.getLogger(Heartbeat.class); - - /** - * Properties object containing properties needed by class - */ - private DistributedLockingProperties lockProps; - - /** - * Data source used to connect to the DB containing locks. - */ - private BasicDataSource dataSource; - - /** - * UUID - */ - private UUID uuid; - - /** - * Countdown latch for testing - */ - private volatile CountDownLatch latch; - - /** - * - * @param uuid - * @param lockProps - * @param dataSource - */ - public Heartbeat(UUID uuid, DistributedLockingProperties lockProps, BasicDataSource dataSource) { - this.lockProps = lockProps; - this.dataSource = dataSource; - this.uuid = uuid; - this.latch = new CountDownLatch(1); - } - /** - * - * @param latch - * Used for testing purposes - */ - protected void giveLatch(CountDownLatch latch) { - this.latch = latch; - } - - @Override - public void run() { - - - long expirationAge = lockProps.getAgingProperty(); - - try (Connection conn = dataSource.getConnection(); - PreparedStatement statement = conn - .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE host = ?");) { - - statement.setLong(1, System.currentTimeMillis() + expirationAge); - statement.setString(2, this.uuid.toString()); - statement.executeUpdate(); - - latch.countDown(); - - } catch (SQLException e) { - logger.error("error in Heartbeat.run()", e); - } - - } - - -} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java index d57de1f7..0853f125 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java @@ -25,6 +25,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.UUID; import org.apache.commons.dbcp2.BasicDataSource; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,11 +37,6 @@ public class TargetLock { * The Target resource we want to lock */ private String resourceId; - - /** - * Properties object containing properties needed by class - */ - private DistributedLockingProperties lockProps; /** * Data source used to connect to the DB containing locks. @@ -61,23 +57,22 @@ public class TargetLock { * Constructs a TargetLock object. * * @param resourceId ID of the entity we want to lock - * @param lockProps Properties object containing properties needed by class * @param dataSource used to connect to the DB containing locks */ - public TargetLock (String resourceId, UUID uuid, String owner, DistributedLockingProperties lockProps, BasicDataSource dataSource) { + public TargetLock (String resourceId, UUID uuid, String owner, BasicDataSource dataSource) { this.resourceId = resourceId; this.uuid = uuid; this.owner = owner; - this.lockProps = lockProps; this.dataSource = dataSource; } /** * obtain a lock + * @param holdSec the amount of time, in seconds, that the lock should be held */ - public boolean lock() { + public boolean lock(int holdSec) { - return grabLock(); + return grabLock(TimeUnit.SECONDS.toMillis(holdSec)); } /** @@ -91,8 +86,9 @@ public class TargetLock { * "Grabs" lock by attempting to insert a new record in the db. * If the insert fails due to duplicate key error resource is already locked * so we call secondGrab. + * @param holdMs the amount of time, in milliseconds, that the lock should be held */ - private boolean grabLock() { + private boolean grabLock(long holdMs) { // try to insert a record into the table(thereby grabbing the lock) try (Connection conn = dataSource.getConnection(); @@ -100,16 +96,17 @@ public class TargetLock { PreparedStatement statement = conn.prepareStatement( "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)")) { - statement.setString(1, this.resourceId); - statement.setString(2, this.uuid.toString()); - statement.setString(3, this.owner); - statement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty()); + int i = 1; + statement.setString(i++, this.resourceId); + statement.setString(i++, this.uuid.toString()); + statement.setString(i++, this.owner); + statement.setLong(i++, System.currentTimeMillis() + holdMs); statement.executeUpdate(); } catch (SQLException e) { logger.error("error in TargetLock.grabLock()", e); - return secondGrab(); + return secondGrab(holdMs); } return true; @@ -118,22 +115,25 @@ public class TargetLock { /** * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired. * If that fails, it attempts to insert a new record again + * @param holdMs the amount of time, in milliseconds, that the lock should be held */ - private boolean secondGrab() { + private boolean secondGrab(long holdMs) { try (Connection conn = dataSource.getConnection(); PreparedStatement updateStatement = conn.prepareStatement( - "UPDATE pooling.locks SET host = ?, owner = ?, expirationTime = ? WHERE expirationTime <= ? AND resourceId = ?"); + "UPDATE pooling.locks SET host = ?, owner = ?, expirationTime = ? WHERE resourceId = ? AND (owner = ? OR expirationTime <= ?)"); PreparedStatement insertStatement = conn.prepareStatement( "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) { - updateStatement.setString(1, this.uuid.toString()); - updateStatement.setString(2, this.owner); - updateStatement.setLong(3, System.currentTimeMillis() + lockProps.getAgingProperty()); - updateStatement.setLong(4, System.currentTimeMillis()); - updateStatement.setString(5, this.resourceId); + int i = 1; + updateStatement.setString(i++, this.uuid.toString()); + updateStatement.setString(i++, this.owner); + updateStatement.setLong(i++, System.currentTimeMillis() + holdMs); + updateStatement.setString(i++, this.resourceId); + updateStatement.setString(i++, this.owner); + updateStatement.setLong(i++, System.currentTimeMillis()); // The lock was expired and we grabbed it. // return true @@ -144,10 +144,11 @@ public class TargetLock { // If our update does not return 1 row, the lock either has not expired // or it was removed. Try one last grab else { - insertStatement.setString(1, this.resourceId); - insertStatement.setString(2, this.uuid.toString()); - insertStatement.setString(3, this.owner); - insertStatement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty()); + i = 1; + insertStatement.setString(i++, this.resourceId); + insertStatement.setString(i++, this.uuid.toString()); + insertStatement.setString(i++, this.owner); + insertStatement.setLong(i++, System.currentTimeMillis() + holdMs); // If our insert returns 1 we successfully grabbed the lock return (insertStatement.executeUpdate() == 1); |