diff options
author | Jim Hahn <jrh3@att.com> | 2019-11-14 15:16:13 -0500 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2019-11-14 16:42:52 -0500 |
commit | 58c3811bfba7e421af8c9d2d72f55e95b4b01a50 (patch) | |
tree | 23e35bdb304e17665aae1e5662494bac77a30772 | |
parent | 8bb11a84b833c7db1342af0c5823ee8309f15c1a (diff) |
Invoke lock callback in session thread
Injects the callback as a DroolsRunnable into the session, if
there is one. Otherwise, it invokes it via the engine's
thread pool.
Issue-ID: POLICY-2246
Signed-off-by: Jim Hahn <jrh3@att.com>
Change-Id: I214480ae675d89e7335dde4eb4abe2684f7ef8ab
Signed-off-by: Jim Hahn <jrh3@att.com>
8 files changed, 160 insertions, 89 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java index 7d58b8d5..528fa7cb 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java @@ -341,7 +341,7 @@ public class DistributedLockManager extends LockManager<DistributedLockManager.D DistributedLock lock = lockref.get(); if (lock != null) { logger.debug("removed lock from map {}", lock); - lock.deny(DistributedLock.LOCK_LOST_MSG, false); + lock.deny(DistributedLock.LOCK_LOST_MSG); } } } @@ -473,7 +473,7 @@ public class DistributedLockManager extends LockManager<DistributedLockManager.D scheduleRequest(this::doExtend); } else { - deny(NOT_LOCKED_MSG, true); + deny(NOT_LOCKED_MSG); } } @@ -639,7 +639,7 @@ public class DistributedLockManager extends LockManager<DistributedLockManager.D } if (success) { - grant(true); + grant(); return; } } @@ -690,7 +690,7 @@ public class DistributedLockManager extends LockManager<DistributedLockManager.D * the record, thus we have to try to insert, if the update fails */ if (doDbUpdate(conn) || doDbInsert(conn)) { - grant(true); + grant(); return; } } @@ -790,7 +790,7 @@ public class DistributedLockManager extends LockManager<DistributedLockManager.D synchronized (this) { if (!isUnavailable()) { - deny(LOCK_LOST_MSG, true); + deny(LOCK_LOST_MSG); } } } diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java index c996d8d9..5351f004 100644 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java @@ -68,11 +68,13 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.kie.api.runtime.KieSession; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.onap.policy.common.utils.services.OrderedServiceImpl; import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock; +import org.onap.policy.drools.core.PolicySession; import org.onap.policy.drools.core.lock.Lock; import org.onap.policy.drools.core.lock.LockCallback; import org.onap.policy.drools.core.lock.LockState; @@ -120,6 +122,9 @@ public class DistributedLockManagerTest { private PolicyEngine engine; @Mock + private KieSession kieSess; + + @Mock private ScheduledExecutorService exsvc; @Mock @@ -132,6 +137,7 @@ public class DistributedLockManagerTest { private BasicDataSource datasrc; private DistributedLock lock; + private PolicySession session; private AtomicInteger nactive; private AtomicInteger nsuccesses; @@ -180,6 +186,16 @@ public class DistributedLockManagerTest { public void setUp() throws SQLException { MockitoAnnotations.initMocks(this); + // grant() and deny() calls will come through here and be immediately executed + session = new PolicySession(null, null, kieSess) { + @Override + public void insertDrools(Object object) { + ((Runnable) object).run(); + } + }; + + session.setPolicySession(); + nactive = new AtomicInteger(0); nsuccesses = new AtomicInteger(0); @@ -443,9 +459,9 @@ public class DistributedLockManagerTest { assertTrue(lock5.isUnavailable()); // allow callbacks - runLock(5, 2); - runLock(6, 1); - runLock(7, 0); + runLock(2, 2); + runLock(3, 1); + runLock(4, 0); verify(callback).lockUnavailable(lock); verify(callback3).lockUnavailable(lock3); verify(callback5).lockUnavailable(lock5); @@ -565,8 +581,8 @@ public class DistributedLockManagerTest { assertTrue(lock3.isWaiting()); assertTrue(lock4.isUnavailable()); - runLock(5, 0); - verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any()); + runLock(4, 0); + verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any()); verify(callback).lockUnavailable(lock); verify(callback2, never()).lockUnavailable(lock2); diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/FeatureLockImpl.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/FeatureLockImpl.java index d4e4f5fc..5690b187 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/FeatureLockImpl.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/FeatureLockImpl.java @@ -21,6 +21,8 @@ package org.onap.policy.drools.system.internal; import java.util.concurrent.ScheduledExecutorService; +import org.onap.policy.drools.core.DroolsRunnable; +import org.onap.policy.drools.core.PolicySession; import org.onap.policy.drools.core.lock.LockCallback; import org.onap.policy.drools.core.lock.LockImpl; import org.onap.policy.drools.core.lock.LockState; @@ -66,13 +68,9 @@ public abstract class FeatureLockImpl extends LockImpl { } /** - * Grants this lock. The notification is <i>always</i> invoked via a background - * thread. - * - * @param foreground {@code true} if to invoke the callback in the foreground thread, - * {@code false} otherwise + * Grants this lock. */ - protected synchronized void grant(boolean foreground) { + protected synchronized void grant() { if (isUnavailable()) { return; } @@ -81,32 +79,37 @@ public abstract class FeatureLockImpl extends LockImpl { updateGrant(); logger.info("lock granted: {}", this); - - if (foreground) { - notifyAvailable(); - } else { - getThreadPool().execute(this::notifyAvailable); - } + doNotify(this::notifyAvailable); } /** * Permanently denies this lock. * * @param reason the reason the lock was denied - * @param foreground {@code true} if to invoke the callback in the foreground thread, - * {@code false} otherwise */ - public void deny(String reason, boolean foreground) { + public void deny(String reason) { synchronized (this) { setState(LockState.UNAVAILABLE); } logger.info("{}: {}", reason, this); + doNotify(this::notifyUnavailable); + } + + /** + * Notifies the session of a change in the lock state. If a session is attached, then + * it simply injects the notifier into the session. Otherwise, it executes it via a + * background thread. + * + * @param notifier function to invoke the callback + */ + private void doNotify(DroolsRunnable notifier) { + PolicySession sess = getSession(); + if (sess != null) { + sess.insertDrools(notifier); - if (foreground) { - notifyUnavailable(); } else { - getThreadPool().execute(this::notifyUnavailable); + getThreadPool().execute(notifier); } } @@ -164,7 +167,7 @@ public abstract class FeatureLockImpl extends LockImpl { // do a quick check of the state if (isUnavailable() || !attachFeature()) { - deny(LOCK_LOST_MSG, true); + deny(LOCK_LOST_MSG); return false; } @@ -200,12 +203,13 @@ public abstract class FeatureLockImpl extends LockImpl { */ protected abstract boolean addToFeature(); - /** - * Gets the thread pool. - * - * @return the thread pool - */ + // these may be overridden by junit tests + protected ScheduledExecutorService getThreadPool() { return PolicyEngineConstants.getManager().getExecutorService(); } + + protected PolicySession getSession() { + return PolicySession.getCurrentSession(); + } } diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/LockManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/LockManager.java index 7e4505be..ef6b48d2 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/LockManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/LockManager.java @@ -156,7 +156,7 @@ public abstract class LockManager<T extends FeatureLockImpl> implements PolicyRe logger.debug("added lock to map {}", lock); finishLock(lock); } else { - lock.deny("resource is busy", true); + lock.deny("resource is busy"); } return lock; diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java index 839c17dc..a62d7667 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java @@ -158,14 +158,14 @@ public class SimpleLockManager extends LockManager<SimpleLockManager.SimpleLock> SimpleLock lock = lockref.get(); if (lock != null) { - lock.deny("lock expired", false); + lock.deny("lock expired"); } } } @Override protected void finishLock(SimpleLock lock) { - lock.grant(true); + lock.grant(); } @Override @@ -257,9 +257,9 @@ public class SimpleLockManager extends LockManager<SimpleLockManager.SimpleLock> } if (resource2lock.get(getResourceId()) == this) { - grant(true); + grant(); } else { - deny(NOT_LOCKED_MSG, true); + deny(NOT_LOCKED_MSG); } } diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/FeatureLockImplTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/FeatureLockImplTest.java index 258ee0c5..a224a636 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/system/internal/FeatureLockImplTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/FeatureLockImplTest.java @@ -46,6 +46,8 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.onap.policy.drools.core.DroolsRunnable; +import org.onap.policy.drools.core.PolicySession; import org.onap.policy.drools.core.lock.LockCallback; import org.onap.policy.drools.core.lock.LockState; import org.onap.policy.drools.system.PolicyEngineConstants; @@ -126,30 +128,10 @@ public class FeatureLockImplTest { assertEquals(HOLD_SEC, lock.getHoldSec()); } - /** - * Tests grant(), when using the foreground thread. - */ - @Test - public void testGrantForeground() { - MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); - lock.grant(true); - - assertTrue(lock.isActive()); - assertEquals(1, lock.nupdates); - - verify(exsvc, never()).execute(any()); - - verify(callback).lockAvailable(any()); - verify(callback, never()).lockUnavailable(any()); - } - - /** - * Tests grant(), when using the background thread. - */ @Test - public void testGrantBackground() { + public void testGrant() { MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); - lock.grant(false); + lock.grant(); assertTrue(lock.isActive()); assertEquals(1, lock.nupdates); @@ -166,7 +148,7 @@ public class FeatureLockImplTest { public void testGrantUnavailable() { MyLock lock = new MyLock(LockState.UNAVAILABLE, RESOURCE, OWNER_KEY, HOLD_SEC, callback); lock.setState(LockState.UNAVAILABLE); - lock.grant(true); + lock.grant(); assertTrue(lock.isUnavailable()); assertEquals(0, lock.nupdates); @@ -174,35 +156,65 @@ public class FeatureLockImplTest { verify(exsvc, never()).execute(any()); } - /** - * Tests deny(), when using the foreground thread. - */ @Test - public void testDenyForeground() { + public void testDeny() { MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); - lock.deny("my reason", true); + lock.deny("my reason"); assertTrue(lock.isUnavailable()); - verify(exsvc, never()).execute(any()); - + invokeCallback(1); verify(callback, never()).lockAvailable(any()); verify(callback).lockUnavailable(any()); } /** - * Tests deny(), when using the background thread. + * Tests doNotify() when a session exists. */ @Test - public void testDenyBackground() { + public void testDoNotifySession() { + PolicySession session = mock(PolicySession.class); + + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback) { + private static final long serialVersionUID = 1L; + + @Override + protected PolicySession getSession() { + return session; + } + }; + + lock.grant(); + + assertTrue(lock.isActive()); + assertEquals(1, lock.nupdates); + + verify(exsvc, never()).execute(any()); + + ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class); + verify(session).insertDrools(captor.capture()); + + DroolsRunnable runner = (DroolsRunnable) captor.getValue(); + runner.run(); + + verify(callback).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + /** + * Tests doNotify() when there is no session. + */ + @Test + public void testDoNotifyNoSession() { MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); - lock.deny("my reason", false); + lock.grant(); - assertTrue(lock.isUnavailable()); + assertTrue(lock.isActive()); + assertEquals(1, lock.nupdates); invokeCallback(1); - verify(callback, never()).lockAvailable(any()); - verify(callback).lockUnavailable(any()); + verify(callback).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); } @Test @@ -283,6 +295,7 @@ public class FeatureLockImplTest { assertEquals(HOLD_SEC2, lock.getHoldSec()); assertSame(scallback, lock.getCallback()); + invokeCallback(1); verify(scallback, never()).lockAvailable(lock); verify(scallback).lockUnavailable(lock); } @@ -324,11 +337,20 @@ public class FeatureLockImplTest { assertEquals(HOLD_SEC2, lock.getHoldSec()); assertSame(scallback, lock.getCallback()); + invokeCallback(1); verify(scallback, never()).lockAvailable(lock); verify(scallback).lockUnavailable(lock); } @Test + public void testGetSession() { + MyLockStdSession lock = new MyLockStdSession(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + + // this should invoke the real policy session without throwing an exception + lock.grant(); + } + + @Test public void testToString() { String text = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback).toString(); assertNotNull(text); @@ -361,15 +383,19 @@ public class FeatureLockImplTest { } } - public static class MyLock extends FeatureLockImpl { + /** + * Lock that inherits the normal getSession() method. + */ + public static class MyLockStdSession extends FeatureLockImpl { private static final long serialVersionUID = 1L; - private int nupdates = 0; + protected int nupdates = 0; - public MyLock() { + public MyLockStdSession() { super(); } - public MyLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback) { + public MyLockStdSession(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { super(state, resourceId, ownerKey, holdSec, callback); } @@ -395,6 +421,23 @@ public class FeatureLockImplTest { } } + public static class MyLock extends MyLockStdSession { + private static final long serialVersionUID = 1L; + + public MyLock() { + super(); + } + + public MyLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback) { + super(state, resourceId, ownerKey, holdSec, callback); + } + + @Override + protected PolicySession getSession() { + return null; + } + } + public static class MyLockNoFeature extends MyLock { private static final long serialVersionUID = 1L; diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/LockManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/LockManagerTest.java index 1cda079d..6b6e7363 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/system/internal/LockManagerTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/LockManagerTest.java @@ -208,7 +208,7 @@ public class LockManagerTest { @Override protected void finishLock(MyLock lock) { - lock.grant(true); + lock.grant(); } @Override diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java index 79e20673..b1c34c24 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java @@ -55,11 +55,13 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.kie.api.runtime.KieSession; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.onap.policy.common.utils.time.CurrentTime; import org.onap.policy.common.utils.time.TestTime; +import org.onap.policy.drools.core.PolicySession; import org.onap.policy.drools.core.lock.Lock; import org.onap.policy.drools.core.lock.LockCallback; import org.onap.policy.drools.core.lock.LockState; @@ -85,12 +87,16 @@ public class SimpleLockManagerTest { private static ScheduledExecutorService saveExec; private static ScheduledExecutorService realExec; + private PolicySession session; private TestTime testTime; private AtomicInteger nactive; private AtomicInteger nsuccesses; private SimpleLockManager feature; @Mock + private KieSession kieSess; + + @Mock private ScheduledExecutorService exsvc; @Mock @@ -130,6 +136,16 @@ public class SimpleLockManagerTest { public void setUp() { MockitoAnnotations.initMocks(this); + // grant() and deny() calls will come through here and be immediately executed + session = new PolicySession(null, null, kieSess) { + @Override + public void insertDrools(Object object) { + ((Runnable) object).run(); + } + }; + + session.setPolicySession(); + testTime = new TestTime(); nactive = new AtomicInteger(0); nsuccesses = new AtomicInteger(0); @@ -252,10 +268,6 @@ public class SimpleLockManagerTest { assertFalse(lock2.isActive()); assertTrue(lock3.isActive()); - // run the callbacks - captor = ArgumentCaptor.forClass(Runnable.class); - verify(exsvc, times(2)).execute(captor.capture()); - captor.getAllValues().forEach(Runnable::run); verify(callback).lockUnavailable(lock); verify(callback).lockUnavailable(lock2); verify(callback, never()).lockUnavailable(lock3); @@ -272,10 +284,6 @@ public class SimpleLockManagerTest { checker.run(); assertFalse(lock3.isActive()); - // run the callback - captor = ArgumentCaptor.forClass(Runnable.class); - verify(exsvc, times(3)).execute(captor.capture()); - captor.getValue().run(); verify(callback).lockUnavailable(lock3); } @@ -433,7 +441,7 @@ public class SimpleLockManagerTest { @Test public void testSimpleLockExpired() { SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - lock.grant(true); + lock.grant(); assertFalse(lock.expired(testTime.getMillis())); assertFalse(lock.expired(testTime.getMillis() + HOLD_MS - 1)); |