From 6e0b450abe7e62fa47ffe14e95a67d035174dbdb Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Tue, 24 Sep 2019 10:51:21 -0400 Subject: Reimplement Lock API using Lock objects Modified PolicyResourceLockManager to just return a feature, deferring the lock() call/method to the feature, itself. The manager was also modified so that, if it can't find an enabled provider, it will return a default provider, whose lock() methods always fail. Once a feature has been identified, the manager will cache it for use thereafter. Modified the feature API to return lock objects and simplified the interface to remove the beforeXxx and afterXxx methods. Moved the unlock and refresh methods from the feature API into the lock class, renaming them to free and extend, respectively. Added a separate, feature-simple-locking project, which implements a simple version of the locking feature, over a single JVM. Extensively revised the distributed locking feature to fit in with the new API. Added support for persistence so that the various LockImpl classes can be serialized and still function correctly when they are deserialized back into new feature instances Added default implementations of free & extend to LockImpl. Modified API to take the ownerKey string, instead of the owner object. Removed Extractor as unneeded - may add via another review, if still useful. Updates per review comments: - Updated licenses in feature-simple-locking - Added beforeCreateLock & afterCreateLock to feature API - Moved SimpleLockingFeature into policy-management so that it's always available - Moved the executor service, "exsvc", into PolicyEngine - Moved Extrator into policy-utils - Changed Extractor logging level for exceptions - Fixed feature sequence numbers - Fixed mixing of seconds and milliseconds - Renamed exsvc - Modified to use property method with default value - Configured scheduled executor - Added suffix to Extractor.register() - Eliminated Feature Api and tied lock manager into engine - Added non-null checks to LockImpl parameters - Added non-null checks to createLock() parameters - Checked that lockManager is initialized Change-Id: Iddba38157ddc5f7277656979c0e679e5489eb7b1 Issue-ID: POLICY-2113 Signed-off-by: Jim Hahn --- .../drools/features/PolicyEngineFeatureApi.java | 23 + .../onap/policy/drools/system/PolicyEngine.java | 33 + .../policy/drools/system/PolicyEngineManager.java | 139 +++- .../drools/system/internal/SimpleLockManager.java | 426 +++++++++++ .../internal/SimpleLockManagerException.java | 34 + .../system/internal/SimpleLockProperties.java | 52 ++ .../drools/system/PolicyEngineManagerTest.java | 164 ++++- .../internal/SimpleLockManagerExceptionTest.java | 35 + .../system/internal/SimpleLockManagerTest.java | 781 +++++++++++++++++++++ 9 files changed, 1683 insertions(+), 4 deletions(-) create mode 100644 policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java create mode 100644 policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java create mode 100644 policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java create mode 100644 policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java create mode 100644 policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java (limited to 'policy-management/src') diff --git a/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java b/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java index fe31eb50..87001ad6 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java +++ b/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java @@ -23,6 +23,7 @@ package org.onap.policy.drools.features; import java.util.Properties; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.utils.services.OrderedService; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; import org.onap.policy.drools.system.PolicyEngine; @@ -271,4 +272,26 @@ public interface PolicyEngineFeatureApi extends OrderedService { default boolean afterOpen(PolicyEngine engine) { return false; } + + /** + * Called before the PolicyEngine creates a lock manager. + * + * @return a lock manager if this feature intercepts and takes ownership of the + * operation preventing the invocation of lower priority features. Null, + * otherwise + */ + default PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) { + return null; + } + + /** + * Called after the PolicyEngine creates a lock manager. + * + * @return True if this feature intercepts and takes ownership of the operation + * preventing the invocation of lower priority features. False, otherwise + */ + default boolean afterCreateLockManager(PolicyEngine engine, Properties properties, + PolicyResourceLockManager lockManager) { + return false; + } } diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java index 653ff72e..cb0749d9 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java @@ -22,6 +22,7 @@ package org.onap.policy.drools.system; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; import org.onap.policy.common.capabilities.Lockable; import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; @@ -29,6 +30,8 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.http.server.HttpServletServer; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; import org.onap.policy.drools.features.PolicyEngineFeatureApi; import org.onap.policy.drools.protocol.configuration.ControllerConfiguration; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; @@ -197,6 +200,11 @@ public interface PolicyEngine extends Startable, Lockable, TopicListener { */ List getHttpServers(); + /** + * Gets a thread pool that can be used to execute background tasks. + */ + ScheduledExecutorService getExecutorService(); + /** * get properties configuration. * @@ -279,6 +287,31 @@ public interface PolicyEngine extends Startable, Lockable, TopicListener { */ boolean deliver(CommInfrastructure busType, String topic, String event); + /** + * Requests a lock on a resource. Typically, the lock is not immediately granted, + * though a "lock" object is always returned. Once the lock has been granted (or + * denied), the callback will be invoked to indicate the result. + * + *

+ * Notes: + *

+ *
  • The callback may be invoked before this method returns
  • + *
  • The implementation need not honor waitForLock={@code true}
  • + *
    + * + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held once + * it has been granted, after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or subsequently + * lost; must not be {@code null} + * @param waitForLock {@code true} to wait for the lock, if it is currently locked, + * {@code false} otherwise + * @return a new lock + */ + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock); + /** * Invoked when the host goes into the active state. */ diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java index 766848c6..c924fd69 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java @@ -24,7 +24,6 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME; import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT; -import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.gson.Gson; @@ -32,11 +31,16 @@ import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Stream; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NonNull; +import org.apache.commons.lang.StringUtils; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; @@ -54,6 +58,9 @@ import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; import org.onap.policy.drools.core.PolicyContainer; import org.onap.policy.drools.core.jmx.PdpJmxListener; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.features.PolicyControllerFeatureApi; import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants; import org.onap.policy.drools.features.PolicyEngineFeatureApi; @@ -67,6 +74,7 @@ import org.onap.policy.drools.protocol.configuration.ControllerConfiguration; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; import org.onap.policy.drools.server.restful.RestManager; import org.onap.policy.drools.server.restful.aaf.AafTelemetryAuthFilter; +import org.onap.policy.drools.system.internal.SimpleLockManager; import org.onap.policy.drools.utils.PropertyUtil; import org.onap.policy.drools.utils.logging.LoggerUtil; import org.onap.policy.drools.utils.logging.MdcTransaction; @@ -78,7 +86,6 @@ import org.slf4j.LoggerFactory; * Policy Engine Manager Implementation. */ class PolicyEngineManager implements PolicyEngine { - /** * String literals. */ @@ -88,6 +95,9 @@ class PolicyEngineManager implements PolicyEngine { private static final String ENGINE_STOPPED_MSG = "Policy Engine is stopped"; private static final String ENGINE_LOCKED_MSG = "Policy Engine is locked"; + public static final String EXECUTOR_THREAD_PROP = "executor.threads"; + protected static final int DEFAULT_EXECUTOR_THREADS = 5; + /** * logger. */ @@ -133,6 +143,17 @@ class PolicyEngineManager implements PolicyEngine { @Getter private List httpServers = new ArrayList<>(); + /** + * Thread pool used to execute background tasks. + */ + private ScheduledExecutorService executorService = null; + + /** + * Lock manager used to create locks. + */ + @Getter(AccessLevel.PROTECTED) + private PolicyResourceLockManager lockManager = null; + /** * gson parser to decode configuration requests. */ @@ -213,6 +234,53 @@ class PolicyEngineManager implements PolicyEngine { return defaultConfig; } + @Override + @JsonIgnore + @GsonJsonIgnore + public ScheduledExecutorService getExecutorService() { + return executorService; + } + + private ScheduledExecutorService makeExecutorService(Properties properties) { + int nthreads = DEFAULT_EXECUTOR_THREADS; + try { + nthreads = Integer.valueOf( + properties.getProperty(EXECUTOR_THREAD_PROP, String.valueOf(DEFAULT_EXECUTOR_THREADS))); + + } catch (NumberFormatException e) { + logger.error("invalid number for " + EXECUTOR_THREAD_PROP + " property", e); + } + + return makeScheduledExecutor(nthreads); + } + + private void createLockManager(Properties properties) { + for (PolicyEngineFeatureApi feature : getEngineProviders()) { + try { + this.lockManager = feature.beforeCreateLockManager(this, properties); + if (this.lockManager != null) { + return; + } + } catch (RuntimeException e) { + logger.error("{}: feature {} before-create-lock-manager failure because of {}", this, + feature.getClass().getName(), e.getMessage(), e); + } + } + + try { + this.lockManager = new SimpleLockManager(this, properties); + } catch (RuntimeException e) { + logger.error("{}: cannot create simple lock manager because of {}", this, e.getMessage(), e); + this.lockManager = new SimpleLockManager(this, new Properties()); + } + + /* policy-engine dispatch post operation hook */ + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterCreateLockManager(this, properties, this.lockManager), + (feature, ex) -> logger.error("{}: feature {} after-create-lock-manager failure because of {}", + this, feature.getClass().getName(), ex.getMessage(), ex)); + } + @Override public synchronized void configure(Properties properties) { @@ -257,6 +325,10 @@ class PolicyEngineManager implements PolicyEngine { logger.error("{}: add-http-servers failed", this, e); } + executorService = makeExecutorService(properties); + + createLockManager(properties); + /* policy-engine dispatch post configure hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterConfigure(this), @@ -499,6 +571,13 @@ class PolicyEngineManager implements PolicyEngine { AtomicReference success = new AtomicReference<>(true); + try { + success.compareAndSet(true, this.lockManager.start()); + } catch (final RuntimeException e) { + logger.warn("{}: cannot start lock manager because of {}", this, e.getMessage(), e); + success.set(false); + } + /* Start Policy Engine exclusively-owned (unmanaged) http servers */ attempt(success, this.httpServers, @@ -639,9 +718,16 @@ class PolicyEngineManager implements PolicyEngine { (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item, ex.getMessage(), ex)); + try { + success.compareAndSet(true, this.lockManager.stop()); + } catch (final RuntimeException e) { + logger.warn("{}: cannot stop lock manager because of {}", this, e.getMessage(), e); + success.set(false); + } + // stop JMX? - /* policy-engine dispatch pre stop hook */ + /* policy-engine dispatch post stop hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterStop(this), (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this, @@ -688,6 +774,14 @@ class PolicyEngineManager implements PolicyEngine { getTopicEndpointManager().shutdown(); getServletFactory().destroy(); + try { + this.lockManager.shutdown(); + } catch (final RuntimeException e) { + logger.warn("{}: cannot shutdown lock manager because of {}", this, e.getMessage(), e); + } + + executorService.shutdownNow(); + // Stop the JMX listener stopPdpJmxListener(); @@ -806,6 +900,13 @@ class PolicyEngineManager implements PolicyEngine { success = getTopicEndpointManager().lock() && success; + try { + success = (this.lockManager == null || this.lockManager.lock()) && success; + } catch (final RuntimeException e) { + logger.warn("{}: cannot lock() lock manager because of {}", this, e.getMessage(), e); + success = false; + } + /* policy-engine dispatch post lock hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterLock(this), @@ -833,6 +934,14 @@ class PolicyEngineManager implements PolicyEngine { this.locked = false; boolean success = true; + + try { + success = (this.lockManager == null || this.lockManager.unlock()) && success; + } catch (final RuntimeException e) { + logger.warn("{}: cannot unlock() lock manager because of {}", this, e.getMessage(), e); + success = false; + } + final List controllers = getControllerFactory().inventory(); for (final PolicyController controller : controllers) { try { @@ -1167,6 +1276,21 @@ class PolicyEngineManager implements PolicyEngine { feature.getClass().getName(), ex.getMessage(), ex)); } + @Override + public Lock createLock(@NonNull String resourceId, @NonNull String ownerKey, int holdSec, + @NonNull LockCallback callback, boolean waitForLock) { + + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + if (lockManager == null) { + throw new IllegalStateException("lock manager has not been initialized"); + } + + return lockManager.createLock(resourceId, ownerKey, holdSec, callback, waitForLock); + } + private boolean controllerConfig(PdpdConfiguration config) { /* only this one supported for now */ final List configControllers = config.getControllers(); @@ -1234,4 +1358,13 @@ class PolicyEngineManager implements PolicyEngine { protected PolicyEngine getPolicyEngine() { return PolicyEngineConstants.getManager(); } + + protected ScheduledExecutorService makeScheduledExecutor(int nthreads) { + ScheduledThreadPoolExecutor exsvc = new ScheduledThreadPoolExecutor(nthreads); + exsvc.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + exsvc.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + exsvc.setRemoveOnCancelPolicy(true); + + return exsvc; + } } diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java new file mode 100644 index 00000000..f5163e9b --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java @@ -0,0 +1,426 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.properties.exception.PropertyException; +import org.onap.policy.common.utils.time.CurrentTime; +import org.onap.policy.drools.core.lock.AlwaysFailLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; +import org.onap.policy.drools.system.PolicyEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Simple implementation of the Lock Feature. Locks do not span across instances of this + * object (i.e., locks do not span across servers). + * + *

    + * Note: this implementation does not honor the waitForLocks={@code true} + * parameter. + * + *

    + * When a lock is deserialized, it will not initially appear in this feature's map; it + * will be added to the map once free() or extend() is invoked, provided there isn't + * already an entry. + */ +public class SimpleLockManager implements PolicyResourceLockManager { + + private static final Logger logger = LoggerFactory.getLogger(SimpleLockManager.class); + + private static final String NOT_LOCKED_MSG = "not locked"; + private static final String LOCK_LOST_MSG = "lock lost"; + + /** + * Provider of current time. May be overridden by junit tests. + */ + private static CurrentTime currentTime = new CurrentTime(); + + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private static SimpleLockManager latestInstance = null; + + + /** + * Engine with which this manager is associated. + */ + private final PolicyEngine engine; + + /** + * Feature properties. + */ + private final SimpleLockProperties featProps; + + /** + * Maps a resource to the lock that owns it. + */ + private final Map resource2lock = new ConcurrentHashMap<>(); + + /** + * Thread pool used to check for lock expiration and to notify owners when locks are + * lost. + */ + private ScheduledExecutorService exsvc = null; + + /** + * Used to cancel the expiration checker on shutdown. + */ + private ScheduledFuture checker = null; + + + /** + * Constructs the object. + * + * @param engine engine with which this manager is associated + * @param properties properties used to configure the manager + */ + public SimpleLockManager(PolicyEngine engine, Properties properties) { + try { + this.engine = engine; + this.featProps = new SimpleLockProperties(properties); + + } catch (PropertyException e) { + throw new SimpleLockManagerException(e); + } + } + + @Override + public boolean isAlive() { + return (checker != null); + } + + @Override + public boolean start() { + if (checker != null) { + return false; + } + + exsvc = getThreadPool(); + + checker = exsvc.scheduleWithFixedDelay(this::checkExpired, featProps.getExpireCheckSec(), + featProps.getExpireCheckSec(), TimeUnit.SECONDS); + + setLatestInstance(this); + + return true; + } + + /** + * Stops the expiration checker. Does not invoke any lock call-backs. + */ + @Override + public synchronized boolean stop() { + exsvc = null; + + if (checker == null) { + return false; + } + + ScheduledFuture checker2 = checker; + checker = null; + + checker2.cancel(true); + + return true; + } + + @Override + public void shutdown() { + stop(); + } + + @Override + public boolean isLocked() { + return false; + } + + @Override + public boolean lock() { + return true; + } + + @Override + public boolean unlock() { + return true; + } + + @Override + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + + if (latestInstance != this) { + AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); + lock.notifyUnavailable(); + return lock; + } + + SimpleLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); + + SimpleLock existingLock = resource2lock.putIfAbsent(resourceId, lock); + + if (existingLock == null) { + lock.grant(); + } else { + lock.deny("resource is busy"); + } + + return lock; + } + + /** + * Checks for expired locks. + */ + private void checkExpired() { + long currentMs = currentTime.getMillis(); + logger.info("checking for expired locks at {}", currentMs); + + /* + * Could do this via an iterator, but using compute() guarantees that the lock + * doesn't get extended while it's being removed from the map. + */ + for (Entry ent : resource2lock.entrySet()) { + if (!ent.getValue().expired(currentMs)) { + continue; + } + + AtomicReference lockref = new AtomicReference<>(null); + + resource2lock.computeIfPresent(ent.getKey(), (resourceId, lock) -> { + if (lock.expired(currentMs)) { + lockref.set(lock); + return null; + } + + return lock; + }); + + SimpleLock lock = lockref.get(); + if (lock != null) { + lock.deny("lock expired"); + } + } + } + + /** + * Simple Lock implementation. + */ + public static class SimpleLock extends LockImpl { + private static final long serialVersionUID = 1L; + + /** + * Time, in milliseconds, when the lock expires. + */ + @Getter + private long holdUntilMs; + + /** + * Feature containing this lock. May be {@code null} until the feature is + * identified. Note: this can only be null if the lock has been de-serialized. + */ + private transient SimpleLockManager feature; + + /** + * Constructs the object. + */ + public SimpleLock() { + this.holdUntilMs = 0; + this.feature = null; + } + + /** + * Constructs the object. + * + * @param state initial state of the lock + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held, + * after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or + * subsequently lost; must not be {@code null} + * @param feature feature containing this lock + */ + public SimpleLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback, + SimpleLockManager feature) { + super(state, resourceId, ownerKey, holdSec, callback); + this.feature = feature; + } + + /** + * Determines if the owner's lock has expired. + * + * @param currentMs current time, in milliseconds + * @return {@code true} if the owner's lock has expired, {@code false} otherwise + */ + public boolean expired(long currentMs) { + return (holdUntilMs <= currentMs); + } + + /** + * Grants this lock. The notification is always invoked via a background + * thread. + */ + protected synchronized void grant() { + if (isUnavailable()) { + return; + } + + setState(LockState.ACTIVE); + holdUntilMs = currentTime.getMillis() + TimeUnit.SECONDS.toMillis(getHoldSec()); + + logger.info("lock granted: {}", this); + + feature.exsvc.execute(this::notifyAvailable); + } + + /** + * Permanently denies this lock. The notification is invoked via a background + * thread, if a feature instance is attached, otherwise it uses the foreground + * thread. + * + * @param reason the reason the lock was denied + */ + protected void deny(String reason) { + synchronized (this) { + setState(LockState.UNAVAILABLE); + } + + logger.info("{}: {}", reason, this); + + if (feature == null) { + notifyUnavailable(); + + } else { + feature.exsvc.execute(this::notifyUnavailable); + } + } + + @Override + public boolean free() { + // do a quick check of the state + if (isUnavailable()) { + return false; + } + + logger.info("releasing lock: {}", this); + + if (!attachFeature()) { + setState(LockState.UNAVAILABLE); + return false; + } + + AtomicBoolean result = new AtomicBoolean(false); + + feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { + + if (curlock == this) { + // this lock was the owner - resource is now available + result.set(true); + setState(LockState.UNAVAILABLE); + return null; + + } else { + return curlock; + } + }); + + return result.get(); + } + + @Override + public void extend(int holdSec, LockCallback callback) { + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + setHoldSec(holdSec); + setCallback(callback); + + // do a quick check of the state + if (isUnavailable() || !attachFeature()) { + deny(LOCK_LOST_MSG); + return; + } + + if (feature.resource2lock.get(getResourceId()) == this) { + grant(); + } else { + deny(NOT_LOCKED_MSG); + } + } + + /** + * Attaches to the feature instance, if not already attached. + * + * @return {@code true} if the lock is now attached to a feature, {@code false} + * otherwise + */ + private synchronized boolean attachFeature() { + if (feature != null) { + // already attached + return true; + } + + feature = latestInstance; + if (feature == null) { + logger.warn("no feature yet for {}", this); + return false; + } + + // put this lock into the map + feature.resource2lock.putIfAbsent(getResourceId(), this); + + return true; + } + + @Override + public String toString() { + return "SimpleLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey=" + getOwnerKey() + + ", holdSec=" + getHoldSec() + ", holdUntilMs=" + holdUntilMs + "]"; + } + } + + // these may be overridden by junit tests + + protected ScheduledExecutorService getThreadPool() { + return engine.getExecutorService(); + } + + protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, this); + } +} diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java new file mode 100644 index 00000000..ff02f39e --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +public class SimpleLockManagerException extends RuntimeException { + private static final long serialVersionUID = 1L; + + /** + * Constructor. + * + * @param ex exception to be wrapped + */ + public SimpleLockManagerException(Exception ex) { + super(ex); + } +} diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java new file mode 100644 index 00000000..0d1ca89b --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import java.util.Properties; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.properties.BeanConfigurator; +import org.onap.policy.common.utils.properties.Property; +import org.onap.policy.common.utils.properties.exception.PropertyException; + + +@Getter +@Setter +public class SimpleLockProperties { + public static final String PREFIX = "simple.locking."; + public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds"; + + /** + * Time, in seconds, to wait between checks for expired locks. + */ + @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900") + private int expireCheckSec; + + /** + * Constructs the object, populating fields from the properties. + * + * @param props properties from which to configure this + * @throws PropertyException if an error occurs + */ + public SimpleLockProperties(Properties props) throws PropertyException { + new BeanConfigurator().configureFromProperties(this, props); + } +} diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java index 5e0ead9d..fe1a2345 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java @@ -27,12 +27,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,6 +43,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import org.junit.Before; @@ -53,6 +56,10 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServer; import org.onap.policy.common.endpoints.http.server.HttpServletServerFactory; import org.onap.policy.common.utils.gson.GsonTestUtils; import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.features.PolicyControllerFeatureApi; import org.onap.policy.drools.features.PolicyEngineFeatureApi; import org.onap.policy.drools.persistence.SystemPersistence; @@ -61,9 +68,10 @@ import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.protocol.configuration.ControllerConfiguration; import org.onap.policy.drools.protocol.configuration.DroolsConfiguration; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; +import org.onap.policy.drools.system.internal.SimpleLockManager; +import org.onap.policy.drools.system.internal.SimpleLockProperties; public class PolicyEngineManagerTest { - private static final String EXPECTED = "expected exception"; private static final String NOOP_STR = CommInfrastructure.NOOP.name(); @@ -76,6 +84,8 @@ public class PolicyEngineManagerTest { private static final String FEATURE2 = "feature-b"; private static final String MY_TOPIC = "my-topic"; private static final String MESSAGE = "my-message"; + private static final String MY_OWNER = "my-owner"; + private static final String MY_RESOURCE = "my-resource"; private static final Object MY_EVENT = new Object(); @@ -125,6 +135,8 @@ public class PolicyEngineManagerTest { private PdpdConfiguration pdpConfig; private String pdpConfigJson; private PolicyEngineManager mgr; + private ScheduledExecutorService exsvc; + private PolicyResourceLockManager lockmgr; /** * Initializes the object to be tested. @@ -176,6 +188,15 @@ public class PolicyEngineManagerTest { config3 = new ControllerConfiguration(); config4 = new ControllerConfiguration(); pdpConfig = new PdpdConfiguration(); + exsvc = mock(ScheduledExecutorService.class); + lockmgr = mock(PolicyResourceLockManager.class); + + when(lockmgr.start()).thenReturn(true); + when(lockmgr.stop()).thenReturn(true); + when(lockmgr.lock()).thenReturn(true); + when(lockmgr.unlock()).thenReturn(true); + + when(prov2.beforeCreateLockManager(any(), any())).thenReturn(lockmgr); when(prov1.getName()).thenReturn(FEATURE1); when(prov2.getName()).thenReturn(FEATURE2); @@ -387,6 +408,86 @@ public class PolicyEngineManagerTest { assertFalse(config.isEmpty()); } + /** + * Tests that makeExecutorService() uses the value from the thread + * property. + */ + @Test + public void testMakeExecutorServicePropertyProvided() { + PolicyEngineManager mgrspy = spy(mgr); + + properties.setProperty(PolicyEngineManager.EXECUTOR_THREAD_PROP, "3"); + mgrspy.configure(properties); + assertSame(exsvc, mgrspy.getExecutorService()); + verify(mgrspy).makeScheduledExecutor(3); + } + + /** + * Tests that makeExecutorService() uses the default thread count when no thread + * property is provided. + */ + @Test + public void testMakeExecutorServiceNoProperty() { + PolicyEngineManager mgrspy = spy(mgr); + + mgrspy.configure(properties); + assertSame(exsvc, mgrspy.getExecutorService()); + verify(mgrspy).makeScheduledExecutor(PolicyEngineManager.DEFAULT_EXECUTOR_THREADS); + } + + /** + * Tests that makeExecutorService() uses the default thread count when the thread + * property is invalid. + */ + @Test + public void testMakeExecutorServiceInvalidProperty() { + PolicyEngineManager mgrspy = spy(mgr); + + properties.setProperty(PolicyEngineManager.EXECUTOR_THREAD_PROP, "abc"); + mgrspy.configure(properties); + assertSame(exsvc, mgrspy.getExecutorService()); + verify(mgrspy).makeScheduledExecutor(PolicyEngineManager.DEFAULT_EXECUTOR_THREADS); + } + + /** + * Tests createLockManager() when beforeCreateLock throws an exception and returns a + * manager. + */ + @Test + public void testCreateLockManagerHaveProvider() { + // first provider throws an exception + when(prov1.beforeCreateLockManager(any(), any())).thenThrow(new RuntimeException(EXPECTED)); + + mgr.configure(properties); + assertSame(lockmgr, mgr.getLockManager()); + } + + /** + * Tests createLockManager() when SimpleLockManager throws an exception. + */ + @Test + public void testCreateLockManagerSimpleEx() { + when(prov2.beforeCreateLockManager(any(), any())).thenReturn(null); + + // invalid property for SimpleLockManager + properties.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc"); + mgr.configure(properties); + + // should create a manager using default properties + assertTrue(mgr.getLockManager() instanceof SimpleLockManager); + } + + /** + * Tests createLockManager() when SimpleLockManager is returned. + */ + @Test + public void testCreateLockManagerSimple() { + when(prov2.beforeCreateLockManager(any(), any())).thenReturn(null); + + mgr.configure(properties); + assertTrue(mgr.getLockManager() instanceof SimpleLockManager); + } + @Test public void testConfigureProperties() throws Exception { // arrange for first provider to throw exceptions @@ -667,6 +768,12 @@ public class PolicyEngineManagerTest { when(sink1.start()).thenThrow(new RuntimeException(EXPECTED)); }); + // lock manager fails to start - still does everything + testStart(false, () -> when(lockmgr.start()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testStart(false, () -> when(lockmgr.start()).thenThrow(new RuntimeException(EXPECTED))); + // servlet wait fails - still does everything testStart(false, () -> when(server1.waitedStart(anyLong())).thenReturn(false)); @@ -796,6 +903,12 @@ public class PolicyEngineManagerTest { // servlet fails to stop - still does everything testStop(false, () -> when(server1.stop()).thenReturn(false)); + // lock manager fails to stop - still does everything + testStop(false, () -> when(lockmgr.stop()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testStop(false, () -> when(lockmgr.stop()).thenThrow(new RuntimeException(EXPECTED))); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeStop(mgr)).thenReturn(flag), @@ -861,6 +974,10 @@ public class PolicyEngineManagerTest { assertTrue(threadStarted); assertTrue(threadInterrupted); + + // lock manager throws an exception - still does everything + testShutdown(() -> doThrow(new RuntimeException(EXPECTED)).when(lockmgr).shutdown()); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeShutdown(mgr)).thenReturn(flag), @@ -906,6 +1023,8 @@ public class PolicyEngineManagerTest { verify(prov1).afterShutdown(mgr); verify(prov2).afterShutdown(mgr); + + verify(exsvc).shutdownNow(); } @Test @@ -985,6 +1104,12 @@ public class PolicyEngineManagerTest { // endpoint manager fails to lock - still does everything testLock(false, () -> when(endpoint.lock()).thenReturn(false)); + // lock manager fails to lock - still does everything + testLock(false, () -> when(lockmgr.lock()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testLock(false, () -> when(lockmgr.lock()).thenThrow(new RuntimeException(EXPECTED))); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeLock(mgr)).thenReturn(flag), @@ -1055,6 +1180,12 @@ public class PolicyEngineManagerTest { // endpoint manager fails to unlock - still does everything testUnlock(false, () -> when(endpoint.unlock()).thenReturn(false)); + // lock manager fails to lock - still does everything + testUnlock(false, () -> when(lockmgr.unlock()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testUnlock(false, () -> when(lockmgr.unlock()).thenThrow(new RuntimeException(EXPECTED))); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeUnlock(mgr)).thenReturn(flag), @@ -1483,6 +1614,32 @@ public class PolicyEngineManagerTest { verify(prov2).afterDeactivate(mgr); } + @Test + public void testCreateLock() { + Lock lock = mock(Lock.class); + LockCallback callback = mock(LockCallback.class); + when(lockmgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)).thenReturn(lock); + + // not configured yet, thus no lock manager + assertThatIllegalStateException() + .isThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)); + + // now configure it and try again + mgr.configure(properties); + assertSame(lock, mgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)); + + // test illegal args + assertThatThrownBy(() -> mgr.createLock(null, MY_OWNER, 10, callback, false)) + .hasMessageContaining("resourceId"); + assertThatThrownBy(() -> mgr.createLock(MY_RESOURCE, null, 10, callback, false)) + .hasMessageContaining("ownerKey"); + assertThatIllegalArgumentException() + .isThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, -1, callback, false)) + .withMessageContaining("holdSec"); + assertThatThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, 10, null, false)) + .hasMessageContaining("callback"); + } + @Test public void testOpen() throws Throwable { when(prov1.beforeOpen(mgr)).thenThrow(new RuntimeException(EXPECTED)); @@ -1789,6 +1946,11 @@ public class PolicyEngineManagerTest { return engine; } + @Override + protected ScheduledExecutorService makeScheduledExecutor(int nthreads) { + return exsvc; + } + /** * Shutdown thread with overrides. */ diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java new file mode 100644 index 00000000..7ffc72ff --- /dev/null +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.onap.policy.common.utils.test.ExceptionsTester; +import org.onap.policy.drools.system.internal.SimpleLockManagerException; + +public class SimpleLockManagerExceptionTest extends ExceptionsTester { + + @Test + public void test() { + assertEquals(1, test(SimpleLockManagerException.class)); + } +} diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java new file mode 100644 index 00000000..66406898 --- /dev/null +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java @@ -0,0 +1,781 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.utils.time.CurrentTime; +import org.onap.policy.common.utils.time.TestTime; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.onap.policy.drools.system.internal.SimpleLockManager.SimpleLock; +import org.powermock.reflect.Whitebox; + +public class SimpleLockManagerTest { + private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService"; + private static final String TIME_FIELD = "currentTime"; + private static final String OWNER_KEY = "my key"; + private static final String RESOURCE = "my resource"; + private static final String RESOURCE2 = "my resource #2"; + private static final String RESOURCE3 = "my resource #3"; + private static final int HOLD_SEC = 100; + private static final int HOLD_SEC2 = 120; + private static final int HOLD_MS = HOLD_SEC * 1000; + private static final int HOLD_MS2 = HOLD_SEC2 * 1000; + private static final int MAX_THREADS = 10; + private static final int MAX_LOOPS = 50; + + private static CurrentTime saveTime; + private static ScheduledExecutorService saveExec; + private static ScheduledExecutorService realExec; + + private TestTime testTime; + private AtomicInteger nactive; + private AtomicInteger nsuccesses; + private SimpleLockManager feature; + + @Mock + private ScheduledExecutorService exsvc; + + @Mock + private PolicyEngine engine; + + @Mock + private ScheduledFuture future; + + @Mock + private LockCallback callback; + + + /** + * Saves static fields and configures the location of the property files. + */ + @BeforeClass + public static void setUpBeforeClass() { + saveTime = Whitebox.getInternalState(SimpleLockManager.class, TIME_FIELD); + saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD); + + realExec = Executors.newScheduledThreadPool(3); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); + } + + /** + * Restores static fields. + */ + @AfterClass + public static void tearDownAfterClass() { + Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, saveTime); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec); + + realExec.shutdown(); + } + + /** + * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute + * tasks. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + testTime = new TestTime(); + nactive = new AtomicInteger(0); + nsuccesses = new AtomicInteger(0); + + Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime); + + when(engine.getExecutorService()).thenReturn(exsvc); + + feature = new MyLockingFeature(); + feature.start(); + } + + /** + * Tests constructor() when properties are invalid. + */ + @Test + public void testSimpleLockManagerInvalidProperties() { + // use properties containing an invalid value + Properties props = new Properties(); + props.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc"); + + assertThatThrownBy(() -> new MyLockingFeature(engine, props)).isInstanceOf(SimpleLockManagerException.class); + } + + @Test + public void testIsAlive() { + assertTrue(feature.isAlive()); + + feature.stop(); + assertFalse(feature.isAlive()); + } + + @Test + public void testStart() { + assertFalse(feature.start()); + + feature.stop(); + assertTrue(feature.start()); + } + + @Test + public void testStop() { + assertTrue(feature.stop()); + verify(future).cancel(true); + + assertFalse(feature.stop()); + + // no more invocations + verify(future).cancel(anyBoolean()); + } + + @Test + public void testShutdown() { + feature.shutdown(); + + verify(future).cancel(true); + } + + @Test + public void testLockApi() { + assertFalse(feature.isLocked()); + assertTrue(feature.lock()); + assertTrue(feature.unlock()); + } + + @Test + public void testCreateLock() { + // this lock should be granted immediately + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isActive()); + assertEquals(testTime.getMillis() + HOLD_MS, lock.getHoldUntilMs()); + + invokeCallback(1); + + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + + // this time it should be busy + Lock lock2 = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertFalse(lock2.isActive()); + assertTrue(lock2.isUnavailable()); + + invokeCallback(2); + + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // should have been no change to the original lock + assertTrue(lock.isActive()); + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // should work with "true" value also + Lock lock3 = feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, true); + assertTrue(lock3.isActive()); + invokeCallback(3); + verify(callback).lockAvailable(lock3); + verify(callback, never()).lockUnavailable(lock3); + } + + /** + * Tests lock() when the feature is not the latest instance. + */ + @Test + public void testCreateLockNotLatestInstance() { + SimpleLockManager.setLatestInstance(null); + + Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback).lockUnavailable(lock); + } + + @Test + public void testCheckExpired() throws InterruptedException { + final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + final SimpleLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false); + final SimpleLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC2, callback, false); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); + + Runnable checker = captor.getValue(); + + // time unchanged - checker should have no impact + checker.run(); + assertTrue(lock.isActive()); + assertTrue(lock2.isActive()); + assertTrue(lock3.isActive()); + + // expire the first two locks + testTime.sleep(HOLD_MS); + checker.run(); + assertFalse(lock.isActive()); + assertFalse(lock2.isActive()); + assertTrue(lock3.isActive()); + + // run the callbacks + captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(5)).execute(captor.capture()); + captor.getAllValues().forEach(Runnable::run); + verify(callback).lockUnavailable(lock); + verify(callback).lockUnavailable(lock2); + verify(callback, never()).lockUnavailable(lock3); + + // should be able to get a lock on the first two resources + assertTrue(feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive()); + assertTrue(feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive()); + + // lock is still busy on the last resource + assertFalse(feature.createLock(RESOURCE3, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive()); + + // expire the last lock + testTime.sleep(HOLD_MS2); + checker.run(); + assertFalse(lock3.isActive()); + + // run the callback + captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(9)).execute(captor.capture()); + captor.getValue().run(); + verify(callback).lockUnavailable(lock3); + } + + /** + * Tests checkExpired(), where the lock is removed from the map between invoking + * expired() and compute(). Should cause "null" to be returned by compute(). + * + * @throws InterruptedException if the test is interrupted + */ + @Test + public void testCheckExpiredLockDeleted() throws InterruptedException { + feature = new MyLockingFeature() { + @Override + protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + + @Override + public boolean expired(long currentMs) { + // remove the lock from the map + free(); + return true; + } + }; + } + }; + + feature.start(); + + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(1); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); + + Runnable checker = captor.getValue(); + + checker.run(); + + // lock should now be gone and we should be able to get another + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(2); + + // should have succeeded twice + verify(callback, times(2)).lockAvailable(any()); + + // lock should not be available now + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(3); + verify(callback).lockUnavailable(any()); + } + + /** + * Tests checkExpired(), where the lock is removed from the map and replaced with a + * new lock, between invoking expired() and compute(). Should cause the new lock to be + * returned. + * + * @throws InterruptedException if the test is interrupted + */ + @Test + public void testCheckExpiredLockReplaced() throws InterruptedException { + feature = new MyLockingFeature() { + private boolean madeLock = false; + + @Override + protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + if (madeLock) { + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature); + } + + madeLock = true; + + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + + @Override + public boolean expired(long currentMs) { + // remove the lock from the map and add a new lock + free(); + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + return true; + } + }; + } + }; + + feature.start(); + + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(1); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); + + Runnable checker = captor.getValue(); + + checker.run(); + + // lock should not be available now + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(3); + verify(callback).lockUnavailable(any()); + } + + @Test + public void testGetThreadPool() { + // use a real feature + feature = new SimpleLockManager(engine, new Properties()); + + // load properties + feature.start(); + + // should create thread pool + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // should shut down thread pool + feature.stop(); + } + + @Test + public void testSimpleLockNoArgs() { + SimpleLock lock = new SimpleLock(); + assertNull(lock.getResourceId()); + assertNull(lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(0, lock.getHoldSec()); + + assertEquals(0, lock.getHoldUntilMs()); + } + + @Test + public void testSimpleLockSimpleLock() { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertSame(callback, lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + + assertThatIllegalArgumentException() + .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false)) + .withMessageContaining("holdSec"); + } + + @Test + public void testSimpleLockSerializable() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock = roundTrip(lock); + + assertTrue(lock.isActive()); + + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + } + + @Test + public void testSimpleLockExpired() { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.grant(); + + assertFalse(lock.expired(testTime.getMillis())); + assertFalse(lock.expired(testTime.getMillis() + HOLD_MS - 1)); + assertTrue(lock.expired(testTime.getMillis() + HOLD_MS)); + } + + /** + * Tests grant() when the lock is already unavailable. + */ + @Test + public void testSimpleLockGrantUnavailable() { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.setState(LockState.UNAVAILABLE); + lock.grant(); + + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + @Test + public void testSimpleLockFree() { + final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // lock2 should be denied + SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(2); + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // lock2 was denied, so nothing new should happen when freed + assertFalse(lock2.free()); + invokeCallback(2); + + // force lock2 to be active - still nothing should happen + Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); + assertFalse(lock2.free()); + invokeCallback(2); + + // now free the first lock + assertTrue(lock.free()); + assertEquals(LockState.UNAVAILABLE, lock.getState()); + + // should be able to get the lock now + SimpleLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock3.isActive()); + } + + /** + * Tests that free() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockFreeSerialized() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + feature = new MyLockingFeature(); + feature.start(); + + lock = roundTrip(lock); + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests free() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockFreeNoFeature() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + SimpleLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + @Test + public void testSimpleLockExtend() { + final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // lock2 should be denied + SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(2); + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // lock2 will still be denied + lock2.extend(HOLD_SEC, callback); + invokeCallback(3); + verify(callback, times(2)).lockUnavailable(lock2); + + // force lock2 to be active - should still be denied + Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); + lock2.extend(HOLD_SEC, callback); + invokeCallback(4); + verify(callback, times(3)).lockUnavailable(lock2); + + assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback)) + .withMessageContaining("holdSec"); + + // now extend the first lock + lock.extend(HOLD_SEC2, callback); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + assertEquals(testTime.getMillis() + HOLD_MS2, lock.getHoldUntilMs()); + invokeCallback(5); + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + } + + /** + * Tests that extend() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockExtendSerialized() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + feature = new MyLockingFeature(); + feature.start(); + + lock = roundTrip(lock); + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isActive()); + + invokeCallback(1); + verify(scallback).lockAvailable(lock); + verify(scallback, never()).lockUnavailable(lock); + } + + /** + * Tests extend() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockExtendNoFeature() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + SimpleLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isUnavailable()); + + invokeCallback(1); + verify(scallback, never()).lockAvailable(lock); + verify(scallback).lockUnavailable(lock); + } + + @Test + public void testSimpleLockToString() { + String text = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString(); + assertNotNull(text); + assertThat(text).contains("holdUntil").doesNotContain("ownerInfo").doesNotContain("callback"); + } + + /** + * Performs a multi-threaded test of the locking facility. + * + * @throws InterruptedException if the current thread is interrupted while waiting for + * the background threads to complete + */ + @Test + public void testMultiThreaded() throws InterruptedException { + Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime); + feature = new SimpleLockManager(PolicyEngineConstants.getManager(), new Properties()); + feature.start(); + + List threads = new ArrayList<>(MAX_THREADS); + for (int x = 0; x < MAX_THREADS; ++x) { + threads.add(new MyThread()); + } + + threads.forEach(Thread::start); + + for (MyThread thread : threads) { + thread.join(6000); + assertFalse(thread.isAlive()); + } + + for (MyThread thread : threads) { + if (thread.err != null) { + throw thread.err; + } + } + + assertTrue(nsuccesses.get() > 0); + } + + private SimpleLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + return (SimpleLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock); + } + + private SimpleLock roundTrip(SimpleLock lock) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(lock); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + return (SimpleLock) ois.readObject(); + } + } + + /** + * Invokes the last call-back in the work queue. + * + * @param nexpected number of call-backs expected in the work queue + */ + private void invokeCallback(int nexpected) { + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(nexpected)).execute(captor.capture()); + + if (nexpected > 0) { + captor.getAllValues().get(nexpected - 1).run(); + } + } + + /** + * Feature that uses exsvc to execute requests. + */ + private class MyLockingFeature extends SimpleLockManager { + + public MyLockingFeature() { + this(engine, new Properties()); + } + + public MyLockingFeature(PolicyEngine engine, Properties props) { + super(engine, props); + + exsvc = mock(ScheduledExecutorService.class); + when(engine.getExecutorService()).thenReturn(exsvc); + + when(exsvc.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(answer -> { + return future; + }); + } + } + + /** + * Thread used with the multi-threaded test. It repeatedly attempts to get a lock, + * extend it, and then unlock it. + */ + private class MyThread extends Thread { + AssertionError err = null; + + public MyThread() { + setDaemon(true); + } + + @Override + public void run() { + try { + for (int x = 0; x < MAX_LOOPS; ++x) { + makeAttempt(); + } + + } catch (AssertionError e) { + err = e; + } + } + + private void makeAttempt() { + try { + Semaphore sem = new Semaphore(0); + + LockCallback cb = new LockCallback() { + @Override + public void lockAvailable(Lock lock) { + sem.release(); + } + + @Override + public void lockUnavailable(Lock lock) { + sem.release(); + } + }; + + Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false); + + // wait for callback, whether available or unavailable + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + if (!lock.isActive()) { + return; + } + + nsuccesses.incrementAndGet(); + + assertEquals(1, nactive.incrementAndGet()); + + lock.extend(HOLD_SEC2, cb); + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + assertTrue(lock.isActive()); + + // decrement BEFORE free() + nactive.decrementAndGet(); + + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("interrupted", e); + } + } + } +} -- cgit 1.2.3-korg