diff options
Diffstat (limited to 'policy-management')
9 files changed, 1683 insertions, 4 deletions
diff --git a/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java b/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java index fe31eb50..87001ad6 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java +++ b/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java @@ -23,6 +23,7 @@ package org.onap.policy.drools.features; import java.util.Properties; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.utils.services.OrderedService; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; import org.onap.policy.drools.system.PolicyEngine; @@ -271,4 +272,26 @@ public interface PolicyEngineFeatureApi extends OrderedService { default boolean afterOpen(PolicyEngine engine) { return false; } + + /** + * Called before the PolicyEngine creates a lock manager. + * + * @return a lock manager if this feature intercepts and takes ownership of the + * operation preventing the invocation of lower priority features. Null, + * otherwise + */ + default PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) { + return null; + } + + /** + * Called after the PolicyEngine creates a lock manager. + * + * @return True if this feature intercepts and takes ownership of the operation + * preventing the invocation of lower priority features. False, otherwise + */ + default boolean afterCreateLockManager(PolicyEngine engine, Properties properties, + PolicyResourceLockManager lockManager) { + return false; + } } diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java index 653ff72e..cb0749d9 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java @@ -22,6 +22,7 @@ package org.onap.policy.drools.system; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; import org.onap.policy.common.capabilities.Lockable; import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; @@ -29,6 +30,8 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.http.server.HttpServletServer; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; import org.onap.policy.drools.features.PolicyEngineFeatureApi; import org.onap.policy.drools.protocol.configuration.ControllerConfiguration; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; @@ -198,6 +201,11 @@ public interface PolicyEngine extends Startable, Lockable, TopicListener { List<HttpServletServer> getHttpServers(); /** + * Gets a thread pool that can be used to execute background tasks. + */ + ScheduledExecutorService getExecutorService(); + + /** * get properties configuration. * * @return properties objects @@ -280,6 +288,31 @@ public interface PolicyEngine extends Startable, Lockable, TopicListener { boolean deliver(CommInfrastructure busType, String topic, String event); /** + * Requests a lock on a resource. Typically, the lock is not immediately granted, + * though a "lock" object is always returned. Once the lock has been granted (or + * denied), the callback will be invoked to indicate the result. + * + * <p/> + * Notes: + * <dl> + * <li>The callback may be invoked <i>before</i> this method returns</li> + * <li>The implementation need not honor waitForLock={@code true}</li> + * </dl> + * + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held once + * it has been granted, after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or subsequently + * lost; must not be {@code null} + * @param waitForLock {@code true} to wait for the lock, if it is currently locked, + * {@code false} otherwise + * @return a new lock + */ + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock); + + /** * Invoked when the host goes into the active state. */ void activate(); diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java index 766848c6..c924fd69 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java @@ -24,7 +24,6 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME; import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT; -import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.gson.Gson; @@ -32,11 +31,16 @@ import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Stream; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NonNull; +import org.apache.commons.lang.StringUtils; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; @@ -54,6 +58,9 @@ import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; import org.onap.policy.drools.core.PolicyContainer; import org.onap.policy.drools.core.jmx.PdpJmxListener; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.features.PolicyControllerFeatureApi; import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants; import org.onap.policy.drools.features.PolicyEngineFeatureApi; @@ -67,6 +74,7 @@ import org.onap.policy.drools.protocol.configuration.ControllerConfiguration; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; import org.onap.policy.drools.server.restful.RestManager; import org.onap.policy.drools.server.restful.aaf.AafTelemetryAuthFilter; +import org.onap.policy.drools.system.internal.SimpleLockManager; import org.onap.policy.drools.utils.PropertyUtil; import org.onap.policy.drools.utils.logging.LoggerUtil; import org.onap.policy.drools.utils.logging.MdcTransaction; @@ -78,7 +86,6 @@ import org.slf4j.LoggerFactory; * Policy Engine Manager Implementation. */ class PolicyEngineManager implements PolicyEngine { - /** * String literals. */ @@ -88,6 +95,9 @@ class PolicyEngineManager implements PolicyEngine { private static final String ENGINE_STOPPED_MSG = "Policy Engine is stopped"; private static final String ENGINE_LOCKED_MSG = "Policy Engine is locked"; + public static final String EXECUTOR_THREAD_PROP = "executor.threads"; + protected static final int DEFAULT_EXECUTOR_THREADS = 5; + /** * logger. */ @@ -134,6 +144,17 @@ class PolicyEngineManager implements PolicyEngine { private List<HttpServletServer> httpServers = new ArrayList<>(); /** + * Thread pool used to execute background tasks. + */ + private ScheduledExecutorService executorService = null; + + /** + * Lock manager used to create locks. + */ + @Getter(AccessLevel.PROTECTED) + private PolicyResourceLockManager lockManager = null; + + /** * gson parser to decode configuration requests. */ private final Gson decoder = new GsonBuilder().disableHtmlEscaping().create(); @@ -214,6 +235,53 @@ class PolicyEngineManager implements PolicyEngine { } @Override + @JsonIgnore + @GsonJsonIgnore + public ScheduledExecutorService getExecutorService() { + return executorService; + } + + private ScheduledExecutorService makeExecutorService(Properties properties) { + int nthreads = DEFAULT_EXECUTOR_THREADS; + try { + nthreads = Integer.valueOf( + properties.getProperty(EXECUTOR_THREAD_PROP, String.valueOf(DEFAULT_EXECUTOR_THREADS))); + + } catch (NumberFormatException e) { + logger.error("invalid number for " + EXECUTOR_THREAD_PROP + " property", e); + } + + return makeScheduledExecutor(nthreads); + } + + private void createLockManager(Properties properties) { + for (PolicyEngineFeatureApi feature : getEngineProviders()) { + try { + this.lockManager = feature.beforeCreateLockManager(this, properties); + if (this.lockManager != null) { + return; + } + } catch (RuntimeException e) { + logger.error("{}: feature {} before-create-lock-manager failure because of {}", this, + feature.getClass().getName(), e.getMessage(), e); + } + } + + try { + this.lockManager = new SimpleLockManager(this, properties); + } catch (RuntimeException e) { + logger.error("{}: cannot create simple lock manager because of {}", this, e.getMessage(), e); + this.lockManager = new SimpleLockManager(this, new Properties()); + } + + /* policy-engine dispatch post operation hook */ + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterCreateLockManager(this, properties, this.lockManager), + (feature, ex) -> logger.error("{}: feature {} after-create-lock-manager failure because of {}", + this, feature.getClass().getName(), ex.getMessage(), ex)); + } + + @Override public synchronized void configure(Properties properties) { if (properties == null) { @@ -257,6 +325,10 @@ class PolicyEngineManager implements PolicyEngine { logger.error("{}: add-http-servers failed", this, e); } + executorService = makeExecutorService(properties); + + createLockManager(properties); + /* policy-engine dispatch post configure hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterConfigure(this), @@ -499,6 +571,13 @@ class PolicyEngineManager implements PolicyEngine { AtomicReference<Boolean> success = new AtomicReference<>(true); + try { + success.compareAndSet(true, this.lockManager.start()); + } catch (final RuntimeException e) { + logger.warn("{}: cannot start lock manager because of {}", this, e.getMessage(), e); + success.set(false); + } + /* Start Policy Engine exclusively-owned (unmanaged) http servers */ attempt(success, this.httpServers, @@ -639,9 +718,16 @@ class PolicyEngineManager implements PolicyEngine { (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item, ex.getMessage(), ex)); + try { + success.compareAndSet(true, this.lockManager.stop()); + } catch (final RuntimeException e) { + logger.warn("{}: cannot stop lock manager because of {}", this, e.getMessage(), e); + success.set(false); + } + // stop JMX? - /* policy-engine dispatch pre stop hook */ + /* policy-engine dispatch post stop hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterStop(this), (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this, @@ -688,6 +774,14 @@ class PolicyEngineManager implements PolicyEngine { getTopicEndpointManager().shutdown(); getServletFactory().destroy(); + try { + this.lockManager.shutdown(); + } catch (final RuntimeException e) { + logger.warn("{}: cannot shutdown lock manager because of {}", this, e.getMessage(), e); + } + + executorService.shutdownNow(); + // Stop the JMX listener stopPdpJmxListener(); @@ -806,6 +900,13 @@ class PolicyEngineManager implements PolicyEngine { success = getTopicEndpointManager().lock() && success; + try { + success = (this.lockManager == null || this.lockManager.lock()) && success; + } catch (final RuntimeException e) { + logger.warn("{}: cannot lock() lock manager because of {}", this, e.getMessage(), e); + success = false; + } + /* policy-engine dispatch post lock hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterLock(this), @@ -833,6 +934,14 @@ class PolicyEngineManager implements PolicyEngine { this.locked = false; boolean success = true; + + try { + success = (this.lockManager == null || this.lockManager.unlock()) && success; + } catch (final RuntimeException e) { + logger.warn("{}: cannot unlock() lock manager because of {}", this, e.getMessage(), e); + success = false; + } + final List<PolicyController> controllers = getControllerFactory().inventory(); for (final PolicyController controller : controllers) { try { @@ -1167,6 +1276,21 @@ class PolicyEngineManager implements PolicyEngine { feature.getClass().getName(), ex.getMessage(), ex)); } + @Override + public Lock createLock(@NonNull String resourceId, @NonNull String ownerKey, int holdSec, + @NonNull LockCallback callback, boolean waitForLock) { + + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + if (lockManager == null) { + throw new IllegalStateException("lock manager has not been initialized"); + } + + return lockManager.createLock(resourceId, ownerKey, holdSec, callback, waitForLock); + } + private boolean controllerConfig(PdpdConfiguration config) { /* only this one supported for now */ final List<ControllerConfiguration> configControllers = config.getControllers(); @@ -1234,4 +1358,13 @@ class PolicyEngineManager implements PolicyEngine { protected PolicyEngine getPolicyEngine() { return PolicyEngineConstants.getManager(); } + + protected ScheduledExecutorService makeScheduledExecutor(int nthreads) { + ScheduledThreadPoolExecutor exsvc = new ScheduledThreadPoolExecutor(nthreads); + exsvc.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + exsvc.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + exsvc.setRemoveOnCancelPolicy(true); + + return exsvc; + } } diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java new file mode 100644 index 00000000..f5163e9b --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java @@ -0,0 +1,426 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.properties.exception.PropertyException; +import org.onap.policy.common.utils.time.CurrentTime; +import org.onap.policy.drools.core.lock.AlwaysFailLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; +import org.onap.policy.drools.system.PolicyEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Simple implementation of the Lock Feature. Locks do not span across instances of this + * object (i.e., locks do not span across servers). + * + * <p/> + * Note: this implementation does <i>not</i> honor the waitForLocks={@code true} + * parameter. + * + * <p/> + * When a lock is deserialized, it will not initially appear in this feature's map; it + * will be added to the map once free() or extend() is invoked, provided there isn't + * already an entry. + */ +public class SimpleLockManager implements PolicyResourceLockManager { + + private static final Logger logger = LoggerFactory.getLogger(SimpleLockManager.class); + + private static final String NOT_LOCKED_MSG = "not locked"; + private static final String LOCK_LOST_MSG = "lock lost"; + + /** + * Provider of current time. May be overridden by junit tests. + */ + private static CurrentTime currentTime = new CurrentTime(); + + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private static SimpleLockManager latestInstance = null; + + + /** + * Engine with which this manager is associated. + */ + private final PolicyEngine engine; + + /** + * Feature properties. + */ + private final SimpleLockProperties featProps; + + /** + * Maps a resource to the lock that owns it. + */ + private final Map<String, SimpleLock> resource2lock = new ConcurrentHashMap<>(); + + /** + * Thread pool used to check for lock expiration and to notify owners when locks are + * lost. + */ + private ScheduledExecutorService exsvc = null; + + /** + * Used to cancel the expiration checker on shutdown. + */ + private ScheduledFuture<?> checker = null; + + + /** + * Constructs the object. + * + * @param engine engine with which this manager is associated + * @param properties properties used to configure the manager + */ + public SimpleLockManager(PolicyEngine engine, Properties properties) { + try { + this.engine = engine; + this.featProps = new SimpleLockProperties(properties); + + } catch (PropertyException e) { + throw new SimpleLockManagerException(e); + } + } + + @Override + public boolean isAlive() { + return (checker != null); + } + + @Override + public boolean start() { + if (checker != null) { + return false; + } + + exsvc = getThreadPool(); + + checker = exsvc.scheduleWithFixedDelay(this::checkExpired, featProps.getExpireCheckSec(), + featProps.getExpireCheckSec(), TimeUnit.SECONDS); + + setLatestInstance(this); + + return true; + } + + /** + * Stops the expiration checker. Does <i>not</i> invoke any lock call-backs. + */ + @Override + public synchronized boolean stop() { + exsvc = null; + + if (checker == null) { + return false; + } + + ScheduledFuture<?> checker2 = checker; + checker = null; + + checker2.cancel(true); + + return true; + } + + @Override + public void shutdown() { + stop(); + } + + @Override + public boolean isLocked() { + return false; + } + + @Override + public boolean lock() { + return true; + } + + @Override + public boolean unlock() { + return true; + } + + @Override + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + + if (latestInstance != this) { + AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); + lock.notifyUnavailable(); + return lock; + } + + SimpleLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); + + SimpleLock existingLock = resource2lock.putIfAbsent(resourceId, lock); + + if (existingLock == null) { + lock.grant(); + } else { + lock.deny("resource is busy"); + } + + return lock; + } + + /** + * Checks for expired locks. + */ + private void checkExpired() { + long currentMs = currentTime.getMillis(); + logger.info("checking for expired locks at {}", currentMs); + + /* + * Could do this via an iterator, but using compute() guarantees that the lock + * doesn't get extended while it's being removed from the map. + */ + for (Entry<String, SimpleLock> ent : resource2lock.entrySet()) { + if (!ent.getValue().expired(currentMs)) { + continue; + } + + AtomicReference<SimpleLock> lockref = new AtomicReference<>(null); + + resource2lock.computeIfPresent(ent.getKey(), (resourceId, lock) -> { + if (lock.expired(currentMs)) { + lockref.set(lock); + return null; + } + + return lock; + }); + + SimpleLock lock = lockref.get(); + if (lock != null) { + lock.deny("lock expired"); + } + } + } + + /** + * Simple Lock implementation. + */ + public static class SimpleLock extends LockImpl { + private static final long serialVersionUID = 1L; + + /** + * Time, in milliseconds, when the lock expires. + */ + @Getter + private long holdUntilMs; + + /** + * Feature containing this lock. May be {@code null} until the feature is + * identified. Note: this can only be null if the lock has been de-serialized. + */ + private transient SimpleLockManager feature; + + /** + * Constructs the object. + */ + public SimpleLock() { + this.holdUntilMs = 0; + this.feature = null; + } + + /** + * Constructs the object. + * + * @param state initial state of the lock + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held, + * after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or + * subsequently lost; must not be {@code null} + * @param feature feature containing this lock + */ + public SimpleLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback, + SimpleLockManager feature) { + super(state, resourceId, ownerKey, holdSec, callback); + this.feature = feature; + } + + /** + * Determines if the owner's lock has expired. + * + * @param currentMs current time, in milliseconds + * @return {@code true} if the owner's lock has expired, {@code false} otherwise + */ + public boolean expired(long currentMs) { + return (holdUntilMs <= currentMs); + } + + /** + * Grants this lock. The notification is <i>always</i> invoked via a background + * thread. + */ + protected synchronized void grant() { + if (isUnavailable()) { + return; + } + + setState(LockState.ACTIVE); + holdUntilMs = currentTime.getMillis() + TimeUnit.SECONDS.toMillis(getHoldSec()); + + logger.info("lock granted: {}", this); + + feature.exsvc.execute(this::notifyAvailable); + } + + /** + * Permanently denies this lock. The notification is invoked via a background + * thread, if a feature instance is attached, otherwise it uses the foreground + * thread. + * + * @param reason the reason the lock was denied + */ + protected void deny(String reason) { + synchronized (this) { + setState(LockState.UNAVAILABLE); + } + + logger.info("{}: {}", reason, this); + + if (feature == null) { + notifyUnavailable(); + + } else { + feature.exsvc.execute(this::notifyUnavailable); + } + } + + @Override + public boolean free() { + // do a quick check of the state + if (isUnavailable()) { + return false; + } + + logger.info("releasing lock: {}", this); + + if (!attachFeature()) { + setState(LockState.UNAVAILABLE); + return false; + } + + AtomicBoolean result = new AtomicBoolean(false); + + feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { + + if (curlock == this) { + // this lock was the owner - resource is now available + result.set(true); + setState(LockState.UNAVAILABLE); + return null; + + } else { + return curlock; + } + }); + + return result.get(); + } + + @Override + public void extend(int holdSec, LockCallback callback) { + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + setHoldSec(holdSec); + setCallback(callback); + + // do a quick check of the state + if (isUnavailable() || !attachFeature()) { + deny(LOCK_LOST_MSG); + return; + } + + if (feature.resource2lock.get(getResourceId()) == this) { + grant(); + } else { + deny(NOT_LOCKED_MSG); + } + } + + /** + * Attaches to the feature instance, if not already attached. + * + * @return {@code true} if the lock is now attached to a feature, {@code false} + * otherwise + */ + private synchronized boolean attachFeature() { + if (feature != null) { + // already attached + return true; + } + + feature = latestInstance; + if (feature == null) { + logger.warn("no feature yet for {}", this); + return false; + } + + // put this lock into the map + feature.resource2lock.putIfAbsent(getResourceId(), this); + + return true; + } + + @Override + public String toString() { + return "SimpleLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey=" + getOwnerKey() + + ", holdSec=" + getHoldSec() + ", holdUntilMs=" + holdUntilMs + "]"; + } + } + + // these may be overridden by junit tests + + protected ScheduledExecutorService getThreadPool() { + return engine.getExecutorService(); + } + + protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, this); + } +} diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java new file mode 100644 index 00000000..ff02f39e --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +public class SimpleLockManagerException extends RuntimeException { + private static final long serialVersionUID = 1L; + + /** + * Constructor. + * + * @param ex exception to be wrapped + */ + public SimpleLockManagerException(Exception ex) { + super(ex); + } +} diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java new file mode 100644 index 00000000..0d1ca89b --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import java.util.Properties; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.properties.BeanConfigurator; +import org.onap.policy.common.utils.properties.Property; +import org.onap.policy.common.utils.properties.exception.PropertyException; + + +@Getter +@Setter +public class SimpleLockProperties { + public static final String PREFIX = "simple.locking."; + public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds"; + + /** + * Time, in seconds, to wait between checks for expired locks. + */ + @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900") + private int expireCheckSec; + + /** + * Constructs the object, populating fields from the properties. + * + * @param props properties from which to configure this + * @throws PropertyException if an error occurs + */ + public SimpleLockProperties(Properties props) throws PropertyException { + new BeanConfigurator().configureFromProperties(this, props); + } +} diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java index 5e0ead9d..fe1a2345 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java @@ -27,12 +27,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,6 +43,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import org.junit.Before; @@ -53,6 +56,10 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServer; import org.onap.policy.common.endpoints.http.server.HttpServletServerFactory; import org.onap.policy.common.utils.gson.GsonTestUtils; import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.features.PolicyControllerFeatureApi; import org.onap.policy.drools.features.PolicyEngineFeatureApi; import org.onap.policy.drools.persistence.SystemPersistence; @@ -61,9 +68,10 @@ import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.protocol.configuration.ControllerConfiguration; import org.onap.policy.drools.protocol.configuration.DroolsConfiguration; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; +import org.onap.policy.drools.system.internal.SimpleLockManager; +import org.onap.policy.drools.system.internal.SimpleLockProperties; public class PolicyEngineManagerTest { - private static final String EXPECTED = "expected exception"; private static final String NOOP_STR = CommInfrastructure.NOOP.name(); @@ -76,6 +84,8 @@ public class PolicyEngineManagerTest { private static final String FEATURE2 = "feature-b"; private static final String MY_TOPIC = "my-topic"; private static final String MESSAGE = "my-message"; + private static final String MY_OWNER = "my-owner"; + private static final String MY_RESOURCE = "my-resource"; private static final Object MY_EVENT = new Object(); @@ -125,6 +135,8 @@ public class PolicyEngineManagerTest { private PdpdConfiguration pdpConfig; private String pdpConfigJson; private PolicyEngineManager mgr; + private ScheduledExecutorService exsvc; + private PolicyResourceLockManager lockmgr; /** * Initializes the object to be tested. @@ -176,6 +188,15 @@ public class PolicyEngineManagerTest { config3 = new ControllerConfiguration(); config4 = new ControllerConfiguration(); pdpConfig = new PdpdConfiguration(); + exsvc = mock(ScheduledExecutorService.class); + lockmgr = mock(PolicyResourceLockManager.class); + + when(lockmgr.start()).thenReturn(true); + when(lockmgr.stop()).thenReturn(true); + when(lockmgr.lock()).thenReturn(true); + when(lockmgr.unlock()).thenReturn(true); + + when(prov2.beforeCreateLockManager(any(), any())).thenReturn(lockmgr); when(prov1.getName()).thenReturn(FEATURE1); when(prov2.getName()).thenReturn(FEATURE2); @@ -387,6 +408,86 @@ public class PolicyEngineManagerTest { assertFalse(config.isEmpty()); } + /** + * Tests that makeExecutorService() uses the value from the thread + * property. + */ + @Test + public void testMakeExecutorServicePropertyProvided() { + PolicyEngineManager mgrspy = spy(mgr); + + properties.setProperty(PolicyEngineManager.EXECUTOR_THREAD_PROP, "3"); + mgrspy.configure(properties); + assertSame(exsvc, mgrspy.getExecutorService()); + verify(mgrspy).makeScheduledExecutor(3); + } + + /** + * Tests that makeExecutorService() uses the default thread count when no thread + * property is provided. + */ + @Test + public void testMakeExecutorServiceNoProperty() { + PolicyEngineManager mgrspy = spy(mgr); + + mgrspy.configure(properties); + assertSame(exsvc, mgrspy.getExecutorService()); + verify(mgrspy).makeScheduledExecutor(PolicyEngineManager.DEFAULT_EXECUTOR_THREADS); + } + + /** + * Tests that makeExecutorService() uses the default thread count when the thread + * property is invalid. + */ + @Test + public void testMakeExecutorServiceInvalidProperty() { + PolicyEngineManager mgrspy = spy(mgr); + + properties.setProperty(PolicyEngineManager.EXECUTOR_THREAD_PROP, "abc"); + mgrspy.configure(properties); + assertSame(exsvc, mgrspy.getExecutorService()); + verify(mgrspy).makeScheduledExecutor(PolicyEngineManager.DEFAULT_EXECUTOR_THREADS); + } + + /** + * Tests createLockManager() when beforeCreateLock throws an exception and returns a + * manager. + */ + @Test + public void testCreateLockManagerHaveProvider() { + // first provider throws an exception + when(prov1.beforeCreateLockManager(any(), any())).thenThrow(new RuntimeException(EXPECTED)); + + mgr.configure(properties); + assertSame(lockmgr, mgr.getLockManager()); + } + + /** + * Tests createLockManager() when SimpleLockManager throws an exception. + */ + @Test + public void testCreateLockManagerSimpleEx() { + when(prov2.beforeCreateLockManager(any(), any())).thenReturn(null); + + // invalid property for SimpleLockManager + properties.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc"); + mgr.configure(properties); + + // should create a manager using default properties + assertTrue(mgr.getLockManager() instanceof SimpleLockManager); + } + + /** + * Tests createLockManager() when SimpleLockManager is returned. + */ + @Test + public void testCreateLockManagerSimple() { + when(prov2.beforeCreateLockManager(any(), any())).thenReturn(null); + + mgr.configure(properties); + assertTrue(mgr.getLockManager() instanceof SimpleLockManager); + } + @Test public void testConfigureProperties() throws Exception { // arrange for first provider to throw exceptions @@ -667,6 +768,12 @@ public class PolicyEngineManagerTest { when(sink1.start()).thenThrow(new RuntimeException(EXPECTED)); }); + // lock manager fails to start - still does everything + testStart(false, () -> when(lockmgr.start()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testStart(false, () -> when(lockmgr.start()).thenThrow(new RuntimeException(EXPECTED))); + // servlet wait fails - still does everything testStart(false, () -> when(server1.waitedStart(anyLong())).thenReturn(false)); @@ -796,6 +903,12 @@ public class PolicyEngineManagerTest { // servlet fails to stop - still does everything testStop(false, () -> when(server1.stop()).thenReturn(false)); + // lock manager fails to stop - still does everything + testStop(false, () -> when(lockmgr.stop()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testStop(false, () -> when(lockmgr.stop()).thenThrow(new RuntimeException(EXPECTED))); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeStop(mgr)).thenReturn(flag), @@ -861,6 +974,10 @@ public class PolicyEngineManagerTest { assertTrue(threadStarted); assertTrue(threadInterrupted); + + // lock manager throws an exception - still does everything + testShutdown(() -> doThrow(new RuntimeException(EXPECTED)).when(lockmgr).shutdown()); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeShutdown(mgr)).thenReturn(flag), @@ -906,6 +1023,8 @@ public class PolicyEngineManagerTest { verify(prov1).afterShutdown(mgr); verify(prov2).afterShutdown(mgr); + + verify(exsvc).shutdownNow(); } @Test @@ -985,6 +1104,12 @@ public class PolicyEngineManagerTest { // endpoint manager fails to lock - still does everything testLock(false, () -> when(endpoint.lock()).thenReturn(false)); + // lock manager fails to lock - still does everything + testLock(false, () -> when(lockmgr.lock()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testLock(false, () -> when(lockmgr.lock()).thenThrow(new RuntimeException(EXPECTED))); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeLock(mgr)).thenReturn(flag), @@ -1055,6 +1180,12 @@ public class PolicyEngineManagerTest { // endpoint manager fails to unlock - still does everything testUnlock(false, () -> when(endpoint.unlock()).thenReturn(false)); + // lock manager fails to lock - still does everything + testUnlock(false, () -> when(lockmgr.unlock()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testUnlock(false, () -> when(lockmgr.unlock()).thenThrow(new RuntimeException(EXPECTED))); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeUnlock(mgr)).thenReturn(flag), @@ -1484,6 +1615,32 @@ public class PolicyEngineManagerTest { } @Test + public void testCreateLock() { + Lock lock = mock(Lock.class); + LockCallback callback = mock(LockCallback.class); + when(lockmgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)).thenReturn(lock); + + // not configured yet, thus no lock manager + assertThatIllegalStateException() + .isThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)); + + // now configure it and try again + mgr.configure(properties); + assertSame(lock, mgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)); + + // test illegal args + assertThatThrownBy(() -> mgr.createLock(null, MY_OWNER, 10, callback, false)) + .hasMessageContaining("resourceId"); + assertThatThrownBy(() -> mgr.createLock(MY_RESOURCE, null, 10, callback, false)) + .hasMessageContaining("ownerKey"); + assertThatIllegalArgumentException() + .isThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, -1, callback, false)) + .withMessageContaining("holdSec"); + assertThatThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, 10, null, false)) + .hasMessageContaining("callback"); + } + + @Test public void testOpen() throws Throwable { when(prov1.beforeOpen(mgr)).thenThrow(new RuntimeException(EXPECTED)); when(prov1.afterOpen(mgr)).thenThrow(new RuntimeException(EXPECTED)); @@ -1789,6 +1946,11 @@ public class PolicyEngineManagerTest { return engine; } + @Override + protected ScheduledExecutorService makeScheduledExecutor(int nthreads) { + return exsvc; + } + /** * Shutdown thread with overrides. */ diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java new file mode 100644 index 00000000..7ffc72ff --- /dev/null +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.onap.policy.common.utils.test.ExceptionsTester; +import org.onap.policy.drools.system.internal.SimpleLockManagerException; + +public class SimpleLockManagerExceptionTest extends ExceptionsTester { + + @Test + public void test() { + assertEquals(1, test(SimpleLockManagerException.class)); + } +} diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java new file mode 100644 index 00000000..66406898 --- /dev/null +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java @@ -0,0 +1,781 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.utils.time.CurrentTime; +import org.onap.policy.common.utils.time.TestTime; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.onap.policy.drools.system.internal.SimpleLockManager.SimpleLock; +import org.powermock.reflect.Whitebox; + +public class SimpleLockManagerTest { + private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService"; + private static final String TIME_FIELD = "currentTime"; + private static final String OWNER_KEY = "my key"; + private static final String RESOURCE = "my resource"; + private static final String RESOURCE2 = "my resource #2"; + private static final String RESOURCE3 = "my resource #3"; + private static final int HOLD_SEC = 100; + private static final int HOLD_SEC2 = 120; + private static final int HOLD_MS = HOLD_SEC * 1000; + private static final int HOLD_MS2 = HOLD_SEC2 * 1000; + private static final int MAX_THREADS = 10; + private static final int MAX_LOOPS = 50; + + private static CurrentTime saveTime; + private static ScheduledExecutorService saveExec; + private static ScheduledExecutorService realExec; + + private TestTime testTime; + private AtomicInteger nactive; + private AtomicInteger nsuccesses; + private SimpleLockManager feature; + + @Mock + private ScheduledExecutorService exsvc; + + @Mock + private PolicyEngine engine; + + @Mock + private ScheduledFuture<?> future; + + @Mock + private LockCallback callback; + + + /** + * Saves static fields and configures the location of the property files. + */ + @BeforeClass + public static void setUpBeforeClass() { + saveTime = Whitebox.getInternalState(SimpleLockManager.class, TIME_FIELD); + saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD); + + realExec = Executors.newScheduledThreadPool(3); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); + } + + /** + * Restores static fields. + */ + @AfterClass + public static void tearDownAfterClass() { + Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, saveTime); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec); + + realExec.shutdown(); + } + + /** + * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute + * tasks. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + testTime = new TestTime(); + nactive = new AtomicInteger(0); + nsuccesses = new AtomicInteger(0); + + Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime); + + when(engine.getExecutorService()).thenReturn(exsvc); + + feature = new MyLockingFeature(); + feature.start(); + } + + /** + * Tests constructor() when properties are invalid. + */ + @Test + public void testSimpleLockManagerInvalidProperties() { + // use properties containing an invalid value + Properties props = new Properties(); + props.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc"); + + assertThatThrownBy(() -> new MyLockingFeature(engine, props)).isInstanceOf(SimpleLockManagerException.class); + } + + @Test + public void testIsAlive() { + assertTrue(feature.isAlive()); + + feature.stop(); + assertFalse(feature.isAlive()); + } + + @Test + public void testStart() { + assertFalse(feature.start()); + + feature.stop(); + assertTrue(feature.start()); + } + + @Test + public void testStop() { + assertTrue(feature.stop()); + verify(future).cancel(true); + + assertFalse(feature.stop()); + + // no more invocations + verify(future).cancel(anyBoolean()); + } + + @Test + public void testShutdown() { + feature.shutdown(); + + verify(future).cancel(true); + } + + @Test + public void testLockApi() { + assertFalse(feature.isLocked()); + assertTrue(feature.lock()); + assertTrue(feature.unlock()); + } + + @Test + public void testCreateLock() { + // this lock should be granted immediately + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isActive()); + assertEquals(testTime.getMillis() + HOLD_MS, lock.getHoldUntilMs()); + + invokeCallback(1); + + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + + // this time it should be busy + Lock lock2 = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertFalse(lock2.isActive()); + assertTrue(lock2.isUnavailable()); + + invokeCallback(2); + + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // should have been no change to the original lock + assertTrue(lock.isActive()); + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // should work with "true" value also + Lock lock3 = feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, true); + assertTrue(lock3.isActive()); + invokeCallback(3); + verify(callback).lockAvailable(lock3); + verify(callback, never()).lockUnavailable(lock3); + } + + /** + * Tests lock() when the feature is not the latest instance. + */ + @Test + public void testCreateLockNotLatestInstance() { + SimpleLockManager.setLatestInstance(null); + + Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback).lockUnavailable(lock); + } + + @Test + public void testCheckExpired() throws InterruptedException { + final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + final SimpleLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false); + final SimpleLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC2, callback, false); + + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); + + Runnable checker = captor.getValue(); + + // time unchanged - checker should have no impact + checker.run(); + assertTrue(lock.isActive()); + assertTrue(lock2.isActive()); + assertTrue(lock3.isActive()); + + // expire the first two locks + testTime.sleep(HOLD_MS); + checker.run(); + assertFalse(lock.isActive()); + assertFalse(lock2.isActive()); + assertTrue(lock3.isActive()); + + // run the callbacks + captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(5)).execute(captor.capture()); + captor.getAllValues().forEach(Runnable::run); + verify(callback).lockUnavailable(lock); + verify(callback).lockUnavailable(lock2); + verify(callback, never()).lockUnavailable(lock3); + + // should be able to get a lock on the first two resources + assertTrue(feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive()); + assertTrue(feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive()); + + // lock is still busy on the last resource + assertFalse(feature.createLock(RESOURCE3, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive()); + + // expire the last lock + testTime.sleep(HOLD_MS2); + checker.run(); + assertFalse(lock3.isActive()); + + // run the callback + captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(9)).execute(captor.capture()); + captor.getValue().run(); + verify(callback).lockUnavailable(lock3); + } + + /** + * Tests checkExpired(), where the lock is removed from the map between invoking + * expired() and compute(). Should cause "null" to be returned by compute(). + * + * @throws InterruptedException if the test is interrupted + */ + @Test + public void testCheckExpiredLockDeleted() throws InterruptedException { + feature = new MyLockingFeature() { + @Override + protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + + @Override + public boolean expired(long currentMs) { + // remove the lock from the map + free(); + return true; + } + }; + } + }; + + feature.start(); + + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(1); + + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); + + Runnable checker = captor.getValue(); + + checker.run(); + + // lock should now be gone and we should be able to get another + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(2); + + // should have succeeded twice + verify(callback, times(2)).lockAvailable(any()); + + // lock should not be available now + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(3); + verify(callback).lockUnavailable(any()); + } + + /** + * Tests checkExpired(), where the lock is removed from the map and replaced with a + * new lock, between invoking expired() and compute(). Should cause the new lock to be + * returned. + * + * @throws InterruptedException if the test is interrupted + */ + @Test + public void testCheckExpiredLockReplaced() throws InterruptedException { + feature = new MyLockingFeature() { + private boolean madeLock = false; + + @Override + protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + if (madeLock) { + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature); + } + + madeLock = true; + + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + + @Override + public boolean expired(long currentMs) { + // remove the lock from the map and add a new lock + free(); + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + return true; + } + }; + } + }; + + feature.start(); + + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(1); + + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); + + Runnable checker = captor.getValue(); + + checker.run(); + + // lock should not be available now + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(3); + verify(callback).lockUnavailable(any()); + } + + @Test + public void testGetThreadPool() { + // use a real feature + feature = new SimpleLockManager(engine, new Properties()); + + // load properties + feature.start(); + + // should create thread pool + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // should shut down thread pool + feature.stop(); + } + + @Test + public void testSimpleLockNoArgs() { + SimpleLock lock = new SimpleLock(); + assertNull(lock.getResourceId()); + assertNull(lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(0, lock.getHoldSec()); + + assertEquals(0, lock.getHoldUntilMs()); + } + + @Test + public void testSimpleLockSimpleLock() { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertSame(callback, lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + + assertThatIllegalArgumentException() + .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false)) + .withMessageContaining("holdSec"); + } + + @Test + public void testSimpleLockSerializable() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock = roundTrip(lock); + + assertTrue(lock.isActive()); + + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + } + + @Test + public void testSimpleLockExpired() { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.grant(); + + assertFalse(lock.expired(testTime.getMillis())); + assertFalse(lock.expired(testTime.getMillis() + HOLD_MS - 1)); + assertTrue(lock.expired(testTime.getMillis() + HOLD_MS)); + } + + /** + * Tests grant() when the lock is already unavailable. + */ + @Test + public void testSimpleLockGrantUnavailable() { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.setState(LockState.UNAVAILABLE); + lock.grant(); + + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + @Test + public void testSimpleLockFree() { + final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // lock2 should be denied + SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(2); + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // lock2 was denied, so nothing new should happen when freed + assertFalse(lock2.free()); + invokeCallback(2); + + // force lock2 to be active - still nothing should happen + Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); + assertFalse(lock2.free()); + invokeCallback(2); + + // now free the first lock + assertTrue(lock.free()); + assertEquals(LockState.UNAVAILABLE, lock.getState()); + + // should be able to get the lock now + SimpleLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock3.isActive()); + } + + /** + * Tests that free() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockFreeSerialized() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + feature = new MyLockingFeature(); + feature.start(); + + lock = roundTrip(lock); + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests free() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockFreeNoFeature() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + SimpleLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + @Test + public void testSimpleLockExtend() { + final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // lock2 should be denied + SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(2); + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // lock2 will still be denied + lock2.extend(HOLD_SEC, callback); + invokeCallback(3); + verify(callback, times(2)).lockUnavailable(lock2); + + // force lock2 to be active - should still be denied + Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); + lock2.extend(HOLD_SEC, callback); + invokeCallback(4); + verify(callback, times(3)).lockUnavailable(lock2); + + assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback)) + .withMessageContaining("holdSec"); + + // now extend the first lock + lock.extend(HOLD_SEC2, callback); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + assertEquals(testTime.getMillis() + HOLD_MS2, lock.getHoldUntilMs()); + invokeCallback(5); + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + } + + /** + * Tests that extend() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockExtendSerialized() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + feature = new MyLockingFeature(); + feature.start(); + + lock = roundTrip(lock); + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isActive()); + + invokeCallback(1); + verify(scallback).lockAvailable(lock); + verify(scallback, never()).lockUnavailable(lock); + } + + /** + * Tests extend() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockExtendNoFeature() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + SimpleLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isUnavailable()); + + invokeCallback(1); + verify(scallback, never()).lockAvailable(lock); + verify(scallback).lockUnavailable(lock); + } + + @Test + public void testSimpleLockToString() { + String text = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString(); + assertNotNull(text); + assertThat(text).contains("holdUntil").doesNotContain("ownerInfo").doesNotContain("callback"); + } + + /** + * Performs a multi-threaded test of the locking facility. + * + * @throws InterruptedException if the current thread is interrupted while waiting for + * the background threads to complete + */ + @Test + public void testMultiThreaded() throws InterruptedException { + Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime); + feature = new SimpleLockManager(PolicyEngineConstants.getManager(), new Properties()); + feature.start(); + + List<MyThread> threads = new ArrayList<>(MAX_THREADS); + for (int x = 0; x < MAX_THREADS; ++x) { + threads.add(new MyThread()); + } + + threads.forEach(Thread::start); + + for (MyThread thread : threads) { + thread.join(6000); + assertFalse(thread.isAlive()); + } + + for (MyThread thread : threads) { + if (thread.err != null) { + throw thread.err; + } + } + + assertTrue(nsuccesses.get() > 0); + } + + private SimpleLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + return (SimpleLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock); + } + + private SimpleLock roundTrip(SimpleLock lock) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(lock); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + return (SimpleLock) ois.readObject(); + } + } + + /** + * Invokes the last call-back in the work queue. + * + * @param nexpected number of call-backs expected in the work queue + */ + private void invokeCallback(int nexpected) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(nexpected)).execute(captor.capture()); + + if (nexpected > 0) { + captor.getAllValues().get(nexpected - 1).run(); + } + } + + /** + * Feature that uses <i>exsvc</i> to execute requests. + */ + private class MyLockingFeature extends SimpleLockManager { + + public MyLockingFeature() { + this(engine, new Properties()); + } + + public MyLockingFeature(PolicyEngine engine, Properties props) { + super(engine, props); + + exsvc = mock(ScheduledExecutorService.class); + when(engine.getExecutorService()).thenReturn(exsvc); + + when(exsvc.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(answer -> { + return future; + }); + } + } + + /** + * Thread used with the multi-threaded test. It repeatedly attempts to get a lock, + * extend it, and then unlock it. + */ + private class MyThread extends Thread { + AssertionError err = null; + + public MyThread() { + setDaemon(true); + } + + @Override + public void run() { + try { + for (int x = 0; x < MAX_LOOPS; ++x) { + makeAttempt(); + } + + } catch (AssertionError e) { + err = e; + } + } + + private void makeAttempt() { + try { + Semaphore sem = new Semaphore(0); + + LockCallback cb = new LockCallback() { + @Override + public void lockAvailable(Lock lock) { + sem.release(); + } + + @Override + public void lockUnavailable(Lock lock) { + sem.release(); + } + }; + + Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false); + + // wait for callback, whether available or unavailable + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + if (!lock.isActive()) { + return; + } + + nsuccesses.incrementAndGet(); + + assertEquals(1, nactive.incrementAndGet()); + + lock.extend(HOLD_SEC2, cb); + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + assertTrue(lock.isActive()); + + // decrement BEFORE free() + nactive.decrementAndGet(); + + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("interrupted", e); + } + } + } +} |