diff options
Diffstat (limited to 'feature-distributed-locking/src/test')
-rw-r--r-- | feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java (renamed from feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java) | 12 | ||||
-rw-r--r-- | feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java | 1818 | ||||
-rw-r--r-- | feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java | 81 | ||||
-rw-r--r-- | feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java | 107 | ||||
-rw-r--r-- | feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java | 345 | ||||
-rw-r--r-- | feature-distributed-locking/src/test/resources/feature-distributed-locking.properties | 13 |
6 files changed, 1913 insertions, 463 deletions
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java index 7ba2384a..cfd6a151 100644 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * feature-distributed-locking * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,13 +24,13 @@ import static org.junit.Assert.assertEquals; import org.junit.Test; import org.onap.policy.common.utils.test.ExceptionsTester; -import org.onap.policy.distributed.locking.DistributedLockingFeatureException; +import org.onap.policy.distributed.locking.DistributedLockManagerException; -public class DistributedLockingFeatureExceptionTest extends ExceptionsTester { +public class DistributedLockManagerExceptionTest extends ExceptionsTester { @Test public void test() { - assertEquals(1, test(DistributedLockingFeatureException.class)); + assertEquals(1, test(DistributedLockManagerException.class)); } } diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java new file mode 100644 index 00000000..59b56224 --- /dev/null +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java @@ -0,0 +1,1818 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.dbcp2.BasicDataSource; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.utils.services.OrderedServiceImpl; +import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.features.PolicyEngineFeatureApi; +import org.onap.policy.drools.persistence.SystemPersistenceConstants; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.powermock.reflect.Whitebox; + +public class DistributedLockManagerTest { + private static final long EXPIRE_SEC = 900L; + private static final long RETRY_SEC = 60L; + private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService"; + private static final String OTHER_HOST = "other-host"; + private static final String OTHER_OWNER = "other-owner"; + private static final String EXPECTED_EXCEPTION = "expected exception"; + private static final String DB_CONNECTION = + "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling"; + private static final String DB_USER = "user"; + private static final String DB_PASSWORD = "password"; + private static final String OWNER_KEY = "my key"; + private static final String RESOURCE = "my resource"; + private static final String RESOURCE2 = "my resource #2"; + private static final String RESOURCE3 = "my resource #3"; + private static final String RESOURCE4 = "my resource #4"; + private static final String RESOURCE5 = "my resource #5"; + private static final int HOLD_SEC = 100; + private static final int HOLD_SEC2 = 120; + private static final int MAX_THREADS = 5; + private static final int MAX_LOOPS = 100; + private static final int TRANSIENT = 500; + private static final int PERMANENT = 600; + + // number of execute() calls before the first lock attempt + private static final int PRE_LOCK_EXECS = 1; + + // number of execute() calls before the first schedule attempt + private static final int PRE_SCHED_EXECS = 1; + + private static Connection conn = null; + private static ScheduledExecutorService saveExec; + private static ScheduledExecutorService realExec; + + @Mock + private ScheduledExecutorService exsvc; + + @Mock + private LockCallback callback; + + @Mock + private BasicDataSource datasrc; + + @Mock + private PolicyEngine engine; + + private DistributedLock lock; + + private AtomicInteger nactive; + private AtomicInteger nsuccesses; + private DistributedLockManager feature; + + + /** + * Configures the location of the property files and creates the DB. + * + * @throws SQLException if the DB cannot be created + */ + @BeforeClass + public static void setUpBeforeClass() throws SQLException { + SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources"); + + conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); + + try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks " + + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), " + + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) { + createStmt.executeUpdate(); + } + + saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD); + + realExec = Executors.newScheduledThreadPool(3); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); + } + + /** + * Restores static fields. + */ + @AfterClass + public static void tearDownAfterClass() throws SQLException { + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec); + realExec.shutdown(); + conn.close(); + } + + /** + * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute + * tasks. + * + * @throws SQLException if the lock records cannot be deleted from the DB + */ + @Before + public void setUp() throws SQLException { + MockitoAnnotations.initMocks(this); + + nactive = new AtomicInteger(0); + nsuccesses = new AtomicInteger(0); + + cleanDb(); + + when(engine.getExecutorService()).thenReturn(exsvc); + + feature = new MyLockingFeature(true); + } + + @After + public void tearDown() throws SQLException { + shutdownFeature(); + cleanDb(); + } + + private void cleanDb() throws SQLException { + try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) { + stmt.executeUpdate(); + } + } + + private void shutdownFeature() { + if (feature != null) { + feature.afterStop(engine); + feature = null; + } + } + + /** + * Tests that the feature is found in the expected service sets. + */ + @Test + public void testServiceApis() { + assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream() + .anyMatch(obj -> obj instanceof DistributedLockManager)); + } + + /** + * Tests constructor() when properties are invalid. + */ + @Test + public void testDistributedLockManagerInvalidProperties() { + // use properties containing an invalid value + Properties props = new Properties(); + props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc"); + + feature = new MyLockingFeature(false) { + @Override + protected Properties getProperties(String fileName) { + return props; + } + }; + + assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class); + } + + @Test + public void testGetSequenceNumber() { + assertEquals(1000, feature.getSequenceNumber()); + } + + @Test + public void testStartableApi() { + assertTrue(feature.isAlive()); + assertTrue(feature.start()); + assertTrue(feature.stop()); + feature.shutdown(); + + // above should have had no effect + assertTrue(feature.isAlive()); + + feature.afterStop(engine); + assertFalse(feature.isAlive()); + } + + @Test + public void testLockApi() { + assertFalse(feature.isLocked()); + assertTrue(feature.lock()); + assertTrue(feature.unlock()); + } + + @Test + public void testBeforeCreateLockManager() { + assertSame(feature, feature.beforeCreateLockManager(engine, new Properties())); + } + + /** + * Tests beforeCreate(), when getProperties() throws a runtime exception. + */ + @Test + public void testBeforeCreateLockManagerEx() { + shutdownFeature(); + + feature = new MyLockingFeature(false) { + @Override + protected Properties getProperties(String fileName) { + throw new IllegalArgumentException(EXPECTED_EXCEPTION); + } + }; + + assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties())) + .isInstanceOf(DistributedLockManagerException.class); + } + + @Test + public void testAfterStart() { + // verify that cleanup & expire check are both added to the queue + verify(exsvc).execute(any()); + verify(exsvc).schedule(any(Runnable.class), anyLong(), any()); + } + + /** + * Tests afterStart(), when thread pool throws a runtime exception. + */ + @Test + public void testAfterStartExInThreadPool() { + shutdownFeature(); + + feature = new MyLockingFeature(false); + + when(exsvc.schedule(any(Runnable.class), anyLong(), any())) + .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)); + + assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class); + } + + @Test + public void testDeleteExpiredDbLocks() throws SQLException { + // add records: two expired, one not + insertRecord(RESOURCE, feature.getUuidString(), -1); + insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2); + insertRecord(RESOURCE3, OTHER_OWNER, 0); + insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC); + + // get the clean-up function and execute it + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).execute(captor.capture()); + + long tbegin = System.currentTimeMillis(); + Runnable action = captor.getValue(); + action.run(); + + assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin)); + assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin)); + assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin)); + assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin)); + + assertEquals(2, getRecordCount()); + } + + /** + * Tests deleteExpiredDbLocks(), when getConnection() throws an exception. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDeleteExpiredDbLocksEx() throws SQLException { + feature = new InvalidDbLockingFeature(TRANSIENT); + + // get the clean-up function and execute it + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).execute(captor.capture()); + + Runnable action = captor.getValue(); + + // should not throw an exception + action.run(); + } + + @Test + public void testAfterStop() { + shutdownFeature(); + + feature = new DistributedLockManager(); + + // shutdown without calling afterStart() + + shutdownFeature(); + } + + /** + * Tests afterStop(), when the data source throws an exception when close() is called. + * + * @throws SQLException if an error occurs + */ + @Test + public void testAfterStopEx() throws SQLException { + shutdownFeature(); + + // use a data source that throws an exception when closed + feature = new InvalidDbLockingFeature(TRANSIENT); + + shutdownFeature(); + } + + @Test + public void testCreateLock() throws SQLException { + verify(exsvc).execute(any()); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isWaiting()); + + verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any()); + + // this lock should fail + LockCallback callback2 = mock(LockCallback.class); + DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false); + assertTrue(lock2.isUnavailable()); + verify(callback2, never()).lockAvailable(lock2); + verify(callback2).lockUnavailable(lock2); + + // this should fail, too + LockCallback callback3 = mock(LockCallback.class); + DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false); + assertTrue(lock3.isUnavailable()); + verify(callback3, never()).lockAvailable(lock3); + verify(callback3).lockUnavailable(lock3); + + // no change to first + assertTrue(lock.isWaiting()); + + // no callbacks to the first lock + verify(callback, never()).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + assertTrue(lock.isWaiting()); + assertEquals(0, getRecordCount()); + + runLock(0, 0); + assertTrue(lock.isActive()); + assertEquals(1, getRecordCount()); + + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // this should succeed + DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock4.isWaiting()); + + // after running checker, original records should still remain + runChecker(0, 0, EXPIRE_SEC); + assertEquals(1, getRecordCount()); + verify(callback, never()).lockUnavailable(lock); + } + + /** + * Tests lock() when the feature is not the latest instance. + */ + @Test + public void testCreateLockNotLatestInstance() { + DistributedLockManager.setLatestInstance(null); + + Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback).lockUnavailable(lock); + } + + @Test + public void testCheckExpired() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false); + runLock(1, 0); + + LockCallback callback3 = mock(LockCallback.class); + final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false); + runLock(2, 0); + + LockCallback callback4 = mock(LockCallback.class); + final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false); + runLock(3, 0); + + LockCallback callback5 = mock(LockCallback.class); + final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false); + runLock(4, 0); + + assertEquals(5, getRecordCount()); + + // expire one record + updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1); + + // change host of another record + updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC); + + // change uuid of another record + updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC); + + + // run the checker + runChecker(0, 0, EXPIRE_SEC); + + + // check lock states + assertTrue(lock.isUnavailable()); + assertTrue(lock2.isActive()); + assertTrue(lock3.isUnavailable()); + assertTrue(lock4.isActive()); + assertTrue(lock5.isUnavailable()); + + // allow callbacks + runLock(5, 2); + runLock(6, 1); + runLock(7, 0); + verify(callback).lockUnavailable(lock); + verify(callback3).lockUnavailable(lock3); + verify(callback5).lockUnavailable(lock5); + + verify(callback2, never()).lockUnavailable(lock2); + verify(callback4, never()).lockUnavailable(lock4); + + + // another check should have been scheduled, with the normal interval + runChecker(1, 0, EXPIRE_SEC); + } + + /** + * Tests checkExpired(), when schedule() throws an exception. + */ + @Test + public void testCheckExpiredExecRejected() { + // arrange for execution to be rejected + when(exsvc.schedule(any(Runnable.class), anyLong(), any())) + .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION)); + + runChecker(0, 0, EXPIRE_SEC); + } + + /** + * Tests checkExpired(), when getConnection() throws an exception. + */ + @Test + public void testCheckExpiredSqlEx() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(TRANSIENT); + + runChecker(0, 0, EXPIRE_SEC); + + // it should have scheduled another check, sooner + runChecker(0, 0, RETRY_SEC); + } + + @Test + public void testExpireLocks() throws SQLException { + AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null); + + feature = new MyLockingFeature(true) { + @Override + protected BasicDataSource makeDataSource() throws Exception { + // get the real data source + BasicDataSource src2 = super.makeDataSource(); + + when(datasrc.getConnection()).thenAnswer(answer -> { + DistributedLock lck = freeLock.getAndSet(null); + if (lck != null) { + // free it + lck.free(); + + // run its doUnlock + runLock(4, 0); + } + + return src2.getConnection(); + }); + + return datasrc; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false); + runLock(1, 0); + + LockCallback callback3 = mock(LockCallback.class); + final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false); + // don't run doLock for lock3 - leave it in the waiting state + + LockCallback callback4 = mock(LockCallback.class); + final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false); + runLock(3, 0); + + assertEquals(3, getRecordCount()); + + // expire one record + updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1); + + // arrange to free lock4 while the checker is running + freeLock.set(lock4); + + // run the checker + runChecker(0, 0, EXPIRE_SEC); + + + // check lock states + assertTrue(lock.isUnavailable()); + assertTrue(lock2.isActive()); + assertTrue(lock3.isWaiting()); + assertTrue(lock4.isUnavailable()); + + runLock(5, 0); + verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any()); + + verify(callback).lockUnavailable(lock); + verify(callback2, never()).lockUnavailable(lock2); + verify(callback3, never()).lockUnavailable(lock3); + verify(callback4, never()).lockUnavailable(lock4); + } + + @Test + public void testDistributedLockNoArgs() { + DistributedLock lock = new DistributedLock(); + assertNull(lock.getResourceId()); + assertNull(lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(0, lock.getHoldSec()); + } + + @Test + public void testDistributedLock() { + assertThatIllegalArgumentException() + .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false)) + .withMessageContaining("holdSec"); + + // should generate no exception + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + } + + @Test + public void testDistributedLockSerializable() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock = roundTrip(lock); + + assertTrue(lock.isWaiting()); + + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + } + + @Test + public void testGrant() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertFalse(lock.isActive()); + + // execute the doLock() call + runLock(0, 0); + + assertTrue(lock.isActive()); + + // the callback for the lock should have been run in the foreground thread + verify(callback).lockAvailable(lock); + } + + /** + * Tests grant() when the lock is already unavailable. + */ + @Test + public void testDistributedLockGrantUnavailable() { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.setState(LockState.UNAVAILABLE); + lock.grant(); + + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + @Test + public void testDistributedLockDeny() { + // get a lock + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // get another lock - should fail + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertTrue(lock.isUnavailable()); + + // the callback for the second lock should have been run in the foreground thread + verify(callback).lockUnavailable(lock); + + // should only have a request for the first lock + verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any()); + } + + @Test + public void testDistributedLockFree() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + + // run both requests associated with the lock + runLock(0, 1); + runLock(1, 0); + + // should not have changed state + assertTrue(lock.isUnavailable()); + + // attempt to free it again + assertFalse(lock.free()); + + // should not have queued anything else + verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any()); + + // new lock should succeed + DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock2 != lock); + assertTrue(lock2.isWaiting()); + } + + /** + * Tests that free() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockFreeSerialized() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + feature = new MyLockingFeature(true); + + lock = roundTrip(lock); + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests free() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockFreeNoFeature() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + DistributedLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests the case where the lock is freed and doUnlock called between the call to + * isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockFreeUnlocked() { + feature = new FreeWithFreeLockingFeature(true); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests the case where the lock is freed, but doUnlock is not completed, between the + * call to isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockFreeLockFreed() { + feature = new FreeWithFreeLockingFeature(false); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + @Test + public void testDistributedLockExtend() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // lock2 should be denied - called back by this thread + DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // lock2 will still be denied - called back by this thread + lock2.extend(HOLD_SEC, callback); + verify(callback, times(2)).lockUnavailable(lock2); + + // force lock2 to be active - should still be denied + Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); + lock2.extend(HOLD_SEC, callback); + verify(callback, times(3)).lockUnavailable(lock2); + + assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback)) + .withMessageContaining("holdSec"); + + // execute doLock() + runLock(0, 0); + assertTrue(lock.isActive()); + + // now extend the first lock + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + assertTrue(lock.isWaiting()); + + // execute doExtend() + runLock(1, 0); + lock.extend(HOLD_SEC2, callback2); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + verify(callback2).lockAvailable(lock); + verify(callback2, never()).lockUnavailable(lock); + } + + /** + * Tests that extend() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockExtendSerialized() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // run doLock + runLock(0, 0); + assertTrue(lock.isActive()); + + feature = new MyLockingFeature(true); + + lock = roundTrip(lock); + assertTrue(lock.isActive()); + + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isWaiting()); + + // run doExtend (in new feature) + runLock(0, 0); + assertTrue(lock.isActive()); + + verify(scallback).lockAvailable(lock); + verify(scallback, never()).lockUnavailable(lock); + } + + /** + * Tests extend() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockExtendNoFeature() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // run doLock + runLock(0, 0); + assertTrue(lock.isActive()); + + DistributedLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + assertTrue(lock.isActive()); + + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isUnavailable()); + + verify(scallback, never()).lockAvailable(lock); + verify(scallback).lockUnavailable(lock); + } + + /** + * Tests the case where the lock is freed and doUnlock called between the call to + * isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockExtendUnlocked() { + feature = new FreeWithFreeLockingFeature(true); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + lock.extend(HOLD_SEC2, callback); + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + } + + /** + * Tests the case where the lock is freed, but doUnlock is not completed, between the + * call to isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockExtendLockFreed() { + feature = new FreeWithFreeLockingFeature(false); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + lock.extend(HOLD_SEC2, callback); + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + } + + @Test + public void testDistributedLockScheduleRequest() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + verify(callback).lockAvailable(lock); + } + + @Test + public void testDistributedLockRescheduleRequest() throws SQLException { + // use a data source that throws an exception when getConnection() is called + InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT); + feature = invfeat; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should fail and reschedule + runLock(0, 0); + + // should still be waiting + assertTrue(lock.isWaiting()); + verify(callback, never()).lockUnavailable(lock); + + // free the lock while doLock is executing + invfeat.freeLock = true; + + // try scheduled request - should just invoke doUnlock + runSchedule(0, 0); + + // should still be waiting + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockUnavailable(lock); + + // should have scheduled a retry of doUnlock + verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + public void testDistributedLockGetNextRequest() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + /* + * run doLock. This should cause getNextRequest() to be called twice, once with a + * request in the queue, and the second time with request=null. + */ + runLock(0, 0); + } + + /** + * Tests getNextRequest(), where the same request is still in the queue the second + * time it's called. + */ + @Test + public void testDistributedLockGetNextRequestSameRequest() { + // force reschedule to be invoked + feature = new InvalidDbLockingFeature(TRANSIENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + /* + * run doLock. This should cause getNextRequest() to be called twice, once with a + * request in the queue, and the second time with the same request again. + */ + runLock(0, 0); + + verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + public void testDistributedLockDoRequest() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertTrue(lock.isWaiting()); + + // run doLock via doRequest + runLock(0, 0); + + assertTrue(lock.isActive()); + } + + /** + * Tests doRequest(), when doRequest() is already running within another thread. + */ + @Test + public void testDistributedLockDoRequestBusy() { + /* + * this feature will invoke a request in a background thread while it's being run + * in a foreground thread. + */ + AtomicBoolean running = new AtomicBoolean(false); + AtomicBoolean returned = new AtomicBoolean(false); + + feature = new MyLockingFeature(true) { + @Override + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + + @Override + protected boolean doDbInsert(Connection conn) throws SQLException { + if (running.get()) { + // already inside the thread - don't recurse any further + return super.doDbInsert(conn); + } + + running.set(true); + + Thread thread = new Thread(() -> { + // run doLock from within the new thread + runLock(0, 0); + }); + thread.setDaemon(true); + thread.start(); + + // wait for the background thread to complete before continuing + try { + thread.join(5000); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + + returned.set(!thread.isAlive()); + + return super.doDbInsert(conn); + } + }; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // run doLock + runLock(0, 0); + + assertTrue(returned.get()); + } + + /** + * Tests doRequest() when an exception occurs while the lock is in the WAITING state. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoRequestRunExWaiting() throws SQLException { + // throw run-time exception + when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION)); + + // use a data source that throws an exception when getConnection() is called + feature = new MyLockingFeature(true) { + @Override + protected BasicDataSource makeDataSource() throws Exception { + return datasrc; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should NOT reschedule + runLock(0, 0); + + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + + verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any()); + } + + /** + * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE + * state. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoRequestRunExUnavailable() throws SQLException { + // throw run-time exception + when(datasrc.getConnection()).thenAnswer(answer -> { + lock.free(); + throw new IllegalStateException(EXPECTED_EXCEPTION); + }); + + // use a data source that throws an exception when getConnection() is called + feature = new MyLockingFeature(true) { + @Override + protected BasicDataSource makeDataSource() throws Exception { + return datasrc; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should NOT reschedule + runLock(0, 0); + + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockUnavailable(lock); + + verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any()); + } + + /** + * Tests doRequest() when the retry count gets exhausted. + */ + @Test + public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(TRANSIENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should fail and reschedule + runLock(0, 0); + + // should still be waiting + assertTrue(lock.isWaiting()); + verify(callback, never()).lockUnavailable(lock); + + // try again, via SCHEDULER - first retry fails + runSchedule(0, 0); + + // should still be waiting + assertTrue(lock.isWaiting()); + verify(callback, never()).lockUnavailable(lock); + + // try again, via SCHEDULER - final retry fails + runSchedule(1, 0); + assertTrue(lock.isUnavailable()); + + // now callback should have been called + verify(callback).lockUnavailable(lock); + } + + /** + * Tests doRequest() when a non-transient DB exception is thrown. + */ + @Test + public void testDistributedLockDoRequestNotTransient() { + /* + * use a data source that throws a PERMANENT exception when getConnection() is + * called + */ + feature = new InvalidDbLockingFeature(PERMANENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should fail + runLock(0, 0); + + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + + // should not have scheduled anything new + verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any()); + verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + public void testDistributedLockDoLock() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should simply do an insert + long tbegin = System.currentTimeMillis(); + runLock(0, 0); + + assertEquals(1, getRecordCount()); + assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin)); + verify(callback).lockAvailable(lock); + } + + /** + * Tests doLock() when the lock is freed before doLock runs. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoLockFreed() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + lock.setState(LockState.UNAVAILABLE); + + // invoke doLock - should do nothing + runLock(0, 0); + + assertEquals(0, getRecordCount()); + + verify(callback, never()).lockAvailable(lock); + } + + /** + * Tests doLock() when a DB exception is thrown. + */ + @Test + public void testDistributedLockDoLockEx() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(PERMANENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should simply do an insert + runLock(0, 0); + + // lock should have failed due to exception + verify(callback).lockUnavailable(lock); + } + + /** + * Tests doLock() when an (expired) record already exists, thus requiring doUpdate() + * to be called. + */ + @Test + public void testDistributedLockDoLockNeedingUpdate() throws SQLException { + // insert an expired record + insertRecord(RESOURCE, feature.getUuidString(), 0); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should simply do an update + runLock(0, 0); + verify(callback).lockAvailable(lock); + } + + /** + * Tests doLock() when a locked record already exists. + */ + @Test + public void testDistributedLockDoLockAlreadyLocked() throws SQLException { + // insert an expired record + insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock + runLock(0, 0); + + // lock should have failed because it's already locked + verify(callback).lockUnavailable(lock); + } + + @Test + public void testDistributedLockDoUnlock() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock() + runLock(0, 0); + + lock.free(); + + // invoke doUnlock() + long tbegin = System.currentTimeMillis(); + runLock(1, 0); + + assertEquals(0, getRecordCount()); + assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin)); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback, times(1)).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + } + + /** + * Tests doUnlock() when a DB exception is thrown. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoUnlockEx() throws SQLException { + feature = new InvalidDbLockingFeature(PERMANENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // do NOT invoke doLock() - it will fail without a DB connection + + lock.free(); + + // invoke doUnlock() + runLock(1, 0); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback, never()).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + } + + @Test + public void testDistributedLockDoExtend() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // call doExtend() + long tbegin = System.currentTimeMillis(); + runLock(1, 0); + + assertEquals(1, getRecordCount()); + assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin)); + + assertTrue(lock.isActive()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have succeeded + verify(callback2).lockAvailable(lock); + verify(callback2, never()).lockUnavailable(lock); + } + + /** + * Tests doExtend() when the lock is freed before doExtend runs. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendFreed() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.extend(HOLD_SEC2, callback); + + lock.setState(LockState.UNAVAILABLE); + + // invoke doExtend - should do nothing + runLock(1, 0); + + assertEquals(0, getRecordCount()); + + verify(callback, never()).lockAvailable(lock); + } + + /** + * Tests doExtend() when the lock record is missing from the DB, thus requiring an + * insert. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendInsertNeeded() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // delete the record so it's forced to re-insert it + cleanDb(); + + // call doExtend() + long tbegin = System.currentTimeMillis(); + runLock(1, 0); + + assertEquals(1, getRecordCount()); + assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin)); + + assertTrue(lock.isActive()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have succeeded + verify(callback2).lockAvailable(lock); + verify(callback2, never()).lockUnavailable(lock); + } + + /** + * Tests doExtend() when both update and insert fail. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException { + /* + * this feature will create a lock that returns false when doDbUpdate() is + * invoked, or when doDbInsert() is invoked a second time + */ + feature = new MyLockingFeature(true) { + @Override + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + private int ntimes = 0; + + @Override + protected boolean doDbInsert(Connection conn) throws SQLException { + if (ntimes++ > 0) { + return false; + } + + return super.doDbInsert(conn); + } + + @Override + protected boolean doDbUpdate(Connection conn) { + return false; + } + }; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // call doExtend() + runLock(1, 0); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have failed + verify(callback2, never()).lockAvailable(lock); + verify(callback2).lockUnavailable(lock); + } + + /** + * Tests doExtend() when an exception occurs. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendEx() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + /* + * delete the record and insert one with a different owner, which will cause + * doDbInsert() to throw an exception + */ + cleanDb(); + insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // call doExtend() + runLock(1, 0); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have failed + verify(callback2, never()).lockAvailable(lock); + verify(callback2).lockUnavailable(lock); + } + + @Test + public void testDistributedLockToString() { + String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString(); + assertNotNull(text); + assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback"); + } + + @Test + public void testMakeThreadPool() { + // use a REAL feature to test this + feature = new DistributedLockManager(); + + // this should create a thread pool + feature.beforeCreateLockManager(engine, new Properties()); + feature.afterStart(engine); + + shutdownFeature(); + } + + /** + * Performs a multi-threaded test of the locking facility. + * + * @throws InterruptedException if the current thread is interrupted while waiting for + * the background threads to complete + */ + @Test + public void testMultiThreaded() throws InterruptedException { + feature = new DistributedLockManager(); + feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties()); + feature.afterStart(PolicyEngineConstants.getManager()); + + List<MyThread> threads = new ArrayList<>(MAX_THREADS); + for (int x = 0; x < MAX_THREADS; ++x) { + threads.add(new MyThread()); + } + + threads.forEach(Thread::start); + + for (MyThread thread : threads) { + thread.join(6000); + assertFalse(thread.isAlive()); + } + + for (MyThread thread : threads) { + if (thread.err != null) { + throw thread.err; + } + } + + assertTrue(nsuccesses.get() > 0); + } + + private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock); + } + + private DistributedLock roundTrip(DistributedLock lock) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(lock); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + return (DistributedLock) ois.readObject(); + } + } + + /** + * Runs the checkExpired() action. + * + * @param nskip number of actions in the work queue to skip + * @param nadditional number of additional actions that appear in the work queue + * <i>after</i> the checkExpired action + * @param schedSec number of seconds for which the checker should have been scheduled + */ + private void runChecker(int nskip, int nadditional, long schedSec) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS)); + Runnable action = captor.getAllValues().get(nskip); + action.run(); + } + + /** + * Runs a lock action (e.g., doLock, doUnlock). + * + * @param nskip number of actions in the work queue to skip + * @param nadditional number of additional actions that appear in the work queue + * <i>after</i> the desired action + */ + void runLock(int nskip, int nadditional) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture()); + + Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip); + action.run(); + } + + /** + * Runs a scheduled action (e.g., "retry" action). + * + * @param nskip number of actions in the work queue to skip + * @param nadditional number of additional actions that appear in the work queue + * <i>after</i> the desired action + */ + void runSchedule(int nskip, int nadditional) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any()); + + Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip); + action.run(); + } + + /** + * Gets a count of the number of lock records in the DB. + * + * @return the number of lock records in the DB + * @throws SQLException if an error occurs accessing the DB + */ + private int getRecordCount() throws SQLException { + try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks"); + ResultSet result = stmt.executeQuery()) { + + if (result.next()) { + return result.getInt(1); + + } else { + return 0; + } + } + } + + /** + * Determines if there is a record for the given resource whose expiration time is in + * the expected range. + * + * @param resourceId ID of the resource of interest + * @param uuidString UUID string of the owner + * @param holdSec seconds for which the lock was to be held + * @param tbegin earliest time, in milliseconds, at which the record could have been + * inserted into the DB + * @return {@code true} if a record is found, {@code false} otherwise + * @throws SQLException if an error occurs accessing the DB + */ + private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException { + try (PreparedStatement stmt = + conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks" + + " WHERE resourceId=? AND host=? AND owner=?")) { + + stmt.setString(1, resourceId); + stmt.setString(2, feature.getHostName()); + stmt.setString(3, uuidString); + + try (ResultSet result = stmt.executeQuery()) { + if (result.next()) { + int remaining = result.getInt(1); + long maxDiff = System.currentTimeMillis() - tbegin; + return (remaining >= 0 && holdSec - remaining <= maxDiff); + + } else { + return false; + } + } + } + } + + /** + * Inserts a record into the DB. + * + * @param resourceId ID of the resource of interest + * @param uuidString UUID string of the owner + * @param expireOffset offset, in seconds, from "now", at which the lock should expire + * @throws SQLException if an error occurs accessing the DB + */ + private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException { + this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset); + } + + private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset) + throws SQLException { + try (PreparedStatement stmt = + conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " + + "values (?, ?, ?, timestampadd(second, ?, now()))")) { + + stmt.setString(1, resourceId); + stmt.setString(2, hostName); + stmt.setString(3, uuidString); + stmt.setInt(4, expireOffset); + + assertEquals(1, stmt.executeUpdate()); + } + } + + /** + * Updates a record in the DB. + * + * @param resourceId ID of the resource of interest + * @param newUuid UUID string of the <i>new</i> owner + * @param expireOffset offset, in seconds, from "now", at which the lock should expire + * @throws SQLException if an error occurs accessing the DB + */ + private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException { + try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?," + + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) { + + stmt.setString(1, newHost); + stmt.setString(2, newUuid); + stmt.setInt(3, expireOffset); + stmt.setString(4, resourceId); + + assertEquals(1, stmt.executeUpdate()); + } + } + + /** + * Feature that uses <i>exsvc</i> to execute requests. + */ + private class MyLockingFeature extends DistributedLockManager { + + public MyLockingFeature(boolean init) { + shutdownFeature(); + + exsvc = mock(ScheduledExecutorService.class); + when(engine.getExecutorService()).thenReturn(exsvc); + + if (init) { + beforeCreateLockManager(engine, new Properties()); + afterStart(engine); + } + } + } + + /** + * Feature whose data source all throws exceptions. + */ + private class InvalidDbLockingFeature extends MyLockingFeature { + private int errcode; + private boolean freeLock = false; + + public InvalidDbLockingFeature(int errcode) { + // pass "false" because we have to set the error code BEFORE calling + // afterStart() + super(false); + + this.errcode = errcode; + + this.beforeCreateLockManager(engine, new Properties()); + this.afterStart(engine); + } + + @Override + protected BasicDataSource makeDataSource() throws Exception { + when(datasrc.getConnection()).thenAnswer(answer -> { + if (freeLock) { + freeLock = false; + lock.free(); + } + + throw new SQLException(EXPECTED_EXCEPTION, "", errcode); + }); + + doThrow(new SQLException(EXPECTED_EXCEPTION, "", errcode)).when(datasrc).close(); + + return datasrc; + } + } + + /** + * Feature whose locks free themselves while free() is already running. + */ + private class FreeWithFreeLockingFeature extends MyLockingFeature { + private boolean relock; + + public FreeWithFreeLockingFeature(boolean relock) { + super(true); + this.relock = relock; + } + + @Override + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + private boolean checked = false; + + @Override + public boolean isUnavailable() { + if (checked) { + return super.isUnavailable(); + } + + checked = true; + + // release and relock + free(); + + if (relock) { + // run doUnlock + runLock(1, 0); + + // relock it + createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false); + } + + return false; + } + }; + } + } + + /** + * Thread used with the multi-threaded test. It repeatedly attempts to get a lock, + * extend it, and then unlock it. + */ + private class MyThread extends Thread { + AssertionError err = null; + + public MyThread() { + setDaemon(true); + } + + @Override + public void run() { + try { + for (int x = 0; x < MAX_LOOPS; ++x) { + makeAttempt(); + } + + } catch (AssertionError e) { + err = e; + } + } + + private void makeAttempt() { + try { + Semaphore sem = new Semaphore(0); + + LockCallback cb = new LockCallback() { + @Override + public void lockAvailable(Lock lock) { + sem.release(); + } + + @Override + public void lockUnavailable(Lock lock) { + sem.release(); + } + }; + + Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false); + + // wait for callback, whether available or unavailable + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + if (!lock.isActive()) { + return; + } + + nsuccesses.incrementAndGet(); + + assertEquals(1, nactive.incrementAndGet()); + + lock.extend(HOLD_SEC2, cb); + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + assertTrue(lock.isActive()); + + // decrement BEFORE free() + nactive.decrementAndGet(); + + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("interrupted", e); + } + } + } +} diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java new file mode 100644 index 00000000..5f76d657 --- /dev/null +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Properties; +import java.util.TreeSet; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.utils.properties.exception.PropertyException; + +public class DistributedLockPropertiesTest { + + private Properties props; + + /** + * Populates {@link #props}. + */ + @Before + public void setUp() { + props = new Properties(); + + props.setProperty(DistributedLockProperties.DB_DRIVER, "my driver"); + props.setProperty(DistributedLockProperties.DB_URL, "my url"); + props.setProperty(DistributedLockProperties.DB_USER, "my user"); + props.setProperty(DistributedLockProperties.DB_PASS, "my pass"); + props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,-20,,,30"); + props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "100"); + props.setProperty(DistributedLockProperties.RETRY_SEC, "200"); + props.setProperty(DistributedLockProperties.MAX_RETRIES, "300"); + } + + @Test + public void test() throws PropertyException { + DistributedLockProperties dlp = new DistributedLockProperties(props); + + assertEquals("my driver", dlp.getDbDriver()); + assertEquals("my url", dlp.getDbUrl()); + assertEquals("my user", dlp.getDbUser()); + assertEquals("my pass", dlp.getDbPwd()); + assertEquals("10,-20,,,30", dlp.getErrorCodeStrings()); + assertEquals("[-20, 10, 30]", new TreeSet<>(dlp.getTransientErrorCodes()).toString()); + assertEquals(100, dlp.getExpireCheckSec()); + assertEquals(200, dlp.getRetrySec()); + assertEquals(300, dlp.getMaxRetries()); + + assertTrue(dlp.isTransient(10)); + assertTrue(dlp.isTransient(-20)); + assertTrue(dlp.isTransient(30)); + + assertFalse(dlp.isTransient(-10)); + + // invalid value + props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,abc,30"); + + assertThatThrownBy(() -> new DistributedLockProperties(props)).isInstanceOf(PropertyException.class) + .hasMessageContaining(DistributedLockProperties.TRANSIENT_ERROR_CODES); + } +} diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java deleted file mode 100644 index 68a5a31b..00000000 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.sql.SQLException; -import org.apache.commons.dbcp2.BasicDataSource; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.onap.policy.common.utils.properties.exception.PropertyException; -import org.onap.policy.drools.persistence.SystemPersistenceConstants; - -/** - * Partially tests DistributedLockingFeature; most of the methods are tested via - * {@link TargetLockTest}. - */ -public class DistributedLockingFeatureTest { - private static final String EXPECTED = "expected exception"; - - private BasicDataSource dataSrc; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources"); - } - - @Before - public void setUp() throws Exception { - dataSrc = mock(BasicDataSource.class); - } - - @Test - public void testGetSequenceNumber() { - assertEquals(1000, new DistributedLockingFeature().getSequenceNumber()); - } - - @Test(expected = DistributedLockingFeatureException.class) - public void testAfterStart_PropEx() { - new DistributedLockingFeatureImpl(new PropertyException("prop", "val")).afterStart(null); - } - - @Test(expected = DistributedLockingFeatureException.class) - public void testAfterStart_InterruptEx() { - new DistributedLockingFeatureImpl(new InterruptedException(EXPECTED)).afterStart(null); - } - - @Test(expected = DistributedLockingFeatureException.class) - public void testAfterStart_OtherEx() { - new DistributedLockingFeatureImpl(new RuntimeException(EXPECTED)).afterStart(null); - } - - @Test - public void testCleanLockTable() throws Exception { - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - - new DistributedLockingFeatureImpl().afterStart(null); - } - - /** - * Feature that overrides {@link #makeDataSource()}. - */ - private class DistributedLockingFeatureImpl extends DistributedLockingFeature { - /** - * Exception to throw when {@link #makeDataSource()} is invoked. - */ - private final Exception makeEx; - - public DistributedLockingFeatureImpl() { - makeEx = null; - } - - public DistributedLockingFeatureImpl(Exception ex) { - this.makeEx = ex; - } - - @Override - protected BasicDataSource makeDataSource() throws Exception { - if (makeEx != null) { - throw makeEx; - } - - return dataSrc; - } - } -} diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java deleted file mode 100644 index 84eba6b6..00000000 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import org.apache.commons.dbcp2.BasicDataSource; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult; -import org.onap.policy.drools.persistence.SystemPersistenceConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TargetLockTest { - private static final Logger logger = LoggerFactory.getLogger(TargetLockTest.class); - private static final int MAX_AGE_SEC = 4 * 60; - private static final String DB_CONNECTION = - "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling"; - private static final String DB_USER = "user"; - private static final String DB_PASSWORD = "password"; - private static final String EXPECTED = "expected exception"; - private static final String MY_RESOURCE = "my-resource-id"; - private static final String MY_OWNER = "my-owner"; - private static final UUID MY_UUID = UUID.randomUUID(); - private static Connection conn = null; - private static DistributedLockingFeature distLockFeat; - - /** - * Setup the database. - */ - @BeforeClass - public static void setup() { - getDbConnection(); - createTable(); - SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources"); - distLockFeat = new DistributedLockingFeature(); - distLockFeat.afterStart(null); - } - - /** - * Cleanup the lock. - */ - @AfterClass - public static void cleanUp() { - distLockFeat.beforeShutdown(null); - try { - conn.close(); - } catch (SQLException e) { - logger.error("Error in TargetLockTest.cleanUp()", e); - } - } - - /** - * Wipe the database. - */ - @Before - public void wipeDb() { - try (PreparedStatement lockDelete = conn.prepareStatement("DELETE FROM pooling.locks"); ) { - lockDelete.executeUpdate(); - } catch (SQLException e) { - logger.error("Error in TargetLockTest.wipeDb()", e); - throw new RuntimeException(e); - } - } - - @Test - public void testGrabLockSuccess() throws InterruptedException, ExecutionException { - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - // attempt to grab expiredLock - try (PreparedStatement updateStatement = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) { - updateStatement.setString(1, "resource1"); - updateStatement.executeUpdate(); - - } catch (SQLException e) { - logger.error("Error in TargetLockTest.testGrabLockSuccess()", e); - throw new RuntimeException(e); - } - - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - // cannot re-lock - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - } - - @Test - public void testExpiredLocks() throws Exception { - - // grab lock - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - - // force lock to expire - try (PreparedStatement lockExpire = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -?, now())"); ) { - lockExpire.setInt(1, MAX_AGE_SEC + 1); - lockExpire.executeUpdate(); - } - - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC)); - } - - @Test - public void testGrabLockFail() throws InterruptedException, ExecutionException { - - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC)); - } - - @Test - public void testSecondGrab_UpdateOk() throws Exception { - PreparedStatement grabLockInsert = mock(PreparedStatement.class); - when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED)); - - PreparedStatement secondGrabUpdate = mock(PreparedStatement.class); - when(secondGrabUpdate.executeUpdate()).thenReturn(1); - - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate); - - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - - assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC)); - } - - @Test - public void testSecondGrab_UpdateFail_InsertOk() throws Exception { - PreparedStatement grabLockInsert = mock(PreparedStatement.class); - when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED)); - - PreparedStatement secondGrabUpdate = mock(PreparedStatement.class); - when(secondGrabUpdate.executeUpdate()).thenReturn(0); - - PreparedStatement secondGrabInsert = mock(PreparedStatement.class); - when(secondGrabInsert.executeUpdate()).thenReturn(1); - - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert); - - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - - assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC)); - } - - @Test - public void testSecondGrab_UpdateFail_InsertFail() throws Exception { - PreparedStatement grabLockInsert = mock(PreparedStatement.class); - when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED)); - - PreparedStatement secondGrabUpdate = mock(PreparedStatement.class); - when(secondGrabUpdate.executeUpdate()).thenReturn(0); - - PreparedStatement secondGrabInsert = mock(PreparedStatement.class); - when(secondGrabInsert.executeUpdate()).thenReturn(0); - - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert); - - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC)); - } - - @Test - public void testUpdateLock() throws Exception { - // not locked yet - refresh should fail - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC)); - - // now lock it - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - // refresh should work now - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC)); - - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - // expire the lock - try (PreparedStatement updateStatement = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) { - updateStatement.setString(1, "resource1"); - updateStatement.executeUpdate(); - } - - // refresh should fail now - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC)); - - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - // test exception case - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).refresh(MAX_AGE_SEC)); - } - - @Test - public void testUnlock() throws Exception { - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1")); - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC)); - - // test exception case - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).unlock()); - } - - @Test - public void testIsActive() throws Exception { - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner2")); - - // isActive on expiredLock - try (PreparedStatement updateStatement = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -5, now()) WHERE resourceId = ?"); ) { - updateStatement.setString(1, "resource1"); - updateStatement.executeUpdate(); - } - - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - // Unlock record, next isActive attempt should fail - distLockFeat.beforeUnlock("resource1", "owner1"); - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - // test exception case for outer "try" - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive()); - - // test exception case for inner "try" - PreparedStatement stmt = mock(PreparedStatement.class); - when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED)); - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(stmt); - dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive()); - } - - @Test - public void unlockBeforeLock() { - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1")); - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1")); - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1")); - } - - @Test - public void testIsLocked() throws Exception { - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLocked("resource1")); - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLocked("resource1")); - - // test exception case for outer "try" - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked()); - - // test exception case for inner "try" - PreparedStatement stmt = mock(PreparedStatement.class); - when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED)); - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(stmt); - dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked()); - } - - private static void getDbConnection() { - try { - conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); - } catch (SQLException e) { - logger.error("Error in TargetLockTest.getDBConnection()", e); - } - } - - private static void createTable() { - String createString = - "create table if not exists pooling.locks " - + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), " - + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))"; - try (PreparedStatement createStmt = conn.prepareStatement(createString); ) { - createStmt.executeUpdate(); - - } catch (SQLException e) { - logger.error("Error in TargetLockTest.createTable()", e); - } - } -} diff --git a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties index d1a07e82..061cc608 100644 --- a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties +++ b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties @@ -2,14 +2,14 @@ # ============LICENSE_START======================================================= # feature-distributed-locking # ================================================================================ -# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,5 +22,8 @@ javax.persistence.jdbc.driver=org.h2.Driver javax.persistence.jdbc.url=jdbc:h2:mem:pooling javax.persistence.jdbc.user=user javax.persistence.jdbc.password=password -distributed.locking.lock.aging=150 -distributed.locking.heartbeat.interval=500
\ No newline at end of file + +distributed.locking.transient.error.codes=500 +distributed.locking.expire.check.seconds=900 +distributed.locking.retry.seconds=60 +distributed.locking.max.retries=2 |