diff options
8 files changed, 1228 insertions, 491 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java index 7ee786b9..6f83ea15 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java @@ -30,9 +30,9 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -42,15 +42,15 @@ import lombok.Setter; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSourceFactory; import org.onap.policy.common.utils.network.NetworkUtil; -import org.onap.policy.drools.core.lock.AlwaysFailLock; -import org.onap.policy.drools.core.lock.Lock; import org.onap.policy.drools.core.lock.LockCallback; -import org.onap.policy.drools.core.lock.LockImpl; import org.onap.policy.drools.core.lock.LockState; import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.features.PolicyEngineFeatureApi; import org.onap.policy.drools.persistence.SystemPersistenceConstants; import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.onap.policy.drools.system.internal.FeatureLockImpl; +import org.onap.policy.drools.system.internal.LockManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,13 +78,12 @@ import org.slf4j.LoggerFactory; * instance.</li> * </dl> */ -public class DistributedLockManager implements PolicyResourceLockManager, PolicyEngineFeatureApi { +public class DistributedLockManager extends LockManager<DistributedLockManager.DistributedLock> + implements PolicyEngineFeatureApi { private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class); private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; - private static final String LOCK_LOST_MSG = "lock lost"; - private static final String NOT_LOCKED_MSG = "not locked"; @Getter(AccessLevel.PROTECTED) @Setter(AccessLevel.PROTECTED) @@ -108,23 +107,23 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy * lock is added to the map, it remains in the map until the lock is lost or until the * unlock request completes. */ - private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>(); + private final Map<String, DistributedLock> resource2lock; /** - * Engine with which this manager is associated. + * Thread pool used to check for lock expiration and to notify owners when locks are + * granted or lost. */ - private PolicyEngine engine; + private ScheduledExecutorService exsvc = null; /** - * Feature properties. + * Used to cancel the expiration checker on shutdown. */ - private DistributedLockProperties featProps; + private ScheduledFuture<?> checker = null; /** - * Thread pool used to check for lock expiration and to notify owners when locks are - * granted or lost. + * Feature properties. */ - private ScheduledExecutorService exsvc = null; + private DistributedLockProperties featProps; /** * Data source used to connect to the DB. @@ -137,6 +136,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy */ public DistributedLockManager() { this.hostName = NetworkUtil.getHostname(); + this.resource2lock = getResource2lock(); } @Override @@ -145,49 +145,10 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } @Override - public boolean isAlive() { - return (exsvc != null); - } - - @Override - public boolean start() { - // handled via engine API - return true; - } - - @Override - public boolean stop() { - // handled via engine API - return true; - } - - @Override - public void shutdown() { - // handled via engine API - } - - @Override - public boolean isLocked() { - return false; - } - - @Override - public boolean lock() { - return true; - } - - @Override - public boolean unlock() { - return true; - } - - @Override public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) { try { - this.engine = engine; this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME)); - this.exsvc = getThreadPool(); this.dataSource = makeDataSource(); return this; @@ -201,8 +162,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy public boolean afterStart(PolicyEngine engine) { try { + exsvc = PolicyEngineConstants.getManager().getExecutorService(); exsvc.execute(this::deleteExpiredDbLocks); - exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); setLatestInstance(this); @@ -257,6 +219,11 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy @Override public boolean afterStop(PolicyEngine engine) { exsvc = null; + + if (checker != null) { + checker.cancel(true); + } + closeDataSource(); return false; } @@ -278,49 +245,36 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } @Override - public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, - boolean waitForLock) { - - if (latestInstance != this) { - AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); - lock.notifyUnavailable(); - return lock; - } - - DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); - - DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock); - - // do these outside of compute() to avoid blocking other map operations - if (existingLock == null) { - logger.debug("added lock to map {}", lock); - lock.scheduleRequest(lock::doLock); - } else { - lock.deny("resource is busy", true); - } + protected boolean hasInstanceChanged() { + return (getLatestInstance() != this); + } - return lock; + @Override + protected void finishLock(DistributedLock lock) { + lock.scheduleRequest(lock::doLock); } /** * Checks for expired locks. */ private void checkExpired() { - try { logger.info("checking for expired locks"); Set<String> expiredIds = new HashSet<>(resource2lock.keySet()); identifyDbLocks(expiredIds); expireLocks(expiredIds); - exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); } catch (RejectedExecutionException e) { logger.warn("thread pool is no longer accepting requests", e); } catch (SQLException | RuntimeException e) { logger.error("error checking expired locks", e); - exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS); + + if (isAlive()) { + checker = exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS); + } } logger.info("done checking for expired locks"); @@ -387,7 +341,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy DistributedLock lock = lockref.get(); if (lock != null) { logger.debug("removed lock from map {}", lock); - lock.deny(LOCK_LOST_MSG, false); + lock.deny(DistributedLock.LOCK_LOST_MSG, false); } } } @@ -395,7 +349,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy /** * Distributed Lock implementation. */ - public static class DistributedLock extends LockImpl { + public static class DistributedLock extends FeatureLockImpl { private static final String SQL_FAILED_MSG = "request failed for lock: {}"; private static final long serialVersionUID = 1L; @@ -439,6 +393,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy * Constructs the object. */ public DistributedLock() { + this.feature = null; this.hostName = ""; this.uuidString = ""; } @@ -464,58 +419,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy this.uuidString = feature.uuidString; } - /** - * Grants this lock. The notification is <i>always</i> invoked via the - * <i>foreground</i> thread. - */ - protected void grant() { - synchronized (this) { - if (isUnavailable()) { - return; - } - - setState(LockState.ACTIVE); - } - - logger.info("lock granted: {}", this); - - notifyAvailable(); - } - - /** - * Permanently denies this lock. - * - * @param reason the reason the lock was denied - * @param foreground {@code true} if the callback can be invoked in the current - * (i.e., foreground) thread, {@code false} if it should be invoked via the - * executor - */ - protected void deny(String reason, boolean foreground) { - synchronized (this) { - setState(LockState.UNAVAILABLE); - } - - logger.info("{}: {}", reason, this); - - if (feature == null || foreground) { - notifyUnavailable(); - - } else { - feature.exsvc.execute(this::notifyUnavailable); - } - } - @Override public boolean free() { - // do a quick check of the state - if (isUnavailable()) { - return false; - } - - logger.info("releasing lock: {}", this); - - if (!attachFeature()) { - setState(LockState.UNAVAILABLE); + if (!freeAllowed()) { return false; } @@ -546,16 +452,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy @Override public void extend(int holdSec, LockCallback callback) { - if (holdSec < 0) { - throw new IllegalArgumentException("holdSec is negative"); - } - - setHoldSec(holdSec); - setCallback(callback); - - // do a quick check of the state - if (isUnavailable() || !attachFeature()) { - deny(LOCK_LOST_MSG, true); + if (!extendAllowed(holdSec, callback)) { return; } @@ -580,19 +477,9 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } } - /** - * Attaches to the feature instance, if not already attached. - * - * @return {@code true} if the lock is now attached to a feature, {@code false} - * otherwise - */ - private synchronized boolean attachFeature() { - if (feature != null) { - // already attached - return true; - } - - feature = latestInstance; + @Override + protected boolean addToFeature() { + feature = getLatestInstance(); if (feature == null) { logger.warn("no feature yet for {}", this); return false; @@ -613,7 +500,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy logger.debug("schedule lock action {}", this); nretries = 0; request = schedreq; - feature.exsvc.execute(this::doRequest); + getThreadPool().execute(this::doRequest); } /** @@ -633,7 +520,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy if (nretries++ < feature.featProps.getMaxRetries()) { logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this); request = req; - feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS); + getThreadPool().schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS); return; } } @@ -752,7 +639,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy } if (success) { - grant(); + grant(true); return; } } @@ -803,7 +690,7 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy * the record, thus we have to try to insert, if the update fails */ if (doDbUpdate(conn) || doDbInsert(conn)) { - grant(); + grant(true); return; } } @@ -927,10 +814,6 @@ public class DistributedLockManager implements PolicyResourceLockManager, Policy return SystemPersistenceConstants.getManager().getProperties(fileName); } - protected ScheduledExecutorService getThreadPool() { - return engine.getExecutorService(); - } - protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback) { return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this); diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java index 0ba92c51..c996d8d9 100644 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java @@ -30,6 +30,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; @@ -55,6 +56,7 @@ import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -115,16 +117,19 @@ public class DistributedLockManagerTest { private static ScheduledExecutorService realExec; @Mock + private PolicyEngine engine; + + @Mock private ScheduledExecutorService exsvc; @Mock - private LockCallback callback; + private ScheduledFuture<?> checker; @Mock - private BasicDataSource datasrc; + private LockCallback callback; @Mock - private PolicyEngine engine; + private BasicDataSource datasrc; private DistributedLock lock; @@ -153,7 +158,6 @@ public class DistributedLockManagerTest { saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD); realExec = Executors.newScheduledThreadPool(3); - Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); } /** @@ -181,8 +185,6 @@ public class DistributedLockManagerTest { cleanDb(); - when(engine.getExecutorService()).thenReturn(exsvc); - feature = new MyLockingFeature(true); } @@ -214,52 +216,12 @@ public class DistributedLockManagerTest { .anyMatch(obj -> obj instanceof DistributedLockManager)); } - /** - * Tests constructor() when properties are invalid. - */ - @Test - public void testDistributedLockManagerInvalidProperties() { - // use properties containing an invalid value - Properties props = new Properties(); - props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc"); - - feature = new MyLockingFeature(false) { - @Override - protected Properties getProperties(String fileName) { - return props; - } - }; - - assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class); - } - @Test public void testGetSequenceNumber() { assertEquals(1000, feature.getSequenceNumber()); } @Test - public void testStartableApi() { - assertTrue(feature.isAlive()); - assertTrue(feature.start()); - assertTrue(feature.stop()); - feature.shutdown(); - - // above should have had no effect - assertTrue(feature.isAlive()); - - feature.afterStop(engine); - assertFalse(feature.isAlive()); - } - - @Test - public void testLockApi() { - assertFalse(feature.isLocked()); - assertTrue(feature.lock()); - assertTrue(feature.unlock()); - } - - @Test public void testBeforeCreateLockManager() { assertSame(feature, feature.beforeCreateLockManager(engine, new Properties())); } @@ -298,8 +260,7 @@ public class DistributedLockManagerTest { feature = new MyLockingFeature(false); - when(exsvc.schedule(any(Runnable.class), anyLong(), any())) - .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)); + doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any()); assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class); } @@ -350,6 +311,7 @@ public class DistributedLockManagerTest { @Test public void testAfterStop() { shutdownFeature(); + verify(checker).cancel(anyBoolean()); feature = new DistributedLockManager(); @@ -424,7 +386,7 @@ public class DistributedLockManagerTest { } /** - * Tests lock() when the feature is not the latest instance. + * Tests createLock() when the feature is not the latest instance. */ @Test public void testCreateLockNotLatestInstance() { @@ -522,6 +484,27 @@ public class DistributedLockManagerTest { runChecker(0, 0, RETRY_SEC); } + /** + * Tests checkExpired(), when getConnection() throws an exception and the feature is + * no longer alive. + */ + @Test + public void testCheckExpiredSqlExFeatureStopped() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(TRANSIENT) { + @Override + protected SQLException makeEx() { + this.stop(); + return super.makeEx(); + } + }; + + runChecker(0, 0, EXPIRE_SEC); + + // it should NOT have scheduled another check + verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any()); + } + @Test public void testExpireLocks() throws SQLException { AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null); @@ -637,20 +620,6 @@ public class DistributedLockManagerTest { verify(callback).lockAvailable(lock); } - /** - * Tests grant() when the lock is already unavailable. - */ - @Test - public void testDistributedLockGrantUnavailable() { - DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - lock.setState(LockState.UNAVAILABLE); - lock.grant(); - - assertTrue(lock.isUnavailable()); - verify(callback, never()).lockAvailable(any()); - verify(callback, never()).lockUnavailable(any()); - } - @Test public void testDistributedLockDeny() { // get a lock @@ -1464,6 +1433,8 @@ public class DistributedLockManagerTest { */ @Test public void testMultiThreaded() throws InterruptedException { + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); + feature = new DistributedLockManager(); feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties()); feature.afterStart(PolicyEngineConstants.getManager()); @@ -1661,10 +1632,12 @@ public class DistributedLockManagerTest { shutdownFeature(); exsvc = mock(ScheduledExecutorService.class); - when(engine.getExecutorService()).thenReturn(exsvc); + when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc); if (init) { beforeCreateLockManager(engine, new Properties()); + start(); afterStart(engine); } } @@ -1685,6 +1658,7 @@ public class DistributedLockManagerTest { this.isTransient = isTransient; this.beforeCreateLockManager(engine, new Properties()); + this.start(); this.afterStart(engine); } @@ -1704,7 +1678,7 @@ public class DistributedLockManagerTest { return datasrc; } - private SQLException makeEx() { + protected SQLException makeEx() { if (isTransient) { return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION)); diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/FeatureLockImpl.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/FeatureLockImpl.java new file mode 100644 index 00000000..d4e4f5fc --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/FeatureLockImpl.java @@ -0,0 +1,211 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import java.util.concurrent.ScheduledExecutorService; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Lock implementation used by locking features. + */ +public abstract class FeatureLockImpl extends LockImpl { + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(FeatureLockImpl.class); + + public static final String LOCK_LOST_MSG = "lock lost"; + + /** + * {@code True} if this lock is attached to a feature, {@code false} if it is not. + */ + private transient boolean attached; + + /** + * Constructs the object. + */ + public FeatureLockImpl() { + this.attached = false; + } + + /** + * Constructs the object. + * + * @param state initial state of the lock + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held, after + * which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or subsequently + * lost; must not be {@code null} + */ + public FeatureLockImpl(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback) { + super(state, resourceId, ownerKey, holdSec, callback); + this.attached = true; + } + + /** + * Grants this lock. The notification is <i>always</i> invoked via a background + * thread. + * + * @param foreground {@code true} if to invoke the callback in the foreground thread, + * {@code false} otherwise + */ + protected synchronized void grant(boolean foreground) { + if (isUnavailable()) { + return; + } + + setState(LockState.ACTIVE); + updateGrant(); + + logger.info("lock granted: {}", this); + + if (foreground) { + notifyAvailable(); + } else { + getThreadPool().execute(this::notifyAvailable); + } + } + + /** + * Permanently denies this lock. + * + * @param reason the reason the lock was denied + * @param foreground {@code true} if to invoke the callback in the foreground thread, + * {@code false} otherwise + */ + public void deny(String reason, boolean foreground) { + synchronized (this) { + setState(LockState.UNAVAILABLE); + } + + logger.info("{}: {}", reason, this); + + if (foreground) { + notifyUnavailable(); + } else { + getThreadPool().execute(this::notifyUnavailable); + } + } + + /** + * The subclass should make use of {@link #freeAllowed()} in its implementation of + * {@link #free()}. + */ + @Override + public abstract boolean free(); + + /** + * Determines if the lock can be freed. + * + * @return {@code true} if the lock can be freed, {@code false} if the lock is + * unavailable + */ + protected boolean freeAllowed() { + // do a quick check of the state + if (isUnavailable()) { + return false; + } + + logger.info("releasing lock: {}", this); + + if (!attachFeature()) { + setState(LockState.UNAVAILABLE); + return false; + } + + return true; + } + + /** + * The subclass should make use of {@link #extendAllowed()} in its implementation of + * {@link #extend()}. + */ + @Override + public abstract void extend(int holdSec, LockCallback callback); + + /** + * Determines if the lock can be extended. + * + * @param holdSec the additional amount of time to hold the lock, in seconds + * @param callback callback to be invoked when the extension completes + * @return {@code true} if the lock can be extended, {@code false} if the lock is + * unavailable + */ + protected boolean extendAllowed(int holdSec, LockCallback callback) { + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + setHoldSec(holdSec); + setCallback(callback); + + // do a quick check of the state + if (isUnavailable() || !attachFeature()) { + deny(LOCK_LOST_MSG, true); + return false; + } + + return true; + } + + /** + * Attaches to the feature instance, if not already attached. + * + * @return {@code true} if the lock is now attached to a feature, {@code false} + * otherwise + */ + private synchronized boolean attachFeature() { + if (!attached) { + attached = addToFeature(); + } + + return attached; + } + + /** + * Updates a lock when it is granted. The default method does nothing. + */ + protected void updateGrant() { + // do nothing + } + + /** + * Adds the lock to the relevant feature. + * + * @return {@code true} if the lock was added, {@code false} if it could not be added + * (e.g., because there is no feature yet) + */ + protected abstract boolean addToFeature(); + + /** + * Gets the thread pool. + * + * @return the thread pool + */ + protected ScheduledExecutorService getThreadPool() { + return PolicyEngineConstants.getManager().getExecutorService(); + } +} diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/LockManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/LockManager.java new file mode 100644 index 00000000..7e4505be --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/LockManager.java @@ -0,0 +1,197 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.drools.core.lock.AlwaysFailLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Super class for Lock Features. + * + * <p/> + * Note: this implementation does <i>not</i> honor the waitForLocks={@code true} + * parameter. + * + * <p/> + * When a lock is deserialized, it will not initially appear in this feature's map; it + * will be added to the map once free() or extend() is invoked, provided there isn't + * already an entry. + */ +public abstract class LockManager<T extends FeatureLockImpl> implements PolicyResourceLockManager { + + private static final Logger logger = LoggerFactory.getLogger(LockManager.class); + + public static final String NOT_LOCKED_MSG = "not locked"; + + /** + * Maps a resource to the lock that owns it. + */ + @Getter(AccessLevel.PROTECTED) + private final Map<String, T> resource2lock = new ConcurrentHashMap<>(); + + /** + * {@code True} if this feature is running, {@code false} otherwise. + */ + private boolean alive = false; + + /** + * {@code True} if this feature is locked, {@code false} otherwise. + */ + private boolean locked = false; + + + /** + * Constructs the object. + */ + public LockManager() { + super(); + } + + @Override + public boolean isAlive() { + return alive; + } + + @Override + public synchronized boolean start() { + if (alive) { + return false; + } + + alive = true; + return true; + } + + /** + * Stops the expiration checker. Does <i>not</i> invoke any lock call-backs. + */ + @Override + public synchronized boolean stop() { + if (!alive) { + return false; + } + + alive = false; + return true; + } + + @Override + public void shutdown() { + stop(); + } + + @Override + public boolean isLocked() { + return locked; + } + + @Override + public synchronized boolean lock() { + if (locked) { + return false; + } + + locked = true; + return true; + } + + @Override + public synchronized boolean unlock() { + if (!locked) { + return false; + } + + locked = false; + return true; + } + + /** + * After performing checks, this invokes + * {@link #makeLock(LockState, String, String, int, LockCallback)} to create a lock + * object, inserts it into the map, and then invokes {@link #finishLock(MgrLock)}. + */ + @Override + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + + if (hasInstanceChanged()) { + AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); + lock.notifyUnavailable(); + return lock; + } + + T lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); + + T existingLock = resource2lock.putIfAbsent(resourceId, lock); + + if (existingLock == null) { + logger.debug("added lock to map {}", lock); + finishLock(lock); + } else { + lock.deny("resource is busy", true); + } + + return lock; + } + + /** + * Determines if this object is no longer the current instance of this feature type. + * + * @return {@code true} if this object is no longer the current instance, + * {@code false} otherwise + */ + protected abstract boolean hasInstanceChanged(); + + /** + * Finishes the steps required to establish a lock, changing its state to ACTIVE, when + * appropriate. + * + * @param lock the lock to be locked + */ + protected abstract void finishLock(T lock); + + // these may be overridden by junit tests + + /** + * Makes a lock of the appropriate type. + * + * @param state initial state of the lock + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held, after + * which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or subsequently + * lost; must not be {@code null} + * @return a new lock + */ + protected abstract T makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback); +} diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java index f5163e9b..839c17dc 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java @@ -23,8 +23,6 @@ package org.onap.policy.drools.system.internal; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,13 +32,10 @@ import lombok.Getter; import lombok.Setter; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.common.utils.time.CurrentTime; -import org.onap.policy.drools.core.lock.AlwaysFailLock; -import org.onap.policy.drools.core.lock.Lock; import org.onap.policy.drools.core.lock.LockCallback; -import org.onap.policy.drools.core.lock.LockImpl; import org.onap.policy.drools.core.lock.LockState; -import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.system.PolicyEngineConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,13 +53,10 @@ import org.slf4j.LoggerFactory; * will be added to the map once free() or extend() is invoked, provided there isn't * already an entry. */ -public class SimpleLockManager implements PolicyResourceLockManager { +public class SimpleLockManager extends LockManager<SimpleLockManager.SimpleLock> { private static final Logger logger = LoggerFactory.getLogger(SimpleLockManager.class); - private static final String NOT_LOCKED_MSG = "not locked"; - private static final String LOCK_LOST_MSG = "lock lost"; - /** * Provider of current time. May be overridden by junit tests. */ @@ -74,12 +66,6 @@ public class SimpleLockManager implements PolicyResourceLockManager { @Setter(AccessLevel.PROTECTED) private static SimpleLockManager latestInstance = null; - - /** - * Engine with which this manager is associated. - */ - private final PolicyEngine engine; - /** * Feature properties. */ @@ -88,13 +74,7 @@ public class SimpleLockManager implements PolicyResourceLockManager { /** * Maps a resource to the lock that owns it. */ - private final Map<String, SimpleLock> resource2lock = new ConcurrentHashMap<>(); - - /** - * Thread pool used to check for lock expiration and to notify owners when locks are - * lost. - */ - private ScheduledExecutorService exsvc = null; + private final Map<String, SimpleLock> resource2lock; /** * Used to cancel the expiration checker on shutdown. @@ -110,8 +90,8 @@ public class SimpleLockManager implements PolicyResourceLockManager { */ public SimpleLockManager(PolicyEngine engine, Properties properties) { try { - this.engine = engine; this.featProps = new SimpleLockProperties(properties); + this.resource2lock = getResource2lock(); } catch (PropertyException e) { throw new SimpleLockManagerException(e); @@ -119,24 +99,17 @@ public class SimpleLockManager implements PolicyResourceLockManager { } @Override - public boolean isAlive() { - return (checker != null); - } - - @Override - public boolean start() { - if (checker != null) { + public synchronized boolean start() { + if (isAlive()) { return false; } - exsvc = getThreadPool(); - - checker = exsvc.scheduleWithFixedDelay(this::checkExpired, featProps.getExpireCheckSec(), - featProps.getExpireCheckSec(), TimeUnit.SECONDS); + checker = PolicyEngineConstants.getManager().getExecutorService().scheduleWithFixedDelay(this::checkExpired, + featProps.getExpireCheckSec(), featProps.getExpireCheckSec(), TimeUnit.SECONDS); setLatestInstance(this); - return true; + return super.start(); } /** @@ -144,9 +117,7 @@ public class SimpleLockManager implements PolicyResourceLockManager { */ @Override public synchronized boolean stop() { - exsvc = null; - - if (checker == null) { + if (!super.stop()) { return false; } @@ -158,49 +129,6 @@ public class SimpleLockManager implements PolicyResourceLockManager { return true; } - @Override - public void shutdown() { - stop(); - } - - @Override - public boolean isLocked() { - return false; - } - - @Override - public boolean lock() { - return true; - } - - @Override - public boolean unlock() { - return true; - } - - @Override - public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, - boolean waitForLock) { - - if (latestInstance != this) { - AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); - lock.notifyUnavailable(); - return lock; - } - - SimpleLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); - - SimpleLock existingLock = resource2lock.putIfAbsent(resourceId, lock); - - if (existingLock == null) { - lock.grant(); - } else { - lock.deny("resource is busy"); - } - - return lock; - } - /** * Checks for expired locks. */ @@ -230,15 +158,25 @@ public class SimpleLockManager implements PolicyResourceLockManager { SimpleLock lock = lockref.get(); if (lock != null) { - lock.deny("lock expired"); + lock.deny("lock expired", false); } } } + @Override + protected void finishLock(SimpleLock lock) { + lock.grant(true); + } + + @Override + protected boolean hasInstanceChanged() { + return (getLatestInstance() != this); + } + /** * Simple Lock implementation. */ - public static class SimpleLock extends LockImpl { + public static class SimpleLock extends FeatureLockImpl { private static final long serialVersionUID = 1L; /** @@ -248,17 +186,16 @@ public class SimpleLockManager implements PolicyResourceLockManager { private long holdUntilMs; /** - * Feature containing this lock. May be {@code null} until the feature is - * identified. Note: this can only be null if the lock has been de-serialized. + * Map that should hold this lock. */ - private transient SimpleLockManager feature; + private transient Map<String, SimpleLock> resource2lock; /** * Constructs the object. */ public SimpleLock() { this.holdUntilMs = 0; - this.feature = null; + this.resource2lock = null; } /** @@ -276,7 +213,7 @@ public class SimpleLockManager implements PolicyResourceLockManager { public SimpleLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback, SimpleLockManager feature) { super(state, resourceId, ownerKey, holdSec, callback); - this.feature = feature; + this.resource2lock = feature.resource2lock; } /** @@ -289,62 +226,15 @@ public class SimpleLockManager implements PolicyResourceLockManager { return (holdUntilMs <= currentMs); } - /** - * Grants this lock. The notification is <i>always</i> invoked via a background - * thread. - */ - protected synchronized void grant() { - if (isUnavailable()) { - return; - } - - setState(LockState.ACTIVE); - holdUntilMs = currentTime.getMillis() + TimeUnit.SECONDS.toMillis(getHoldSec()); - - logger.info("lock granted: {}", this); - - feature.exsvc.execute(this::notifyAvailable); - } - - /** - * Permanently denies this lock. The notification is invoked via a background - * thread, if a feature instance is attached, otherwise it uses the foreground - * thread. - * - * @param reason the reason the lock was denied - */ - protected void deny(String reason) { - synchronized (this) { - setState(LockState.UNAVAILABLE); - } - - logger.info("{}: {}", reason, this); - - if (feature == null) { - notifyUnavailable(); - - } else { - feature.exsvc.execute(this::notifyUnavailable); - } - } - @Override public boolean free() { - // do a quick check of the state - if (isUnavailable()) { - return false; - } - - logger.info("releasing lock: {}", this); - - if (!attachFeature()) { - setState(LockState.UNAVAILABLE); + if (!freeAllowed()) { return false; } AtomicBoolean result = new AtomicBoolean(false); - feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { + resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { if (curlock == this) { // this lock was the owner - resource is now available @@ -362,46 +252,33 @@ public class SimpleLockManager implements PolicyResourceLockManager { @Override public void extend(int holdSec, LockCallback callback) { - if (holdSec < 0) { - throw new IllegalArgumentException("holdSec is negative"); - } - - setHoldSec(holdSec); - setCallback(callback); - - // do a quick check of the state - if (isUnavailable() || !attachFeature()) { - deny(LOCK_LOST_MSG); + if (!extendAllowed(holdSec, callback)) { return; } - if (feature.resource2lock.get(getResourceId()) == this) { - grant(); + if (resource2lock.get(getResourceId()) == this) { + grant(true); } else { - deny(NOT_LOCKED_MSG); + deny(NOT_LOCKED_MSG, true); } } - /** - * Attaches to the feature instance, if not already attached. - * - * @return {@code true} if the lock is now attached to a feature, {@code false} - * otherwise - */ - private synchronized boolean attachFeature() { - if (feature != null) { - // already attached - return true; - } + @Override + protected void updateGrant() { + holdUntilMs = currentTime.getMillis() + TimeUnit.SECONDS.toMillis(getHoldSec()); + } - feature = latestInstance; + @Override + protected boolean addToFeature() { + SimpleLockManager feature = getLatestInstance(); if (feature == null) { logger.warn("no feature yet for {}", this); return false; } // put this lock into the map - feature.resource2lock.putIfAbsent(getResourceId(), this); + resource2lock = feature.resource2lock; + resource2lock.putIfAbsent(getResourceId(), this); return true; } @@ -415,10 +292,6 @@ public class SimpleLockManager implements PolicyResourceLockManager { // these may be overridden by junit tests - protected ScheduledExecutorService getThreadPool() { - return engine.getExecutorService(); - } - protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, LockCallback callback) { return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, this); diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/FeatureLockImplTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/FeatureLockImplTest.java new file mode 100644 index 00000000..258ee0c5 --- /dev/null +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/FeatureLockImplTest.java @@ -0,0 +1,415 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.concurrent.ScheduledExecutorService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.powermock.reflect.Whitebox; + +public class FeatureLockImplTest { + private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService"; + private static final String OWNER_KEY = "my key"; + private static final String RESOURCE = "my resource"; + private static final int HOLD_SEC = 100; + private static final int HOLD_SEC2 = 120; + + private static ScheduledExecutorService saveExec; + + @Mock + private ScheduledExecutorService exsvc; + + @Mock + private LockCallback callback; + + + /** + * Saves static fields and configures the location of the property files. + */ + @BeforeClass + public static void setUpBeforeClass() { + saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD); + } + + /** + * Restores static fields. + */ + @AfterClass + public static void tearDownAfterClass() { + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec); + } + + /** + * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute + * tasks. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc); + } + + @Test + public void testNoArgs() { + MyLock lock = new MyLock(); + assertNull(lock.getResourceId()); + assertNull(lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(0, lock.getHoldSec()); + } + + @Test + public void testFeatureLockImpl() { + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + assertTrue(lock.isWaiting()); + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertSame(callback, lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + } + + @Test + public void testSerializable() throws Exception { + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + lock = roundTrip(lock); + + assertTrue(lock.isWaiting()); + + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + } + + /** + * Tests grant(), when using the foreground thread. + */ + @Test + public void testGrantForeground() { + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + lock.grant(true); + + assertTrue(lock.isActive()); + assertEquals(1, lock.nupdates); + + verify(exsvc, never()).execute(any()); + + verify(callback).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + /** + * Tests grant(), when using the background thread. + */ + @Test + public void testGrantBackground() { + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + lock.grant(false); + + assertTrue(lock.isActive()); + assertEquals(1, lock.nupdates); + + invokeCallback(1); + verify(callback).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + /** + * Tests grant() when the lock is already unavailable. + */ + @Test + public void testGrantUnavailable() { + MyLock lock = new MyLock(LockState.UNAVAILABLE, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + lock.setState(LockState.UNAVAILABLE); + lock.grant(true); + + assertTrue(lock.isUnavailable()); + assertEquals(0, lock.nupdates); + + verify(exsvc, never()).execute(any()); + } + + /** + * Tests deny(), when using the foreground thread. + */ + @Test + public void testDenyForeground() { + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + lock.deny("my reason", true); + + assertTrue(lock.isUnavailable()); + + verify(exsvc, never()).execute(any()); + + verify(callback, never()).lockAvailable(any()); + verify(callback).lockUnavailable(any()); + } + + /** + * Tests deny(), when using the background thread. + */ + @Test + public void testDenyBackground() { + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + lock.deny("my reason", false); + + assertTrue(lock.isUnavailable()); + + invokeCallback(1); + verify(callback, never()).lockAvailable(any()); + verify(callback).lockUnavailable(any()); + } + + @Test + public void testFreeAllowed() { + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + assertTrue(lock.freeAllowed()); + } + + /** + * Tests freeAllowed() when the lock is unavailable. + */ + @Test + public void testFreeAllowedUnavailable() { + MyLock lock = new MyLock(LockState.UNAVAILABLE, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + assertFalse(lock.freeAllowed()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests that free() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testFreeAllowedSerialized() throws Exception { + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + + lock = roundTrip(lock); + assertTrue(lock.freeAllowed()); + } + + /** + * Tests free() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testFreeAllowedNoFeature() throws Exception { + MyLock lock = new MyLockNoFeature(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + + lock = roundTrip(lock); + assertFalse(lock.freeAllowed()); + assertTrue(lock.isUnavailable()); + } + + @Test + public void testExtendAllowed() { + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + + LockCallback scallback = mock(LockCallback.class); + assertTrue(lock.extendAllowed(HOLD_SEC2, scallback)); + assertTrue(lock.isWaiting()); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + assertSame(scallback, lock.getCallback()); + + verify(exsvc, never()).execute(any()); + + // invalid arguments + + // @formatter:off + assertThatIllegalArgumentException().isThrownBy( + () -> new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback) + .extendAllowed(-1, callback)) + .withMessageContaining("holdSec"); + // @formatter:on + } + + /** + * Tests extendAllowed() when the lock is unavailable. + */ + @Test + public void testExtendAllowedUnavailable() { + MyLock lock = new MyLock(LockState.UNAVAILABLE, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + + LockCallback scallback = mock(LockCallback.class); + assertFalse(lock.extendAllowed(HOLD_SEC2, scallback)); + assertTrue(lock.isUnavailable()); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + assertSame(scallback, lock.getCallback()); + + verify(scallback, never()).lockAvailable(lock); + verify(scallback).lockUnavailable(lock); + } + + /** + * Tests that extendAllowed() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testExtendAllowedSerialized() throws Exception { + MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + + lock = roundTrip(lock); + + LockCallback scallback = mock(LockCallback.class); + assertTrue(lock.extendAllowed(HOLD_SEC2, scallback)); + assertTrue(lock.isWaiting()); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + assertSame(scallback, lock.getCallback()); + + verify(exsvc, never()).execute(any()); + } + + /** + * Tests extendAllowed() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testExtendAllowedNoFeature() throws Exception { + MyLock lock = new MyLockNoFeature(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + + lock = roundTrip(lock); + + LockCallback scallback = mock(LockCallback.class); + assertFalse(lock.extendAllowed(HOLD_SEC2, scallback)); + assertTrue(lock.isUnavailable()); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + assertSame(scallback, lock.getCallback()); + + verify(scallback, never()).lockAvailable(lock); + verify(scallback).lockUnavailable(lock); + } + + @Test + public void testToString() { + String text = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback).toString(); + assertNotNull(text); + assertThat(text).contains("LockImpl"); + } + + private MyLock roundTrip(MyLock lock) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(lock); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + return (MyLock) ois.readObject(); + } + } + + /** + * Invokes the last call-back in the work queue. + * + * @param nexpected number of call-backs expected in the work queue + */ + private void invokeCallback(int nexpected) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(nexpected)).execute(captor.capture()); + + if (nexpected > 0) { + captor.getAllValues().get(nexpected - 1).run(); + } + } + + public static class MyLock extends FeatureLockImpl { + private static final long serialVersionUID = 1L; + private int nupdates = 0; + + public MyLock() { + super(); + } + + public MyLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback) { + super(state, resourceId, ownerKey, holdSec, callback); + } + + @Override + protected void updateGrant() { + super.updateGrant(); + ++nupdates; + } + + @Override + public boolean free() { + return false; + } + + @Override + public void extend(int holdSec, LockCallback callback) { + // do nothing + } + + @Override + protected boolean addToFeature() { + return true; + } + } + + public static class MyLockNoFeature extends MyLock { + private static final long serialVersionUID = 1L; + + public MyLockNoFeature() { + super(); + } + + public MyLockNoFeature(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + super(state, resourceId, ownerKey, holdSec, callback); + } + + @Override + protected boolean addToFeature() { + return false; + } + } +} diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/LockManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/LockManagerTest.java new file mode 100644 index 00000000..1cda079d --- /dev/null +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/LockManagerTest.java @@ -0,0 +1,259 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.ScheduledExecutorService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.drools.core.lock.AlwaysFailLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockState; + +public class LockManagerTest { + private static final String OWNER_KEY = "my key"; + private static final String RESOURCE = "my resource"; + private static final String RESOURCE2 = "my resource #2"; + private static final int HOLD_SEC = 100; + + @Mock + private LockCallback callback; + + @Mock + private ScheduledExecutorService exsvc; + + private MyManager mgr; + + /** + * Resets fields and creates {@link #mgr}. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + doAnswer(args -> { + args.getArgumentAt(0, Runnable.class).run(); + return null; + }).when(exsvc).execute(any()); + + mgr = new MyManager(); + } + + @After + public void tearDown() { + + } + + @Test + public void testIsAlive() { + assertFalse(mgr.isAlive()); + assertFalse(mgr.isLocked()); + + mgr.start(); + assertTrue(mgr.isAlive()); + assertFalse(mgr.isLocked()); + + mgr.stop(); + assertFalse(mgr.isAlive()); + } + + @Test + public void testStart() { + assertTrue(mgr.start()); + assertTrue(mgr.isAlive()); + + assertFalse(mgr.start()); + assertTrue(mgr.isAlive()); + + mgr.stop(); + assertTrue(mgr.start()); + assertTrue(mgr.isAlive()); + } + + @Test + public void testStop() { + assertFalse(mgr.stop()); + + mgr.start(); + assertTrue(mgr.stop()); + assertFalse(mgr.isAlive()); + } + + @Test + public void testShutdown() { + mgr.start(); + mgr.shutdown(); + assertFalse(mgr.isAlive()); + + mgr.shutdown(); + assertFalse(mgr.isAlive()); + } + + @Test + public void testIsLocked() { + assertFalse(mgr.isLocked()); + assertFalse(mgr.isAlive()); + + mgr.lock(); + assertTrue(mgr.isLocked()); + assertFalse(mgr.isAlive()); + + mgr.unlock(); + assertFalse(mgr.isLocked()); + } + + @Test + public void testLock() { + assertTrue(mgr.lock()); + assertTrue(mgr.isLocked()); + + assertFalse(mgr.lock()); + assertTrue(mgr.isLocked()); + + mgr.unlock(); + assertTrue(mgr.lock()); + assertTrue(mgr.isLocked()); + } + + @Test + public void testUnlock() { + assertFalse(mgr.unlock()); + + mgr.lock(); + assertTrue(mgr.unlock()); + assertFalse(mgr.isLocked()); + } + + @Test + public void testCreateLock() { + Lock lock = mgr.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isActive()); + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // should not be able to lock it again + LockCallback callback2 = mock(LockCallback.class); + Lock lock2 = mgr.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false); + assertTrue(lock2.isUnavailable()); + verify(callback2, never()).lockAvailable(lock2); + verify(callback2).lockUnavailable(lock2); + + // should be able to lock another resource + LockCallback callback3 = mock(LockCallback.class); + Lock lock3 = mgr.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback3, false); + assertTrue(lock3.isActive()); + verify(callback3).lockAvailable(lock3); + verify(callback3, never()).lockUnavailable(lock3); + } + + /** + * Tests createLock() when the feature instance has changed. + */ + @Test + public void testCreateLockInstanceChanged() { + mgr = spy(mgr); + when(mgr.hasInstanceChanged()).thenReturn(true); + + Lock lock = mgr.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock instanceof AlwaysFailLock); + assertTrue(lock.isUnavailable()); + + verify(callback, never()).lockAvailable(lock); + verify(callback).lockUnavailable(lock); + } + + @Test + public void testGetResource2lock() { + assertNotNull(mgr.getResource2lock()); + } + + private class MyManager extends LockManager<MyLock> { + + @Override + protected boolean hasInstanceChanged() { + return false; + } + + @Override + protected void finishLock(MyLock lock) { + lock.grant(true); + } + + @Override + protected MyLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new MyLock(waiting, resourceId, ownerKey, holdSec, callback); + } + + } + + private class MyLock extends FeatureLockImpl { + private static final long serialVersionUID = 1L; + + public MyLock(LockState waiting, String resourceId, String ownerKey, int holdSec, LockCallback callback) { + super(waiting, resourceId, ownerKey, holdSec, callback); + } + + @Override + public boolean free() { + return false; + } + + @Override + public void extend(int holdSec, LockCallback callback) { + // do nothing + } + + @Override + protected boolean addToFeature() { + return false; + } + + @Override + public void notifyAvailable() { + getCallback().lockAvailable(this); + } + + @Override + public void notifyUnavailable() { + getCallback().lockUnavailable(this); + } + + @Override + protected ScheduledExecutorService getThreadPool() { + return exsvc; + } + } +} diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java index 66406898..79e20673 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java @@ -63,7 +63,6 @@ import org.onap.policy.common.utils.time.TestTime; import org.onap.policy.drools.core.lock.Lock; import org.onap.policy.drools.core.lock.LockCallback; import org.onap.policy.drools.core.lock.LockState; -import org.onap.policy.drools.system.PolicyEngine; import org.onap.policy.drools.system.PolicyEngineConstants; import org.onap.policy.drools.system.internal.SimpleLockManager.SimpleLock; import org.powermock.reflect.Whitebox; @@ -95,9 +94,6 @@ public class SimpleLockManagerTest { private ScheduledExecutorService exsvc; @Mock - private PolicyEngine engine; - - @Mock private ScheduledFuture<?> future; @Mock @@ -113,7 +109,6 @@ public class SimpleLockManagerTest { saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD); realExec = Executors.newScheduledThreadPool(3); - Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); } /** @@ -141,7 +136,7 @@ public class SimpleLockManagerTest { Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime); - when(engine.getExecutorService()).thenReturn(exsvc); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc); feature = new MyLockingFeature(); feature.start(); @@ -156,19 +151,14 @@ public class SimpleLockManagerTest { Properties props = new Properties(); props.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc"); - assertThatThrownBy(() -> new MyLockingFeature(engine, props)).isInstanceOf(SimpleLockManagerException.class); + assertThatThrownBy(() -> new MyLockingFeature(props)).isInstanceOf(SimpleLockManagerException.class); } @Test - public void testIsAlive() { + public void testStart() { assertTrue(feature.isAlive()); + verify(exsvc).scheduleWithFixedDelay(any(), anyLong(), anyLong(), any()); - feature.stop(); - assertFalse(feature.isAlive()); - } - - @Test - public void testStart() { assertFalse(feature.start()); feature.stop(); @@ -178,6 +168,7 @@ public class SimpleLockManagerTest { @Test public void testStop() { assertTrue(feature.stop()); + assertFalse(feature.isAlive()); verify(future).cancel(true); assertFalse(feature.stop()); @@ -194,21 +185,12 @@ public class SimpleLockManagerTest { } @Test - public void testLockApi() { - assertFalse(feature.isLocked()); - assertTrue(feature.lock()); - assertTrue(feature.unlock()); - } - - @Test public void testCreateLock() { // this lock should be granted immediately SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); assertTrue(lock.isActive()); assertEquals(testTime.getMillis() + HOLD_MS, lock.getHoldUntilMs()); - invokeCallback(1); - verify(callback).lockAvailable(lock); verify(callback, never()).lockUnavailable(lock); @@ -218,8 +200,6 @@ public class SimpleLockManagerTest { assertFalse(lock2.isActive()); assertTrue(lock2.isUnavailable()); - invokeCallback(2); - verify(callback, never()).lockAvailable(lock2); verify(callback).lockUnavailable(lock2); @@ -231,13 +211,12 @@ public class SimpleLockManagerTest { // should work with "true" value also Lock lock3 = feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, true); assertTrue(lock3.isActive()); - invokeCallback(3); verify(callback).lockAvailable(lock3); verify(callback, never()).lockUnavailable(lock3); } /** - * Tests lock() when the feature is not the latest instance. + * Tests createLock() when the feature is not the latest instance. */ @Test public void testCreateLockNotLatestInstance() { @@ -275,7 +254,7 @@ public class SimpleLockManagerTest { // run the callbacks captor = ArgumentCaptor.forClass(Runnable.class); - verify(exsvc, times(5)).execute(captor.capture()); + verify(exsvc, times(2)).execute(captor.capture()); captor.getAllValues().forEach(Runnable::run); verify(callback).lockUnavailable(lock); verify(callback).lockUnavailable(lock2); @@ -295,7 +274,7 @@ public class SimpleLockManagerTest { // run the callback captor = ArgumentCaptor.forClass(Runnable.class); - verify(exsvc, times(9)).execute(captor.capture()); + verify(exsvc, times(3)).execute(captor.capture()); captor.getValue().run(); verify(callback).lockUnavailable(lock3); } @@ -328,7 +307,6 @@ public class SimpleLockManagerTest { feature.start(); feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - invokeCallback(1); ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); @@ -339,14 +317,12 @@ public class SimpleLockManagerTest { // lock should now be gone and we should be able to get another feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - invokeCallback(2); // should have succeeded twice verify(callback, times(2)).lockAvailable(any()); // lock should not be available now feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - invokeCallback(3); verify(callback).lockUnavailable(any()); } @@ -389,7 +365,6 @@ public class SimpleLockManagerTest { feature.start(); feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - invokeCallback(1); ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); @@ -400,14 +375,13 @@ public class SimpleLockManagerTest { // lock should not be available now feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - invokeCallback(3); verify(callback).lockUnavailable(any()); } @Test public void testGetThreadPool() { // use a real feature - feature = new SimpleLockManager(engine, new Properties()); + feature = new SimpleLockManager(null, new Properties()); // load properties feature.start(); @@ -459,45 +433,28 @@ public class SimpleLockManagerTest { @Test public void testSimpleLockExpired() { SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - lock.grant(); + lock.grant(true); assertFalse(lock.expired(testTime.getMillis())); assertFalse(lock.expired(testTime.getMillis() + HOLD_MS - 1)); assertTrue(lock.expired(testTime.getMillis() + HOLD_MS)); } - /** - * Tests grant() when the lock is already unavailable. - */ - @Test - public void testSimpleLockGrantUnavailable() { - SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - lock.setState(LockState.UNAVAILABLE); - lock.grant(); - - assertTrue(lock.isUnavailable()); - verify(callback, never()).lockAvailable(any()); - verify(callback, never()).lockUnavailable(any()); - } - @Test public void testSimpleLockFree() { final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); // lock2 should be denied SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - invokeCallback(2); verify(callback, never()).lockAvailable(lock2); verify(callback).lockUnavailable(lock2); // lock2 was denied, so nothing new should happen when freed assertFalse(lock2.free()); - invokeCallback(2); // force lock2 to be active - still nothing should happen Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); assertFalse(lock2.free()); - invokeCallback(2); // now free the first lock assertTrue(lock.free()); @@ -506,6 +463,9 @@ public class SimpleLockManagerTest { // should be able to get the lock now SimpleLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); assertTrue(lock3.isActive()); + + verify(callback).lockAvailable(lock3); + verify(callback, never()).lockUnavailable(lock3); } /** @@ -525,41 +485,22 @@ public class SimpleLockManagerTest { assertTrue(lock.isUnavailable()); } - /** - * Tests free() on a serialized lock without a feature. - * - * @throws Exception if an error occurs - */ - @Test - public void testSimpleLockFreeNoFeature() throws Exception { - SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - - SimpleLockManager.setLatestInstance(null); - - lock = roundTrip(lock); - assertFalse(lock.free()); - assertTrue(lock.isUnavailable()); - } - @Test public void testSimpleLockExtend() { final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); // lock2 should be denied SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); - invokeCallback(2); verify(callback, never()).lockAvailable(lock2); verify(callback).lockUnavailable(lock2); // lock2 will still be denied lock2.extend(HOLD_SEC, callback); - invokeCallback(3); verify(callback, times(2)).lockUnavailable(lock2); // force lock2 to be active - should still be denied Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); lock2.extend(HOLD_SEC, callback); - invokeCallback(4); verify(callback, times(3)).lockUnavailable(lock2); assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback)) @@ -569,8 +510,7 @@ public class SimpleLockManagerTest { lock.extend(HOLD_SEC2, callback); assertEquals(HOLD_SEC2, lock.getHoldSec()); assertEquals(testTime.getMillis() + HOLD_MS2, lock.getHoldUntilMs()); - invokeCallback(5); - verify(callback).lockAvailable(lock); + verify(callback, times(2)).lockAvailable(lock); verify(callback, never()).lockUnavailable(lock); } @@ -592,13 +532,12 @@ public class SimpleLockManagerTest { lock.extend(HOLD_SEC, scallback); assertTrue(lock.isActive()); - invokeCallback(1); verify(scallback).lockAvailable(lock); verify(scallback, never()).lockUnavailable(lock); } /** - * Tests extend() on a serialized lock without a feature. + * Tests that extend() fails when there is no feature. * * @throws Exception if an error occurs */ @@ -614,7 +553,6 @@ public class SimpleLockManagerTest { lock.extend(HOLD_SEC, scallback); assertTrue(lock.isUnavailable()); - invokeCallback(1); verify(scallback, never()).lockAvailable(lock); verify(scallback).lockUnavailable(lock); } @@ -635,7 +573,8 @@ public class SimpleLockManagerTest { @Test public void testMultiThreaded() throws InterruptedException { Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime); - feature = new SimpleLockManager(PolicyEngineConstants.getManager(), new Properties()); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); + feature = new SimpleLockManager(null, new Properties()); feature.start(); List<MyThread> threads = new ArrayList<>(MAX_THREADS); @@ -677,33 +616,19 @@ public class SimpleLockManagerTest { } /** - * Invokes the last call-back in the work queue. - * - * @param nexpected number of call-backs expected in the work queue - */ - private void invokeCallback(int nexpected) { - ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); - verify(exsvc, times(nexpected)).execute(captor.capture()); - - if (nexpected > 0) { - captor.getAllValues().get(nexpected - 1).run(); - } - } - - /** * Feature that uses <i>exsvc</i> to execute requests. */ private class MyLockingFeature extends SimpleLockManager { public MyLockingFeature() { - this(engine, new Properties()); + this(new Properties()); } - public MyLockingFeature(PolicyEngine engine, Properties props) { - super(engine, props); + public MyLockingFeature(Properties props) { + super(null, props); exsvc = mock(ScheduledExecutorService.class); - when(engine.getExecutorService()).thenReturn(exsvc); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc); when(exsvc.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(answer -> { return future; |