diff options
Diffstat (limited to 'feature-distributed-locking/src/main/java')
-rw-r--r-- | feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java | 207 |
1 files changed, 45 insertions, 162 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java index 7ee786b9..6f83ea15 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java @@ -30,9 +30,9 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -42,15 +42,15 @@ import lombok.Setter; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSourceFactory; import org.onap.policy.common.utils.network.NetworkUtil; -import org.onap.policy.drools.core.lock.AlwaysFailLock; -import org.onap.policy.drools.core.lock.Lock; import org.onap.policy.drools.core.lock.LockCallback; -import org.onap.policy.drools.core.lock.LockImpl; import org.onap.policy.drools.core.lock.LockState; import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.features.PolicyEngineFeatureApi; import org.onap.policy.drools.persistence.SystemPersistenceConstants; import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.onap.policy.drools.system.internal.FeatureLockImpl; +import org.onap.policy.drools.system.internal.LockManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,13 +78,12 @@ import org.slf4j.LoggerFactory; * instance.</li> * </dl> */ -public class DistributedLockManager implements PolicyResourceLockManager, PolicyEngineFeatureApi { +public class DistributedLockManager extends LockManager<DistributedLockManager.DistributedLock> + implements PolicyEngineFeatureApi { private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class); private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; - private static final String LOCK_LOST_MSG = "lock lost"; - private static final String NOT_LOCKED_MSG = "not locked"; @Getter(AccessLevel.PROTECTED) @Setter(AccessLevel.PROTECTED) @@ -108,23 +107,23 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy * lock is added to the map, it remains in the map until the lock is lost or until the * unlock request completes. */ - private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>(); + private final Map<String, DistributedLock> resource2lock; /** - * Engine with which this manager is associated. + * Thread pool used to check for lock expiration and to notify owners when locks are + * granted or lost. */ - private PolicyEngine engine; + private ScheduledExecutorService exsvc = null; /** - * Feature properties. + * Used to cancel the expiration checker on shutdown. */ - private DistributedLockProperties featProps; + private ScheduledFuture<?> checker = null; /** - * Thread pool used to check for lock expiration and to notify owners when locks are - * granted or lost. + * Feature properties. */ - private ScheduledExecutorService exsvc = null; + private DistributedLockProperties featProps; /** * Data source used to connect to the DB. @@ -137,6 +136,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy */ public DistributedLockManager() { this.hostName = NetworkUtil.getHostname(); + this.resource2lock = getResource2lock(); } @Override @@ -145,49 +145,10 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } @Override - public boolean isAlive() { - return (exsvc != null); - } - - @Override - public boolean start() { - // handled via engine API - return true; - } - - @Override - public boolean stop() { - // handled via engine API - return true; - } - - @Override - public void shutdown() { - // handled via engine API - } - - @Override - public boolean isLocked() { - return false; - } - - @Override - public boolean lock() { - return true; - } - - @Override - public boolean unlock() { - return true; - } - - @Override public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) { try { - this.engine = engine; this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME)); - this.exsvc = getThreadPool(); this.dataSource = makeDataSource(); return this; @@ -201,8 +162,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy public boolean afterStart(PolicyEngine engine) { try { + exsvc = PolicyEngineConstants.getManager().getExecutorService(); exsvc.execute(this::deleteExpiredDbLocks); - exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); setLatestInstance(this); @@ -257,6 +219,11 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy @Override public boolean afterStop(PolicyEngine engine) { exsvc = null; + + if (checker != null) { + checker.cancel(true); + } + closeDataSource(); return false; } @@ -278,49 +245,36 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } @Override - public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, - boolean waitForLock) { - - if (latestInstance != this) { - AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); - lock.notifyUnavailable(); - return lock; - } - - DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); - - DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock); - - // do these outside of compute() to avoid blocking other map operations - if (existingLock == null) { - logger.debug("added lock to map {}", lock); - lock.scheduleRequest(lock::doLock); - } else { - lock.deny("resource is busy", true); - } + protected boolean hasInstanceChanged() { + return (getLatestInstance() != this); + } - return lock; + @Override + protected void finishLock(DistributedLock lock) { + lock.scheduleRequest(lock::doLock); } /** * Checks for expired locks. */ private void checkExpired() { - try { logger.info("checking for expired locks"); Set<String> expiredIds = new HashSet<>(resource2lock.keySet()); identifyDbLocks(expiredIds); expireLocks(expiredIds); - exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); } catch (RejectedExecutionException e) { logger.warn("thread pool is no longer accepting requests", e); } catch (SQLException | RuntimeException e) { logger.error("error checking expired locks", e); - exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS); + + if (isAlive()) { + checker = exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS); + } } logger.info("done checking for expired locks"); @@ -387,7 +341,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy DistributedLock lock = lockref.get(); if (lock != null) { logger.debug("removed lock from map {}", lock); - lock.deny(LOCK_LOST_MSG, false); + lock.deny(DistributedLock.LOCK_LOST_MSG, false); } } } @@ -395,7 +349,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy /** * Distributed Lock implementation. */ - public static class DistributedLock extends LockImpl { + public static class DistributedLock extends FeatureLockImpl { private static final String SQL_FAILED_MSG = "request failed for lock: {}"; private static final long serialVersionUID = 1L; @@ -439,6 +393,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy * Constructs the object. */ public DistributedLock() { + this.feature = null; this.hostName = ""; this.uuidString = ""; } @@ -464,58 +419,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy this.uuidString = feature.uuidString; } - /** - * Grants this lock. The notification is <i>always</i> invoked via the - * <i>foreground</i> thread. - */ - protected void grant() { - synchronized (this) { - if (isUnavailable()) { - return; - } - - setState(LockState.ACTIVE); - } - - logger.info("lock granted: {}", this); - - notifyAvailable(); - } - - /** - * Permanently denies this lock. - * - * @param reason the reason the lock was denied - * @param foreground {@code true} if the callback can be invoked in the current - * (i.e., foreground) thread, {@code false} if it should be invoked via the - * executor - */ - protected void deny(String reason, boolean foreground) { - synchronized (this) { - setState(LockState.UNAVAILABLE); - } - - logger.info("{}: {}", reason, this); - - if (feature == null || foreground) { - notifyUnavailable(); - - } else { - feature.exsvc.execute(this::notifyUnavailable); - } - } - @Override public boolean free() { - // do a quick check of the state - if (isUnavailable()) { - return false; - } - - logger.info("releasing lock: {}", this); - - if (!attachFeature()) { - setState(LockState.UNAVAILABLE); + if (!freeAllowed()) { return false; } @@ -546,16 +452,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy @Override public void extend(int holdSec, LockCallback callback) { - if (holdSec < 0) { - throw new IllegalArgumentException("holdSec is negative"); - } - - setHoldSec(holdSec); - setCallback(callback); - - // do a quick check of the state - if (isUnavailable() || !attachFeature()) { - deny(LOCK_LOST_MSG, true); + if (!extendAllowed(holdSec, callback)) { return; } @@ -580,19 +477,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } } - /** - * Attaches to the feature instance, if not already attached. - * - * @return {@code true} if the lock is now attached to a feature, {@code false} - * otherwise - */ - private synchronized boolean attachFeature() { - if (feature != null) { - // already attached - return true; - } - - feature = latestInstance; + @Override + protected boolean addToFeature() { + feature = getLatestInstance(); if (feature == null) { logger.warn("no feature yet for {}", this); return false; @@ -613,7 +500,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy logger.debug("schedule lock action {}", this); nretries = 0; request = schedreq; - feature.exsvc.execute(this::doRequest); + getThreadPool().execute(this::doRequest); } /** @@ -633,7 +520,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy if (nretries++ < feature.featProps.getMaxRetries()) { logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this); request = req; - feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS); + getThreadPool().schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS); return; } } @@ -752,7 +639,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } if (success) { - grant(); + grant(true); return; } } @@ -803,7 +690,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy * the record, thus we have to try to insert, if the update fails */ if (doDbUpdate(conn) || doDbInsert(conn)) { - grant(); + grant(true); return; } } @@ -927,10 +814,6 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy return SystemPersistenceConstants.getManager().getProperties(fileName); } - protected ScheduledExecutorService getThreadPool() { - return engine.getExecutorService(); - } - protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback) { return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this); |