summaryrefslogtreecommitdiffstats
path: root/feature-distributed-locking/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-distributed-locking/src/main/java')
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java52
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java34
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java107
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java55
4 files changed, 36 insertions, 212 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
index b30fca76..03872e1e 100644
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
@@ -24,14 +24,9 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.onap.policy.common.utils.properties.exception.PropertyException;
-import org.onap.policy.drools.core.lock.LockRequestFuture;
import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI;
import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
import org.onap.policy.drools.persistence.SystemPersistence;
@@ -57,11 +52,6 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy
private DistributedLockingProperties lockProps;
/**
- *ScheduledExecutorService for LockHeartbeat
- */
- private ScheduledExecutorService scheduledExecutorService;
-
- /**
* Data source used to connect to the DB containing locks.
*/
private BasicDataSource dataSource;
@@ -71,43 +61,36 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy
*/
private static final UUID uuid = UUID.randomUUID();
-
- /**
- * Reference to Heartbeat
- */
- private static Heartbeat heartbeat = null;
-
@Override
public int getSequenceNumber() {
return 1000;
}
@Override
- public Future<Boolean> beforeLock(String resourceId, String owner, Callback callback) {
-
- TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps, dataSource);
+ public OperResult beforeLock(String resourceId, String owner, int holdSec) {
- return new LockRequestFuture(resourceId, owner, tLock.lock());
-
+ TargetLock tLock = new TargetLock(resourceId, uuid, owner, dataSource);
+
+ return(tLock.lock(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
}
@Override
public OperResult beforeUnlock(String resourceId, String owner) {
- TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps, dataSource);
+ TargetLock tLock = new TargetLock(resourceId, uuid, owner, dataSource);
return(tLock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
}
@Override
public OperResult beforeIsLockedBy(String resourceId, String owner) {
- TargetLock tLock = new TargetLock(resourceId, uuid, owner, lockProps, dataSource);
+ TargetLock tLock = new TargetLock(resourceId, uuid, owner, dataSource);
return(tLock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
}
@Override
public OperResult beforeIsLocked(String resourceId) {
- TargetLock tLock = new TargetLock(resourceId, uuid, "dummyOwner", lockProps, dataSource);
+ TargetLock tLock = new TargetLock(resourceId, uuid, "dummyOwner", dataSource);
return(tLock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
}
@@ -130,13 +113,8 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy
throw new DistributedLockingFeatureException(e);
}
- long heartbeatInterval = this.lockProps.getHeartBeatIntervalProperty();
-
cleanLockTable();
- initHeartbeat();
- this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
- this.scheduledExecutorService.scheduleAtFixedRate(heartbeat, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
return false;
}
@@ -164,7 +142,6 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy
*/
@Override
public boolean beforeShutdown(PolicyEngine engine) {
- scheduledExecutorService.shutdown();
cleanLockTable();
return false;
}
@@ -186,18 +163,5 @@ public class DistributedLockingFeature implements PolicyEngineFeatureAPI, Policy
logger.error("error in refreshLockTable()", e);
}
- }
-
- /**
- * Initialize the static heartbeat object
- */
- private void initHeartbeat() {
- heartbeat = new Heartbeat(uuid, lockProps, dataSource);
-
- }
-
- public static Heartbeat getHeartbeat() {
- return heartbeat;
- }
-
+ }
}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
index b82f4b00..1f55a4cb 100644
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
@@ -35,8 +35,6 @@ public class DistributedLockingProperties extends PropertyConfiguration {
public static final String DB_URL = "javax.persistence.jdbc.url";
public static final String DB_USER = "javax.persistence.jdbc.user";
public static final String DB_PWD = "javax.persistence.jdbc.password";
- public static final String AGING_PROPERTY = PREFIX + "lock.aging";
- public static final String HEARTBEAT_INTERVAL_PROPERTY = PREFIX + "heartbeat.interval";
/**
* Properties from which this was constructed.
@@ -67,18 +65,6 @@ public class DistributedLockingProperties extends PropertyConfiguration {
@Property(name = DB_PWD)
private String dbPwd;
- /**
- * Used to set expiration time for lock.
- */
- @Property(name = AGING_PROPERTY, defaultValue = "300000")
- private long agingProperty;
-
- /**
- * Indicates intervals at which we refresh locks.
- */
- @Property(name = HEARTBEAT_INTERVAL_PROPERTY, defaultValue = "60000")
- private long heartBeatIntervalProperty;
-
public DistributedLockingProperties(Properties props) throws PropertyException {
super(props);
source = props;
@@ -110,16 +96,6 @@ public class DistributedLockingProperties extends PropertyConfiguration {
}
- public long getAgingProperty() {
- return agingProperty;
- }
-
-
- public long getHeartBeatIntervalProperty() {
- return heartBeatIntervalProperty;
- }
-
-
public void setDbDriver(String dbDriver) {
this.dbDriver = dbDriver;
}
@@ -139,14 +115,4 @@ public class DistributedLockingProperties extends PropertyConfiguration {
this.dbPwd = dbPwd;
}
-
- public void setAgingProperty(long agingProperty) {
- this.agingProperty = agingProperty;
- }
-
-
- public void setHeartBeatIntervalProperty(long heartBeatIntervalProperty) {
- this.heartBeatIntervalProperty = heartBeatIntervalProperty;
- }
-
}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java
deleted file mode 100644
index ccfb4c7d..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * This runnable class scans the locks table for all locks owned by this host.
- * It refreshes the expiration time of each lock using the locking.distributed.aging
- * property
- *
- */
-public class Heartbeat implements Runnable{
-
- private static final Logger logger = LoggerFactory.getLogger(Heartbeat.class);
-
- /**
- * Properties object containing properties needed by class
- */
- private DistributedLockingProperties lockProps;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID
- */
- private UUID uuid;
-
- /**
- * Countdown latch for testing
- */
- private volatile CountDownLatch latch;
-
- /**
- *
- * @param uuid
- * @param lockProps
- * @param dataSource
- */
- public Heartbeat(UUID uuid, DistributedLockingProperties lockProps, BasicDataSource dataSource) {
- this.lockProps = lockProps;
- this.dataSource = dataSource;
- this.uuid = uuid;
- this.latch = new CountDownLatch(1);
- }
- /**
- *
- * @param latch
- * Used for testing purposes
- */
- protected void giveLatch(CountDownLatch latch) {
- this.latch = latch;
- }
-
- @Override
- public void run() {
-
-
- long expirationAge = lockProps.getAgingProperty();
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement statement = conn
- .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE host = ?");) {
-
- statement.setLong(1, System.currentTimeMillis() + expirationAge);
- statement.setString(2, this.uuid.toString());
- statement.executeUpdate();
-
- latch.countDown();
-
- } catch (SQLException e) {
- logger.error("error in Heartbeat.run()", e);
- }
-
- }
-
-
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
index d57de1f7..0853f125 100644
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
@@ -25,6 +25,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;
import org.apache.commons.dbcp2.BasicDataSource;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,11 +37,6 @@ public class TargetLock {
* The Target resource we want to lock
*/
private String resourceId;
-
- /**
- * Properties object containing properties needed by class
- */
- private DistributedLockingProperties lockProps;
/**
* Data source used to connect to the DB containing locks.
@@ -61,23 +57,22 @@ public class TargetLock {
* Constructs a TargetLock object.
*
* @param resourceId ID of the entity we want to lock
- * @param lockProps Properties object containing properties needed by class
* @param dataSource used to connect to the DB containing locks
*/
- public TargetLock (String resourceId, UUID uuid, String owner, DistributedLockingProperties lockProps, BasicDataSource dataSource) {
+ public TargetLock (String resourceId, UUID uuid, String owner, BasicDataSource dataSource) {
this.resourceId = resourceId;
this.uuid = uuid;
this.owner = owner;
- this.lockProps = lockProps;
this.dataSource = dataSource;
}
/**
* obtain a lock
+ * @param holdSec the amount of time, in seconds, that the lock should be held
*/
- public boolean lock() {
+ public boolean lock(int holdSec) {
- return grabLock();
+ return grabLock(TimeUnit.SECONDS.toMillis(holdSec));
}
/**
@@ -91,8 +86,9 @@ public class TargetLock {
* "Grabs" lock by attempting to insert a new record in the db.
* If the insert fails due to duplicate key error resource is already locked
* so we call secondGrab.
+ * @param holdMs the amount of time, in milliseconds, that the lock should be held
*/
- private boolean grabLock() {
+ private boolean grabLock(long holdMs) {
// try to insert a record into the table(thereby grabbing the lock)
try (Connection conn = dataSource.getConnection();
@@ -100,16 +96,17 @@ public class TargetLock {
PreparedStatement statement = conn.prepareStatement(
"INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)")) {
- statement.setString(1, this.resourceId);
- statement.setString(2, this.uuid.toString());
- statement.setString(3, this.owner);
- statement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty());
+ int i = 1;
+ statement.setString(i++, this.resourceId);
+ statement.setString(i++, this.uuid.toString());
+ statement.setString(i++, this.owner);
+ statement.setLong(i++, System.currentTimeMillis() + holdMs);
statement.executeUpdate();
}
catch (SQLException e) {
logger.error("error in TargetLock.grabLock()", e);
- return secondGrab();
+ return secondGrab(holdMs);
}
return true;
@@ -118,22 +115,25 @@ public class TargetLock {
/**
* A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired.
* If that fails, it attempts to insert a new record again
+ * @param holdMs the amount of time, in milliseconds, that the lock should be held
*/
- private boolean secondGrab() {
+ private boolean secondGrab(long holdMs) {
try (Connection conn = dataSource.getConnection();
PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, expirationTime = ? WHERE expirationTime <= ? AND resourceId = ?");
+ "UPDATE pooling.locks SET host = ?, owner = ?, expirationTime = ? WHERE resourceId = ? AND (owner = ? OR expirationTime <= ?)");
PreparedStatement insertStatement = conn.prepareStatement(
"INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) {
- updateStatement.setString(1, this.uuid.toString());
- updateStatement.setString(2, this.owner);
- updateStatement.setLong(3, System.currentTimeMillis() + lockProps.getAgingProperty());
- updateStatement.setLong(4, System.currentTimeMillis());
- updateStatement.setString(5, this.resourceId);
+ int i = 1;
+ updateStatement.setString(i++, this.uuid.toString());
+ updateStatement.setString(i++, this.owner);
+ updateStatement.setLong(i++, System.currentTimeMillis() + holdMs);
+ updateStatement.setString(i++, this.resourceId);
+ updateStatement.setString(i++, this.owner);
+ updateStatement.setLong(i++, System.currentTimeMillis());
// The lock was expired and we grabbed it.
// return true
@@ -144,10 +144,11 @@ public class TargetLock {
// If our update does not return 1 row, the lock either has not expired
// or it was removed. Try one last grab
else {
- insertStatement.setString(1, this.resourceId);
- insertStatement.setString(2, this.uuid.toString());
- insertStatement.setString(3, this.owner);
- insertStatement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty());
+ i = 1;
+ insertStatement.setString(i++, this.resourceId);
+ insertStatement.setString(i++, this.uuid.toString());
+ insertStatement.setString(i++, this.owner);
+ insertStatement.setLong(i++, System.currentTimeMillis() + holdMs);
// If our insert returns 1 we successfully grabbed the lock
return (insertStatement.executeUpdate() == 1);