summaryrefslogtreecommitdiffstats
path: root/feature-distributed-locking/src
diff options
context:
space:
mode:
Diffstat (limited to 'feature-distributed-locking/src')
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java207
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java104
2 files changed, 84 insertions, 227 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
index 7ee786b9..6f83ea15 100644
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
@@ -30,9 +30,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -42,15 +42,15 @@ import lombok.Setter;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.onap.policy.common.utils.network.NetworkUtil;
-import org.onap.policy.drools.core.lock.AlwaysFailLock;
-import org.onap.policy.drools.core.lock.Lock;
import org.onap.policy.drools.core.lock.LockCallback;
-import org.onap.policy.drools.core.lock.LockImpl;
import org.onap.policy.drools.core.lock.LockState;
import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.onap.policy.drools.system.internal.FeatureLockImpl;
+import org.onap.policy.drools.system.internal.LockManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,13 +78,12 @@ import org.slf4j.LoggerFactory;
* instance.</li>
* </dl>
*/
-public class DistributedLockManager implements PolicyResourceLockManager, PolicyEngineFeatureApi {
+public class DistributedLockManager extends LockManager<DistributedLockManager.DistributedLock>
+ implements PolicyEngineFeatureApi {
private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
- private static final String LOCK_LOST_MSG = "lock lost";
- private static final String NOT_LOCKED_MSG = "not locked";
@Getter(AccessLevel.PROTECTED)
@Setter(AccessLevel.PROTECTED)
@@ -108,23 +107,23 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
* lock is added to the map, it remains in the map until the lock is lost or until the
* unlock request completes.
*/
- private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
+ private final Map<String, DistributedLock> resource2lock;
/**
- * Engine with which this manager is associated.
+ * Thread pool used to check for lock expiration and to notify owners when locks are
+ * granted or lost.
*/
- private PolicyEngine engine;
+ private ScheduledExecutorService exsvc = null;
/**
- * Feature properties.
+ * Used to cancel the expiration checker on shutdown.
*/
- private DistributedLockProperties featProps;
+ private ScheduledFuture<?> checker = null;
/**
- * Thread pool used to check for lock expiration and to notify owners when locks are
- * granted or lost.
+ * Feature properties.
*/
- private ScheduledExecutorService exsvc = null;
+ private DistributedLockProperties featProps;
/**
* Data source used to connect to the DB.
@@ -137,6 +136,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
*/
public DistributedLockManager() {
this.hostName = NetworkUtil.getHostname();
+ this.resource2lock = getResource2lock();
}
@Override
@@ -145,49 +145,10 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
}
@Override
- public boolean isAlive() {
- return (exsvc != null);
- }
-
- @Override
- public boolean start() {
- // handled via engine API
- return true;
- }
-
- @Override
- public boolean stop() {
- // handled via engine API
- return true;
- }
-
- @Override
- public void shutdown() {
- // handled via engine API
- }
-
- @Override
- public boolean isLocked() {
- return false;
- }
-
- @Override
- public boolean lock() {
- return true;
- }
-
- @Override
- public boolean unlock() {
- return true;
- }
-
- @Override
public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
try {
- this.engine = engine;
this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
- this.exsvc = getThreadPool();
this.dataSource = makeDataSource();
return this;
@@ -201,8 +162,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
public boolean afterStart(PolicyEngine engine) {
try {
+ exsvc = PolicyEngineConstants.getManager().getExecutorService();
exsvc.execute(this::deleteExpiredDbLocks);
- exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+ checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
setLatestInstance(this);
@@ -257,6 +219,11 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
@Override
public boolean afterStop(PolicyEngine engine) {
exsvc = null;
+
+ if (checker != null) {
+ checker.cancel(true);
+ }
+
closeDataSource();
return false;
}
@@ -278,49 +245,36 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
}
@Override
- public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
- boolean waitForLock) {
-
- if (latestInstance != this) {
- AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
- lock.notifyUnavailable();
- return lock;
- }
-
- DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
-
- DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
-
- // do these outside of compute() to avoid blocking other map operations
- if (existingLock == null) {
- logger.debug("added lock to map {}", lock);
- lock.scheduleRequest(lock::doLock);
- } else {
- lock.deny("resource is busy", true);
- }
+ protected boolean hasInstanceChanged() {
+ return (getLatestInstance() != this);
+ }
- return lock;
+ @Override
+ protected void finishLock(DistributedLock lock) {
+ lock.scheduleRequest(lock::doLock);
}
/**
* Checks for expired locks.
*/
private void checkExpired() {
-
try {
logger.info("checking for expired locks");
Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
identifyDbLocks(expiredIds);
expireLocks(expiredIds);
- exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+ checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
} catch (RejectedExecutionException e) {
logger.warn("thread pool is no longer accepting requests", e);
} catch (SQLException | RuntimeException e) {
logger.error("error checking expired locks", e);
- exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
+
+ if (isAlive()) {
+ checker = exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
+ }
}
logger.info("done checking for expired locks");
@@ -387,7 +341,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
DistributedLock lock = lockref.get();
if (lock != null) {
logger.debug("removed lock from map {}", lock);
- lock.deny(LOCK_LOST_MSG, false);
+ lock.deny(DistributedLock.LOCK_LOST_MSG, false);
}
}
}
@@ -395,7 +349,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
/**
* Distributed Lock implementation.
*/
- public static class DistributedLock extends LockImpl {
+ public static class DistributedLock extends FeatureLockImpl {
private static final String SQL_FAILED_MSG = "request failed for lock: {}";
private static final long serialVersionUID = 1L;
@@ -439,6 +393,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
* Constructs the object.
*/
public DistributedLock() {
+ this.feature = null;
this.hostName = "";
this.uuidString = "";
}
@@ -464,58 +419,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
this.uuidString = feature.uuidString;
}
- /**
- * Grants this lock. The notification is <i>always</i> invoked via the
- * <i>foreground</i> thread.
- */
- protected void grant() {
- synchronized (this) {
- if (isUnavailable()) {
- return;
- }
-
- setState(LockState.ACTIVE);
- }
-
- logger.info("lock granted: {}", this);
-
- notifyAvailable();
- }
-
- /**
- * Permanently denies this lock.
- *
- * @param reason the reason the lock was denied
- * @param foreground {@code true} if the callback can be invoked in the current
- * (i.e., foreground) thread, {@code false} if it should be invoked via the
- * executor
- */
- protected void deny(String reason, boolean foreground) {
- synchronized (this) {
- setState(LockState.UNAVAILABLE);
- }
-
- logger.info("{}: {}", reason, this);
-
- if (feature == null || foreground) {
- notifyUnavailable();
-
- } else {
- feature.exsvc.execute(this::notifyUnavailable);
- }
- }
-
@Override
public boolean free() {
- // do a quick check of the state
- if (isUnavailable()) {
- return false;
- }
-
- logger.info("releasing lock: {}", this);
-
- if (!attachFeature()) {
- setState(LockState.UNAVAILABLE);
+ if (!freeAllowed()) {
return false;
}
@@ -546,16 +452,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
@Override
public void extend(int holdSec, LockCallback callback) {
- if (holdSec < 0) {
- throw new IllegalArgumentException("holdSec is negative");
- }
-
- setHoldSec(holdSec);
- setCallback(callback);
-
- // do a quick check of the state
- if (isUnavailable() || !attachFeature()) {
- deny(LOCK_LOST_MSG, true);
+ if (!extendAllowed(holdSec, callback)) {
return;
}
@@ -580,19 +477,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
}
}
- /**
- * Attaches to the feature instance, if not already attached.
- *
- * @return {@code true} if the lock is now attached to a feature, {@code false}
- * otherwise
- */
- private synchronized boolean attachFeature() {
- if (feature != null) {
- // already attached
- return true;
- }
-
- feature = latestInstance;
+ @Override
+ protected boolean addToFeature() {
+ feature = getLatestInstance();
if (feature == null) {
logger.warn("no feature yet for {}", this);
return false;
@@ -613,7 +500,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
logger.debug("schedule lock action {}", this);
nretries = 0;
request = schedreq;
- feature.exsvc.execute(this::doRequest);
+ getThreadPool().execute(this::doRequest);
}
/**
@@ -633,7 +520,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
if (nretries++ < feature.featProps.getMaxRetries()) {
logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
request = req;
- feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
+ getThreadPool().schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
return;
}
}
@@ -752,7 +639,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
}
if (success) {
- grant();
+ grant(true);
return;
}
}
@@ -803,7 +690,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
* the record, thus we have to try to insert, if the update fails
*/
if (doDbUpdate(conn) || doDbInsert(conn)) {
- grant();
+ grant(true);
return;
}
}
@@ -927,10 +814,6 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy
return SystemPersistenceConstants.getManager().getProperties(fileName);
}
- protected ScheduledExecutorService getThreadPool() {
- return engine.getExecutorService();
- }
-
protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
LockCallback callback) {
return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java
index 0ba92c51..c996d8d9 100644
--- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java
@@ -30,6 +30,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
@@ -55,6 +56,7 @@ import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -115,16 +117,19 @@ public class DistributedLockManagerTest {
private static ScheduledExecutorService realExec;
@Mock
+ private PolicyEngine engine;
+
+ @Mock
private ScheduledExecutorService exsvc;
@Mock
- private LockCallback callback;
+ private ScheduledFuture<?> checker;
@Mock
- private BasicDataSource datasrc;
+ private LockCallback callback;
@Mock
- private PolicyEngine engine;
+ private BasicDataSource datasrc;
private DistributedLock lock;
@@ -153,7 +158,6 @@ public class DistributedLockManagerTest {
saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
realExec = Executors.newScheduledThreadPool(3);
- Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
}
/**
@@ -181,8 +185,6 @@ public class DistributedLockManagerTest {
cleanDb();
- when(engine.getExecutorService()).thenReturn(exsvc);
-
feature = new MyLockingFeature(true);
}
@@ -214,52 +216,12 @@ public class DistributedLockManagerTest {
.anyMatch(obj -> obj instanceof DistributedLockManager));
}
- /**
- * Tests constructor() when properties are invalid.
- */
- @Test
- public void testDistributedLockManagerInvalidProperties() {
- // use properties containing an invalid value
- Properties props = new Properties();
- props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc");
-
- feature = new MyLockingFeature(false) {
- @Override
- protected Properties getProperties(String fileName) {
- return props;
- }
- };
-
- assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
- }
-
@Test
public void testGetSequenceNumber() {
assertEquals(1000, feature.getSequenceNumber());
}
@Test
- public void testStartableApi() {
- assertTrue(feature.isAlive());
- assertTrue(feature.start());
- assertTrue(feature.stop());
- feature.shutdown();
-
- // above should have had no effect
- assertTrue(feature.isAlive());
-
- feature.afterStop(engine);
- assertFalse(feature.isAlive());
- }
-
- @Test
- public void testLockApi() {
- assertFalse(feature.isLocked());
- assertTrue(feature.lock());
- assertTrue(feature.unlock());
- }
-
- @Test
public void testBeforeCreateLockManager() {
assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
}
@@ -298,8 +260,7 @@ public class DistributedLockManagerTest {
feature = new MyLockingFeature(false);
- when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
- .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION));
+ doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
}
@@ -350,6 +311,7 @@ public class DistributedLockManagerTest {
@Test
public void testAfterStop() {
shutdownFeature();
+ verify(checker).cancel(anyBoolean());
feature = new DistributedLockManager();
@@ -424,7 +386,7 @@ public class DistributedLockManagerTest {
}
/**
- * Tests lock() when the feature is not the latest instance.
+ * Tests createLock() when the feature is not the latest instance.
*/
@Test
public void testCreateLockNotLatestInstance() {
@@ -522,6 +484,27 @@ public class DistributedLockManagerTest {
runChecker(0, 0, RETRY_SEC);
}
+ /**
+ * Tests checkExpired(), when getConnection() throws an exception and the feature is
+ * no longer alive.
+ */
+ @Test
+ public void testCheckExpiredSqlExFeatureStopped() {
+ // use a data source that throws an exception when getConnection() is called
+ feature = new InvalidDbLockingFeature(TRANSIENT) {
+ @Override
+ protected SQLException makeEx() {
+ this.stop();
+ return super.makeEx();
+ }
+ };
+
+ runChecker(0, 0, EXPIRE_SEC);
+
+ // it should NOT have scheduled another check
+ verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
@Test
public void testExpireLocks() throws SQLException {
AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
@@ -637,20 +620,6 @@ public class DistributedLockManagerTest {
verify(callback).lockAvailable(lock);
}
- /**
- * Tests grant() when the lock is already unavailable.
- */
- @Test
- public void testDistributedLockGrantUnavailable() {
- DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
- lock.setState(LockState.UNAVAILABLE);
- lock.grant();
-
- assertTrue(lock.isUnavailable());
- verify(callback, never()).lockAvailable(any());
- verify(callback, never()).lockUnavailable(any());
- }
-
@Test
public void testDistributedLockDeny() {
// get a lock
@@ -1464,6 +1433,8 @@ public class DistributedLockManagerTest {
*/
@Test
public void testMultiThreaded() throws InterruptedException {
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
+
feature = new DistributedLockManager();
feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
feature.afterStart(PolicyEngineConstants.getManager());
@@ -1661,10 +1632,12 @@ public class DistributedLockManagerTest {
shutdownFeature();
exsvc = mock(ScheduledExecutorService.class);
- when(engine.getExecutorService()).thenReturn(exsvc);
+ when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
if (init) {
beforeCreateLockManager(engine, new Properties());
+ start();
afterStart(engine);
}
}
@@ -1685,6 +1658,7 @@ public class DistributedLockManagerTest {
this.isTransient = isTransient;
this.beforeCreateLockManager(engine, new Properties());
+ this.start();
this.afterStart(engine);
}
@@ -1704,7 +1678,7 @@ public class DistributedLockManagerTest {
return datasrc;
}
- private SQLException makeEx() {
+ protected SQLException makeEx() {
if (isTransient) {
return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));