summaryrefslogtreecommitdiffstats
path: root/feature-distributed-locking/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'feature-distributed-locking/src/main')
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java207
1 files changed, 45 insertions, 162 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
index 7ee786b9..6f83ea15 100644
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
@@ -30,9 +30,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -42,15 +42,15 @@ import lombok.Setter;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.onap.policy.common.utils.network.NetworkUtil;
-import org.onap.policy.drools.core.lock.AlwaysFailLock;
-import org.onap.policy.drools.core.lock.Lock;
import org.onap.policy.drools.core.lock.LockCallback;
-import org.onap.policy.drools.core.lock.LockImpl;
import org.onap.policy.drools.core.lock.LockState;
import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.onap.policy.drools.system.internal.FeatureLockImpl;
+import org.onap.policy.drools.system.internal.LockManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,13 +78,12 @@ import org.slf4j.LoggerFactory;
* instance.</li>
* </dl>
*/
-public class DistributedLockManager implements PolicyResourceLockManager, PolicyEngineFeatureApi {
+public class DistributedLockManager extends LockManager<DistributedLockManager.DistributedLock>
+ implements PolicyEngineFeatureApi {
private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
- private static final String LOCK_LOST_MSG = "lock lost";
- private static final String NOT_LOCKED_MSG = "not locked";
@Getter(AccessLevel.PROTECTED)
@Setter(AccessLevel.PROTECTED)
@@ -108,23 +107,23 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
* lock is added to the map, it remains in the map until the lock is lost or until the
* unlock request completes.
*/
- private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
+ private final Map<String, DistributedLock> resource2lock;
/**
- * Engine with which this manager is associated.
+ * Thread pool used to check for lock expiration and to notify owners when locks are
+ * granted or lost.
*/
- private PolicyEngine engine;
+ private ScheduledExecutorService exsvc = null;
/**
- * Feature properties.
+ * Used to cancel the expiration checker on shutdown.
*/
- private DistributedLockProperties featProps;
+ private ScheduledFuture<?> checker = null;
/**
- * Thread pool used to check for lock expiration and to notify owners when locks are
- * granted or lost.
+ * Feature properties.
*/
- private ScheduledExecutorService exsvc = null;
+ private DistributedLockProperties featProps;
/**
* Data source used to connect to the DB.
@@ -137,6 +136,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
*/
public DistributedLockManager() {
this.hostName = NetworkUtil.getHostname();
+ this.resource2lock = getResource2lock();
}
@Override
@@ -145,49 +145,10 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
}
@Override
- public boolean isAlive() {
- return (exsvc != null);
- }
-
- @Override
- public boolean start() {
- // handled via engine API
- return true;
- }
-
- @Override
- public boolean stop() {
- // handled via engine API
- return true;
- }
-
- @Override
- public void shutdown() {
- // handled via engine API
- }
-
- @Override
- public boolean isLocked() {
- return false;
- }
-
- @Override
- public boolean lock() {
- return true;
- }
-
- @Override
- public boolean unlock() {
- return true;
- }
-
- @Override
public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
try {
- this.engine = engine;
this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
- this.exsvc = getThreadPool();
this.dataSource = makeDataSource();
return this;
@@ -201,8 +162,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
public boolean afterStart(PolicyEngine engine) {
try {
+ exsvc = PolicyEngineConstants.getManager().getExecutorService();
exsvc.execute(this::deleteExpiredDbLocks);
- exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+ checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
setLatestInstance(this);
@@ -257,6 +219,11 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
@Override
public boolean afterStop(PolicyEngine engine) {
exsvc = null;
+
+ if (checker != null) {
+ checker.cancel(true);
+ }
+
closeDataSource();
return false;
}
@@ -278,49 +245,36 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
}
@Override
- public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
- boolean waitForLock) {
-
- if (latestInstance != this) {
- AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
- lock.notifyUnavailable();
- return lock;
- }
-
- DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
-
- DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
-
- // do these outside of compute() to avoid blocking other map operations
- if (existingLock == null) {
- logger.debug("added lock to map {}", lock);
- lock.scheduleRequest(lock::doLock);
- } else {
- lock.deny("resource is busy", true);
- }
+ protected boolean hasInstanceChanged() {
+ return (getLatestInstance() != this);
+ }
- return lock;
+ @Override
+ protected void finishLock(DistributedLock lock) {
+ lock.scheduleRequest(lock::doLock);
}
/**
* Checks for expired locks.
*/
private void checkExpired() {
-
try {
logger.info("checking for expired locks");
Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
identifyDbLocks(expiredIds);
expireLocks(expiredIds);
- exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+ checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
} catch (RejectedExecutionException e) {
logger.warn("thread pool is no longer accepting requests", e);
} catch (SQLException | RuntimeException e) {
logger.error("error checking expired locks", e);
- exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
+
+ if (isAlive()) {
+ checker = exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
+ }
}
logger.info("done checking for expired locks");
@@ -387,7 +341,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
DistributedLock lock = lockref.get();
if (lock != null) {
logger.debug("removed lock from map {}", lock);
- lock.deny(LOCK_LOST_MSG, false);
+ lock.deny(DistributedLock.LOCK_LOST_MSG, false);
}
}
}
@@ -395,7 +349,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
/**
* Distributed Lock implementation.
*/
- public static class DistributedLock extends LockImpl {
+ public static class DistributedLock extends FeatureLockImpl {
private static final String SQL_FAILED_MSG = "request failed for lock: {}";
private static final long serialVersionUID = 1L;
@@ -439,6 +393,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
* Constructs the object.
*/
public DistributedLock() {
+ this.feature = null;
this.hostName = "";
this.uuidString = "";
}
@@ -464,58 +419,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
this.uuidString = feature.uuidString;
}
- /**
- * Grants this lock. The notification is <i>always</i> invoked via the
- * <i>foreground</i> thread.
- */
- protected void grant() {
- synchronized (this) {
- if (isUnavailable()) {
- return;
- }
-
- setState(LockState.ACTIVE);
- }
-
- logger.info("lock granted: {}", this);
-
- notifyAvailable();
- }
-
- /**
- * Permanently denies this lock.
- *
- * @param reason the reason the lock was denied
- * @param foreground {@code true} if the callback can be invoked in the current
- * (i.e., foreground) thread, {@code false} if it should be invoked via the
- * executor
- */
- protected void deny(String reason, boolean foreground) {
- synchronized (this) {
- setState(LockState.UNAVAILABLE);
- }
-
- logger.info("{}: {}", reason, this);
-
- if (feature == null || foreground) {
- notifyUnavailable();
-
- } else {
- feature.exsvc.execute(this::notifyUnavailable);
- }
- }
-
@Override
public boolean free() {
- // do a quick check of the state
- if (isUnavailable()) {
- return false;
- }
-
- logger.info("releasing lock: {}", this);
-
- if (!attachFeature()) {
- setState(LockState.UNAVAILABLE);
+ if (!freeAllowed()) {
return false;
}
@@ -546,16 +452,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
@Override
public void extend(int holdSec, LockCallback callback) {
- if (holdSec < 0) {
- throw new IllegalArgumentException("holdSec is negative");
- }
-
- setHoldSec(holdSec);
- setCallback(callback);
-
- // do a quick check of the state
- if (isUnavailable() || !attachFeature()) {
- deny(LOCK_LOST_MSG, true);
+ if (!extendAllowed(holdSec, callback)) {
return;
}
@@ -580,19 +477,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
}
}
- /**
- * Attaches to the feature instance, if not already attached.
- *
- * @return {@code true} if the lock is now attached to a feature, {@code false}
- * otherwise
- */
- private synchronized boolean attachFeature() {
- if (feature != null) {
- // already attached
- return true;
- }
-
- feature = latestInstance;
+ @Override
+ protected boolean addToFeature() {
+ feature = getLatestInstance();
if (feature == null) {
logger.warn("no feature yet for {}", this);
return false;
@@ -613,7 +500,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
logger.debug("schedule lock action {}", this);
nretries = 0;
request = schedreq;
- feature.exsvc.execute(this::doRequest);
+ getThreadPool().execute(this::doRequest);
}
/**
@@ -633,7 +520,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
if (nretries++ < feature.featProps.getMaxRetries()) {
logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
request = req;
- feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
+ getThreadPool().schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
return;
}
}
@@ -752,7 +639,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
}
if (success) {
- grant();
+ grant(true);
return;
}
}
@@ -803,7 +690,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
* the record, thus we have to try to insert, if the update fails
*/
if (doDbUpdate(conn) || doDbInsert(conn)) {
- grant();
+ grant(true);
return;
}
}
@@ -927,10 +814,6 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
return SystemPersistenceConstants.getManager().getProperties(fileName);
}
- protected ScheduledExecutorService getThreadPool() {
- return engine.getExecutorService();
- }
-
protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
LockCallback callback) {
return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);