diff options
author | Jim Hahn <jrh3@att.com> | 2019-10-30 16:28:55 -0400 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2019-10-31 16:24:04 -0400 |
commit | d437070115d2e7c151deb2dea680007e20651413 (patch) | |
tree | c3a1a7c02919f69bc747b7b465c1c5cf5bffe1c6 /feature-distributed-locking/src | |
parent | 01abf11819c0cd797bc3170fbcce04cffb020b27 (diff) |
Refactor duplicate code from lock managers
Change-Id: I8910a1a4267d824f064b52c6ad08945590bd9617
Issue-ID: POLICY-2203
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'feature-distributed-locking/src')
2 files changed, 84 insertions, 227 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java index 7ee786b9..6f83ea15 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java @@ -30,9 +30,9 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -42,15 +42,15 @@ import lombok.Setter; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSourceFactory; import org.onap.policy.common.utils.network.NetworkUtil; -import org.onap.policy.drools.core.lock.AlwaysFailLock; -import org.onap.policy.drools.core.lock.Lock; import org.onap.policy.drools.core.lock.LockCallback; -import org.onap.policy.drools.core.lock.LockImpl; import org.onap.policy.drools.core.lock.LockState; import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.features.PolicyEngineFeatureApi; import org.onap.policy.drools.persistence.SystemPersistenceConstants; import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.onap.policy.drools.system.internal.FeatureLockImpl; +import org.onap.policy.drools.system.internal.LockManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,13 +78,12 @@ import org.slf4j.LoggerFactory; * instance.</li> * </dl> */ -public class DistributedLockManager implements PolicyResourceLockManager, PolicyEngineFeatureApi { +public class DistributedLockManager extends LockManager<DistributedLockManager.DistributedLock> + implements PolicyEngineFeatureApi { private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class); private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; - private static final String LOCK_LOST_MSG = "lock lost"; - private static final String NOT_LOCKED_MSG = "not locked"; @Getter(AccessLevel.PROTECTED) @Setter(AccessLevel.PROTECTED) @@ -108,23 +107,23 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy * lock is added to the map, it remains in the map until the lock is lost or until the * unlock request completes. */ - private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>(); + private final Map<String, DistributedLock> resource2lock; /** - * Engine with which this manager is associated. + * Thread pool used to check for lock expiration and to notify owners when locks are + * granted or lost. */ - private PolicyEngine engine; + private ScheduledExecutorService exsvc = null; /** - * Feature properties. + * Used to cancel the expiration checker on shutdown. */ - private DistributedLockProperties featProps; + private ScheduledFuture<?> checker = null; /** - * Thread pool used to check for lock expiration and to notify owners when locks are - * granted or lost. + * Feature properties. */ - private ScheduledExecutorService exsvc = null; + private DistributedLockProperties featProps; /** * Data source used to connect to the DB. @@ -137,6 +136,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy */ public DistributedLockManager() { this.hostName = NetworkUtil.getHostname(); + this.resource2lock = getResource2lock(); } @Override @@ -145,49 +145,10 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } @Override - public boolean isAlive() { - return (exsvc != null); - } - - @Override - public boolean start() { - // handled via engine API - return true; - } - - @Override - public boolean stop() { - // handled via engine API - return true; - } - - @Override - public void shutdown() { - // handled via engine API - } - - @Override - public boolean isLocked() { - return false; - } - - @Override - public boolean lock() { - return true; - } - - @Override - public boolean unlock() { - return true; - } - - @Override public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) { try { - this.engine = engine; this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME)); - this.exsvc = getThreadPool(); this.dataSource = makeDataSource(); return this; @@ -201,8 +162,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy public boolean afterStart(PolicyEngine engine) { try { + exsvc = PolicyEngineConstants.getManager().getExecutorService(); exsvc.execute(this::deleteExpiredDbLocks); - exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); setLatestInstance(this); @@ -257,6 +219,11 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy @Override public boolean afterStop(PolicyEngine engine) { exsvc = null; + + if (checker != null) { + checker.cancel(true); + } + closeDataSource(); return false; } @@ -278,49 +245,36 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } @Override - public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, - boolean waitForLock) { - - if (latestInstance != this) { - AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); - lock.notifyUnavailable(); - return lock; - } - - DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); - - DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock); - - // do these outside of compute() to avoid blocking other map operations - if (existingLock == null) { - logger.debug("added lock to map {}", lock); - lock.scheduleRequest(lock::doLock); - } else { - lock.deny("resource is busy", true); - } + protected boolean hasInstanceChanged() { + return (getLatestInstance() != this); + } - return lock; + @Override + protected void finishLock(DistributedLock lock) { + lock.scheduleRequest(lock::doLock); } /** * Checks for expired locks. */ private void checkExpired() { - try { logger.info("checking for expired locks"); Set<String> expiredIds = new HashSet<>(resource2lock.keySet()); identifyDbLocks(expiredIds); expireLocks(expiredIds); - exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); } catch (RejectedExecutionException e) { logger.warn("thread pool is no longer accepting requests", e); } catch (SQLException | RuntimeException e) { logger.error("error checking expired locks", e); - exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS); + + if (isAlive()) { + checker = exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS); + } } logger.info("done checking for expired locks"); @@ -387,7 +341,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy DistributedLock lock = lockref.get(); if (lock != null) { logger.debug("removed lock from map {}", lock); - lock.deny(LOCK_LOST_MSG, false); + lock.deny(DistributedLock.LOCK_LOST_MSG, false); } } } @@ -395,7 +349,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy /** * Distributed Lock implementation. */ - public static class DistributedLock extends LockImpl { + public static class DistributedLock extends FeatureLockImpl { private static final String SQL_FAILED_MSG = "request failed for lock: {}"; private static final long serialVersionUID = 1L; @@ -439,6 +393,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy * Constructs the object. */ public DistributedLock() { + this.feature = null; this.hostName = ""; this.uuidString = ""; } @@ -464,58 +419,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy this.uuidString = feature.uuidString; } - /** - * Grants this lock. The notification is <i>always</i> invoked via the - * <i>foreground</i> thread. - */ - protected void grant() { - synchronized (this) { - if (isUnavailable()) { - return; - } - - setState(LockState.ACTIVE); - } - - logger.info("lock granted: {}", this); - - notifyAvailable(); - } - - /** - * Permanently denies this lock. - * - * @param reason the reason the lock was denied - * @param foreground {@code true} if the callback can be invoked in the current - * (i.e., foreground) thread, {@code false} if it should be invoked via the - * executor - */ - protected void deny(String reason, boolean foreground) { - synchronized (this) { - setState(LockState.UNAVAILABLE); - } - - logger.info("{}: {}", reason, this); - - if (feature == null || foreground) { - notifyUnavailable(); - - } else { - feature.exsvc.execute(this::notifyUnavailable); - } - } - @Override public boolean free() { - // do a quick check of the state - if (isUnavailable()) { - return false; - } - - logger.info("releasing lock: {}", this); - - if (!attachFeature()) { - setState(LockState.UNAVAILABLE); + if (!freeAllowed()) { return false; } @@ -546,16 +452,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy @Override public void extend(int holdSec, LockCallback callback) { - if (holdSec < 0) { - throw new IllegalArgumentException("holdSec is negative"); - } - - setHoldSec(holdSec); - setCallback(callback); - - // do a quick check of the state - if (isUnavailable() || !attachFeature()) { - deny(LOCK_LOST_MSG, true); + if (!extendAllowed(holdSec, callback)) { return; } @@ -580,19 +477,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } } - /** - * Attaches to the feature instance, if not already attached. - * - * @return {@code true} if the lock is now attached to a feature, {@code false} - * otherwise - */ - private synchronized boolean attachFeature() { - if (feature != null) { - // already attached - return true; - } - - feature = latestInstance; + @Override + protected boolean addToFeature() { + feature = getLatestInstance(); if (feature == null) { logger.warn("no feature yet for {}", this); return false; @@ -613,7 +500,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy logger.debug("schedule lock action {}", this); nretries = 0; request = schedreq; - feature.exsvc.execute(this::doRequest); + getThreadPool().execute(this::doRequest); } /** @@ -633,7 +520,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy if (nretries++ < feature.featProps.getMaxRetries()) { logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this); request = req; - feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS); + getThreadPool().schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS); return; } } @@ -752,7 +639,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } if (success) { - grant(); + grant(true); return; } } @@ -803,7 +690,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy * the record, thus we have to try to insert, if the update fails */ if (doDbUpdate(conn) || doDbInsert(conn)) { - grant(); + grant(true); return; } } @@ -927,10 +814,6 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy return SystemPersistenceConstants.getManager().getProperties(fileName); } - protected ScheduledExecutorService getThreadPool() { - return engine.getExecutorService(); - } - protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback) { return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this); diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java index 0ba92c51..c996d8d9 100644 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java @@ -30,6 +30,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; @@ -55,6 +56,7 @@ import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -115,16 +117,19 @@ public class DistributedLockManagerTest { private static ScheduledExecutorService realExec; @Mock + private PolicyEngine engine; + + @Mock private ScheduledExecutorService exsvc; @Mock - private LockCallback callback; + private ScheduledFuture<?> checker; @Mock - private BasicDataSource datasrc; + private LockCallback callback; @Mock - private PolicyEngine engine; + private BasicDataSource datasrc; private DistributedLock lock; @@ -153,7 +158,6 @@ public class DistributedLockManagerTest { saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD); realExec = Executors.newScheduledThreadPool(3); - Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); } /** @@ -181,8 +185,6 @@ public class DistributedLockManagerTest { cleanDb(); - when(engine.getExecutorService()).thenReturn(exsvc); - feature = new MyLockingFeature(true); } @@ -214,52 +216,12 @@ public class DistributedLockManagerTest { .anyMatch(obj -> obj instanceof DistributedLockManager)); } - /** - * Tests constructor() when properties are invalid. - */ - @Test - public void testDistributedLockManagerInvalidProperties() { - // use properties containing an invalid value - Properties props = new Properties(); - props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc"); - - feature = new MyLockingFeature(false) { - @Override - protected Properties getProperties(String fileName) { - return props; - } - }; - - assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class); - } - @Test public void testGetSequenceNumber() { assertEquals(1000, feature.getSequenceNumber()); } @Test - public void testStartableApi() { - assertTrue(feature.isAlive()); - assertTrue(feature.start()); - assertTrue(feature.stop()); - feature.shutdown(); - - // above should have had no effect - assertTrue(feature.isAlive()); - - feature.afterStop(engine); - assertFalse(feature.isAlive()); - } - - @Test - public void testLockApi() { - assertFalse(feature.isLocked()); - assertTrue(feature.lock()); - assertTrue(feature.unlock()); - } - - @Test public void testBeforeCreateLockManager() { assertSame(feature, feature.beforeCreateLockManager(engine, new Properties())); } @@ -298,8 +260,7 @@ public class DistributedLockManagerTest { feature = new MyLockingFeature(false); - when(exsvc.schedule(any(Runnable.class), anyLong(), any())) - .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)); + doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any()); assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class); } @@ -350,6 +311,7 @@ public class DistributedLockManagerTest { @Test public void testAfterStop() { shutdownFeature(); + verify(checker).cancel(anyBoolean()); feature = new DistributedLockManager(); @@ -424,7 +386,7 @@ public class DistributedLockManagerTest { } /** - * Tests lock() when the feature is not the latest instance. + * Tests createLock() when the feature is not the latest instance. */ @Test public void testCreateLockNotLatestInstance() { @@ -522,6 +484,27 @@ public class DistributedLockManagerTest { runChecker(0, 0, RETRY_SEC); } + /** + * Tests checkExpired(), when getConnection() throws an exception and the feature is + * no longer alive. + */ + @Test + public void testCheckExpiredSqlExFeatureStopped() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(TRANSIENT) { + @Override + protected SQLException makeEx() { + this.stop(); + return super.makeEx(); + } + }; + + runChecker(0, 0, EXPIRE_SEC); + + // it should NOT have scheduled another check + verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any()); + } + @Test public void testExpireLocks() throws SQLException { AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null); @@ -637,20 +620,6 @@ public class DistributedLockManagerTest { verify(callback).lockAvailable(lock); } - /** - * Tests grant() when the lock is already unavailable. - */ - @Test - public void testDistributedLockGrantUnavailable() { - DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - lock.setState(LockState.UNAVAILABLE); - lock.grant(); - - assertTrue(lock.isUnavailable()); - verify(callback, never()).lockAvailable(any()); - verify(callback, never()).lockUnavailable(any()); - } - @Test public void testDistributedLockDeny() { // get a lock @@ -1464,6 +1433,8 @@ public class DistributedLockManagerTest { */ @Test public void testMultiThreaded() throws InterruptedException { + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); + feature = new DistributedLockManager(); feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties()); feature.afterStart(PolicyEngineConstants.getManager()); @@ -1661,10 +1632,12 @@ public class DistributedLockManagerTest { shutdownFeature(); exsvc = mock(ScheduledExecutorService.class); - when(engine.getExecutorService()).thenReturn(exsvc); + when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc); if (init) { beforeCreateLockManager(engine, new Properties()); + start(); afterStart(engine); } } @@ -1685,6 +1658,7 @@ public class DistributedLockManagerTest { this.isTransient = isTransient; this.beforeCreateLockManager(engine, new Properties()); + this.start(); this.afterStart(engine); } @@ -1704,7 +1678,7 @@ public class DistributedLockManagerTest { return datasrc; } - private SQLException makeEx() { + protected SQLException makeEx() { if (isTransient) { return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION)); |