diff options
38 files changed, 5406 insertions, 3300 deletions
diff --git a/feature-distributed-locking/pom.xml b/feature-distributed-locking/pom.xml index af777efa..1b6063b0 100644 --- a/feature-distributed-locking/pom.xml +++ b/feature-distributed-locking/pom.xml @@ -2,7 +2,7 @@ ============LICENSE_START======================================================= ONAP Policy Engine - Drools PDP ================================================================================ - Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -117,6 +117,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.powermock</groupId> <artifactId>powermock-api-mockito</artifactId> <scope>test</scope> diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java new file mode 100644 index 00000000..523c0d93 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java @@ -0,0 +1,936 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.dbcp2.BasicDataSourceFactory; +import org.onap.policy.common.utils.network.NetworkUtil; +import org.onap.policy.drools.core.lock.AlwaysFailLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; +import org.onap.policy.drools.features.PolicyEngineFeatureApi; +import org.onap.policy.drools.persistence.SystemPersistenceConstants; +import org.onap.policy.drools.system.PolicyEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed implementation of the Lock Feature. Maintains locks across servers using a + * shared DB. + * + * <p/> + * Note: this implementation does <i>not</i> honor the waitForLocks={@code true} + * parameter. + * + * <p/> + * Additional Notes: + * <dl> + * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is + * instead populated with the {@link #uuidString}.</li> + * <li>A periodic check of the DB is made to determine if any of the locks have + * expired.</li> + * <li>When a lock is deserialized, it will not initially appear in this feature's map; it + * will be added to the map once free() or extend() is invoked, provided there isn't + * already an entry. In addition, it initially has the host and UUID of the feature + * instance that created it. However, as soon as doExtend() completes successfully, the + * host and UUID of the lock will be updated to reflect the values within this feature + * instance.</li> + * </dl> + */ +public class DistributedLockManager + implements PolicyResourceLockManager, PolicyEngineFeatureApi { + + private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class); + + private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; + private static final String LOCK_LOST_MSG = "lock lost"; + private static final String NOT_LOCKED_MSG = "not locked"; + + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private static DistributedLockManager latestInstance = null; + + + /** + * Name of the host on which this JVM is running. + */ + @Getter + private final String hostName; + + /** + * UUID of this object. + */ + @Getter + private final String uuidString = UUID.randomUUID().toString(); + + /** + * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a + * lock is added to the map, it remains in the map until the lock is lost or until the + * unlock request completes. + */ + private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>(); + + /** + * Engine with which this manager is associated. + */ + private PolicyEngine engine; + + /** + * Feature properties. + */ + private DistributedLockProperties featProps; + + /** + * Thread pool used to check for lock expiration and to notify owners when locks are + * granted or lost. + */ + private ScheduledExecutorService exsvc = null; + + /** + * Data source used to connect to the DB. + */ + private BasicDataSource dataSource = null; + + + /** + * Constructs the object. + */ + public DistributedLockManager() { + this.hostName = NetworkUtil.getHostname(); + } + + @Override + public int getSequenceNumber() { + return 1000; + } + + @Override + public boolean isAlive() { + return (exsvc != null); + } + + @Override + public boolean start() { + // handled via engine API + return true; + } + + @Override + public boolean stop() { + // handled via engine API + return true; + } + + @Override + public void shutdown() { + // handled via engine API + } + + @Override + public boolean isLocked() { + return false; + } + + @Override + public boolean lock() { + return true; + } + + @Override + public boolean unlock() { + return true; + } + + @Override + public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) { + + try { + this.engine = engine; + this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME)); + this.exsvc = getThreadPool(); + this.dataSource = makeDataSource(); + + return this; + + } catch (Exception e) { + throw new DistributedLockManagerException(e); + } + } + + @Override + public boolean afterStart(PolicyEngine engine) { + + try { + exsvc.execute(this::deleteExpiredDbLocks); + exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + + setLatestInstance(this); + + } catch (Exception e) { + throw new DistributedLockManagerException(e); + } + + return false; + } + + /** + * Make data source. + * + * @return a new, pooled data source + * @throws Exception exception + */ + protected BasicDataSource makeDataSource() throws Exception { + Properties props = new Properties(); + props.put("driverClassName", featProps.getDbDriver()); + props.put("url", featProps.getDbUrl()); + props.put("username", featProps.getDbUser()); + props.put("password", featProps.getDbPwd()); + props.put("testOnBorrow", "true"); + props.put("poolPreparedStatements", "true"); + + // additional properties are listed in the GenericObjectPool API + + return BasicDataSourceFactory.createDataSource(props); + } + + /** + * Deletes expired locks from the DB. + */ + private void deleteExpiredDbLocks() { + logger.info("deleting all expired locks from the DB"); + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn + .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) { + + int ndel = stmt.executeUpdate(); + logger.info("deleted {} expired locks from the DB", ndel); + + } catch (SQLException e) { + logger.warn("failed to delete expired locks from the DB", e); + } + } + + /** + * Closes the data source. Does <i>not</i> invoke any lock call-backs. + */ + @Override + public boolean afterStop(PolicyEngine engine) { + exsvc = null; + closeDataSource(); + return false; + } + + /** + * Closes {@link #dataSource} and sets it to {@code null}. + */ + private void closeDataSource() { + try { + if (dataSource != null) { + dataSource.close(); + } + + } catch (SQLException e) { + logger.error("cannot close the distributed locking DB", e); + } + + dataSource = null; + } + + @Override + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + + if (latestInstance != this) { + AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); + lock.notifyUnavailable(); + return lock; + } + + DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); + + DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock); + + // do these outside of compute() to avoid blocking other map operations + if (existingLock == null) { + logger.debug("added lock to map {}", lock); + lock.scheduleRequest(lock::doLock); + } else { + lock.deny("resource is busy", true); + } + + return lock; + } + + /** + * Checks for expired locks. + */ + private void checkExpired() { + + try { + logger.info("checking for expired locks"); + Set<String> expiredIds = new HashSet<>(resource2lock.keySet()); + identifyDbLocks(expiredIds); + expireLocks(expiredIds); + + exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + + } catch (RejectedExecutionException e) { + logger.warn("thread pool is no longer accepting requests", e); + + } catch (SQLException | RuntimeException e) { + logger.error("error checking expired locks", e); + exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS); + } + + logger.info("done checking for expired locks"); + } + + /** + * Identifies this feature instance's locks that the DB indicates are still active. + * + * @param expiredIds IDs of resources that have expired locks. If a resource is still + * locked, it's ID is removed from this set + * @throws SQLException if a DB error occurs + */ + private void identifyDbLocks(Set<String> expiredIds) throws SQLException { + /* + * We could query for host and UUIDs that actually appear within the locks, but + * those might change while the query is running so no real value in doing that. + * On the other hand, there's only a brief instance between the time a + * deserialized lock is added to this feature instance and its doExtend() method + * updates its host and UUID to match this feature instance. If this happens to + * run during that brief instance, then the lock will be lost and the callback + * invoked. It isn't worth complicating this code further to handle those highly + * unlikely cases. + */ + + // @formatter:off + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement( + "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) { + // @formatter:on + + stmt.setString(1, hostName); + stmt.setString(2, uuidString); + + try (ResultSet resultSet = stmt.executeQuery()) { + while (resultSet.next()) { + String resourceId = resultSet.getString(1); + + // we have now seen this resource id + expiredIds.remove(resourceId); + } + } + } + } + + /** + * Expires locks for the resources that no longer appear within the DB. + * + * @param expiredIds IDs of resources that have expired locks + */ + private void expireLocks(Set<String> expiredIds) { + for (String resourceId : expiredIds) { + AtomicReference<DistributedLock> lockref = new AtomicReference<>(null); + + resource2lock.computeIfPresent(resourceId, (key, lock) -> { + if (lock.isActive()) { + // it thinks it's active, but it isn't - remove from the map + lockref.set(lock); + return null; + } + + return lock; + }); + + DistributedLock lock = lockref.get(); + if (lock != null) { + logger.debug("removed lock from map {}", lock); + lock.deny(LOCK_LOST_MSG, false); + } + } + } + + /** + * Distributed Lock implementation. + */ + public static class DistributedLock extends LockImpl { + private static final long serialVersionUID = 1L; + + /** + * Feature containing this lock. May be {@code null} until the feature is + * identified. Note: this can only be null if the lock has been de-serialized. + */ + private transient DistributedLockManager feature; + + /** + * Host name from the feature instance that created this object. Replaced with the + * host name from the current feature instance whenever the lock is successfully + * extended. + */ + private String hostName; + + /** + * UUID string from the feature instance that created this object. Replaced with + * the UUID string from the current feature instance whenever the lock is + * successfully extended. + */ + private String uuidString; + + /** + * {@code True} if the lock is busy making a request, {@code false} otherwise. + */ + private transient boolean busy = false; + + /** + * Request to be performed. + */ + private transient RunnableWithEx request = null; + + /** + * Number of times we've retried a request. + */ + private transient int nretries = 0; + + /** + * Constructs the object. + */ + public DistributedLock() { + this.hostName = ""; + this.uuidString = ""; + } + + /** + * Constructs the object. + * + * @param state initial state of the lock + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held, + * after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or + * subsequently lost; must not be {@code null} + * @param feature feature containing this lock + */ + public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback, + DistributedLockManager feature) { + super(state, resourceId, ownerKey, holdSec, callback); + + this.feature = feature; + this.hostName = feature.hostName; + this.uuidString = feature.uuidString; + } + + /** + * Grants this lock. The notification is <i>always</i> invoked via the + * <i>foreground</i> thread. + */ + protected void grant() { + synchronized (this) { + if (isUnavailable()) { + return; + } + + setState(LockState.ACTIVE); + } + + logger.info("lock granted: {}", this); + + notifyAvailable(); + } + + /** + * Permanently denies this lock. + * + * @param reason the reason the lock was denied + * @param foreground {@code true} if the callback can be invoked in the current + * (i.e., foreground) thread, {@code false} if it should be invoked via the + * executor + */ + protected void deny(String reason, boolean foreground) { + synchronized (this) { + setState(LockState.UNAVAILABLE); + } + + logger.info("{}: {}", reason, this); + + if (feature == null || foreground) { + notifyUnavailable(); + + } else { + feature.exsvc.execute(this::notifyUnavailable); + } + } + + @Override + public boolean free() { + // do a quick check of the state + if (isUnavailable()) { + return false; + } + + logger.info("releasing lock: {}", this); + + if (!attachFeature()) { + setState(LockState.UNAVAILABLE); + return false; + } + + AtomicBoolean result = new AtomicBoolean(false); + + feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { + if (curlock == this && !isUnavailable()) { + // this lock was the owner + result.set(true); + setState(LockState.UNAVAILABLE); + + /* + * NOTE: do NOT return null; curlock must remain until doUnlock + * completes. + */ + } + + return curlock; + }); + + if (result.get()) { + scheduleRequest(this::doUnlock); + return true; + } + + return false; + } + + @Override + public void extend(int holdSec, LockCallback callback) { + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + setHoldSec(holdSec); + setCallback(callback); + + // do a quick check of the state + if (isUnavailable() || !attachFeature()) { + deny(LOCK_LOST_MSG, true); + return; + } + + AtomicBoolean success = new AtomicBoolean(false); + + feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { + if (curlock == this && !isUnavailable()) { + success.set(true); + setState(LockState.WAITING); + } + + // note: leave it in the map until doUnlock() removes it + + return curlock; + }); + + if (success.get()) { + scheduleRequest(this::doExtend); + + } else { + deny(NOT_LOCKED_MSG, true); + } + } + + /** + * Attaches to the feature instance, if not already attached. + * + * @return {@code true} if the lock is now attached to a feature, {@code false} + * otherwise + */ + private synchronized boolean attachFeature() { + if (feature != null) { + // already attached + return true; + } + + feature = latestInstance; + if (feature == null) { + logger.warn("no feature yet for {}", this); + return false; + } + + // put this lock into the map + feature.resource2lock.putIfAbsent(getResourceId(), this); + + return true; + } + + /** + * Schedules a request for execution. + * + * @param schedreq the request that should be scheduled + */ + private synchronized void scheduleRequest(RunnableWithEx schedreq) { + logger.debug("schedule lock action {}", this); + nretries = 0; + request = schedreq; + feature.exsvc.execute(this::doRequest); + } + + /** + * Reschedules a request for execution, if there is not already a request in the + * queue, and if the retry count has not been exhausted. + * + * @param req request to be rescheduled + */ + private void rescheduleRequest(RunnableWithEx req) { + synchronized (this) { + if (request != null) { + // a new request has already been scheduled - it supersedes "req" + logger.debug("not rescheduling lock action {}", this); + return; + } + + if (nretries++ < feature.featProps.getMaxRetries()) { + logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this); + request = req; + feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS); + return; + } + } + + logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this); + removeFromMap(); + } + + /** + * Gets, and removes, the next request from the queue. Clears {@link #busy} if + * there are no more requests in the queue. + * + * @param prevReq the previous request that was just run + * + * @return the next request, or {@code null} if the queue is empty + */ + private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) { + if (request == null || request == prevReq) { + logger.debug("no more requests for {}", this); + busy = false; + return null; + } + + RunnableWithEx req = request; + request = null; + + return req; + } + + /** + * Executes the current request, if none are currently executing. + */ + private void doRequest() { + synchronized (this) { + if (busy) { + // another thread is already processing the request(s) + return; + } + busy = true; + } + + /* + * There is a race condition wherein this thread could invoke run() while the + * next scheduled thread checks the busy flag and finds that work is being + * done and returns, leaving the next work item in "request". In that case, + * the next work item may never be executed, thus we use a loop here, instead + * of just executing a single request. + */ + RunnableWithEx req = null; + while ((req = getNextRequest(req)) != null) { + if (feature.resource2lock.get(getResourceId()) != this) { + /* + * no longer in the map - don't apply the action, as it may interfere + * with any newly added Lock object + */ + logger.debug("discard lock action {}", this); + synchronized (this) { + busy = false; + } + return; + } + + try { + /* + * Run the request. If it throws an exception, then it will be + * rescheduled for execution a little later. + */ + req.run(); + + } catch (SQLException e) { + logger.warn("request failed for lock: {}", this, e); + + if (feature.featProps.isTransient(e.getErrorCode())) { + // retry the request a little later + rescheduleRequest(req); + } else { + removeFromMap(); + } + + } catch (RuntimeException e) { + logger.warn("request failed for lock: {}", this, e); + removeFromMap(); + } + } + } + + /** + * Attempts to add a lock to the DB. Generates a callback, indicating success or + * failure. + * + * @throws SQLException if a DB error occurs + */ + private void doLock() throws SQLException { + if (!isWaiting()) { + logger.debug("discard doLock {}", this); + return; + } + + /* + * There is a small window in which a client could invoke free() before the DB + * is updated. In that case, doUnlock will be added to the queue to run after + * this, which will delete the record, as desired. In addition, grant() will + * not do anything, because the lock state will have been set to UNAVAILABLE + * by free(). + */ + + logger.debug("doLock {}", this); + try (Connection conn = feature.dataSource.getConnection()) { + boolean success = false; + try { + success = doDbInsert(conn); + + } catch (SQLException e) { + logger.info("failed to insert lock record - attempting update: {}", this, e); + success = doDbUpdate(conn); + } + + if (success) { + grant(); + return; + } + } + + removeFromMap(); + } + + /** + * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if + * it fails, as this should only be executed in response to a call to + * {@link #free()}. + * + * @throws SQLException if a DB error occurs + */ + private void doUnlock() throws SQLException { + logger.debug("unlock {}", this); + try (Connection conn = feature.dataSource.getConnection()) { + doDbDelete(conn); + } + + removeFromMap(); + } + + /** + * Attempts to extend a lock in the DB. Generates a callback, indicating success + * or failure. + * + * @throws SQLException if a DB error occurs + */ + private void doExtend() throws SQLException { + if (!isWaiting()) { + logger.debug("discard doExtend {}", this); + return; + } + + /* + * There is a small window in which a client could invoke free() before the DB + * is updated. In that case, doUnlock will be added to the queue to run after + * this, which will delete the record, as desired. In addition, grant() will + * not do anything, because the lock state will have been set to UNAVAILABLE + * by free(). + */ + + logger.debug("doExtend {}", this); + try (Connection conn = feature.dataSource.getConnection()) { + /* + * invoker may have called extend() before free() had a chance to insert + * the record, thus we have to try to insert, if the update fails + */ + if (doDbUpdate(conn) || doDbInsert(conn)) { + grant(); + return; + } + } + + removeFromMap(); + } + + /** + * Inserts the lock into the DB. + * + * @param conn DB connection + * @return {@code true} if a record was successfully inserted, {@code false} + * otherwise + * @throws SQLException if a DB error occurs + */ + protected boolean doDbInsert(Connection conn) throws SQLException { + logger.debug("insert lock record {}", this); + try (PreparedStatement stmt = + conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " + + "values (?, ?, ?, timestampadd(second, ?, now()))")) { + + stmt.setString(1, getResourceId()); + stmt.setString(2, feature.hostName); + stmt.setString(3, feature.uuidString); + stmt.setInt(4, getHoldSec()); + + stmt.executeUpdate(); + + this.hostName = feature.hostName; + this.uuidString = feature.uuidString; + + return true; + } + } + + /** + * Updates the lock in the DB. + * + * @param conn DB connection + * @return {@code true} if a record was successfully updated, {@code false} + * otherwise + * @throws SQLException if a DB error occurs + */ + protected boolean doDbUpdate(Connection conn) throws SQLException { + logger.debug("update lock record {}", this); + try (PreparedStatement stmt = + conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?," + + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?" + + " AND ((host=? AND owner=?) OR expirationTime < now())")) { + + stmt.setString(1, getResourceId()); + stmt.setString(2, feature.hostName); + stmt.setString(3, feature.uuidString); + stmt.setInt(4, getHoldSec()); + + stmt.setString(5, getResourceId()); + stmt.setString(6, this.hostName); + stmt.setString(7, this.uuidString); + + if (stmt.executeUpdate() != 1) { + return false; + } + + this.hostName = feature.hostName; + this.uuidString = feature.uuidString; + + return true; + } + } + + /** + * Deletes the lock from the DB. + * + * @param conn DB connection + * @throws SQLException if a DB error occurs + */ + protected void doDbDelete(Connection conn) throws SQLException { + logger.debug("delete lock record {}", this); + try (PreparedStatement stmt = + conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) { + + stmt.setString(1, getResourceId()); + stmt.setString(2, this.hostName); + stmt.setString(3, this.uuidString); + + stmt.executeUpdate(); + } + } + + /** + * Removes the lock from the map, and sends a notification using the current + * thread. + */ + private void removeFromMap() { + logger.debug("remove lock from map {}", this); + feature.resource2lock.remove(getResourceId(), this); + + synchronized (this) { + if (!isUnavailable()) { + deny(LOCK_LOST_MSG, true); + } + } + } + + @Override + public String toString() { + return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey=" + + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString=" + + uuidString + "]"; + } + } + + @FunctionalInterface + private static interface RunnableWithEx { + void run() throws SQLException; + } + + // these may be overridden by junit tests + + protected Properties getProperties(String fileName) { + return SystemPersistenceConstants.getManager().getProperties(fileName); + } + + protected ScheduledExecutorService getThreadPool() { + return engine.getExecutorService(); + } + + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this); + } +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java index 55fc4fab..e720f9a1 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * feature-distributed-locking * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,15 +20,15 @@ package org.onap.policy.distributed.locking; -public class DistributedLockingFeatureException extends RuntimeException { +public class DistributedLockManagerException extends RuntimeException { private static final long serialVersionUID = 1L; /** * Constructor. - * + * * @param ex exception to be wrapped */ - public DistributedLockingFeatureException(Exception ex) { + public DistributedLockManagerException(Exception ex) { super(ex); } } diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java new file mode 100644 index 00000000..f470c8e2 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java @@ -0,0 +1,136 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.properties.BeanConfigurator; +import org.onap.policy.common.utils.properties.Property; +import org.onap.policy.common.utils.properties.exception.PropertyException; + + +@Getter +@Setter +public class DistributedLockProperties { + public static final String PREFIX = "distributed.locking."; + + public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; + public static final String DB_URL = "javax.persistence.jdbc.url"; + public static final String DB_USER = "javax.persistence.jdbc.user"; + public static final String DB_PASS = "javax.persistence.jdbc.password"; + public static final String TRANSIENT_ERROR_CODES = PREFIX + "transient.error.codes"; + public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds"; + public static final String RETRY_SEC = PREFIX + "retry.seconds"; + public static final String MAX_RETRIES = PREFIX + "max.retries"; + + /** + * Database driver. + */ + @Property(name = DB_DRIVER) + private String dbDriver; + + /** + * Database url. + */ + @Property(name = DB_URL) + private String dbUrl; + + /** + * Database user. + */ + @Property(name = DB_USER) + private String dbUser; + + /** + * Database password. + */ + @Property(name = DB_PASS) + private String dbPwd; + + /** + * Vendor-specific error codes that are "transient", meaning they may go away if the + * command is repeated (e.g., connection issue), as opposed to something like a syntax + * error or a duplicate key. + */ + @Property(name = TRANSIENT_ERROR_CODES) + private String errorCodeStrings; + + private final Set<Integer> transientErrorCodes; + + /** + * Time, in seconds, to wait between checks for expired locks. + */ + @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900") + private int expireCheckSec; + + /** + * Number of seconds to wait before retrying, after a DB error. + */ + @Property(name = RETRY_SEC, defaultValue = "60") + private int retrySec; + + /** + * Maximum number of times to retry a DB operation. + */ + @Property(name = MAX_RETRIES, defaultValue = "2") + private int maxRetries; + + /** + * Constructs the object, populating fields from the properties. + * + * @param props properties from which to configure this + * @throws PropertyException if an error occurs + */ + public DistributedLockProperties(Properties props) throws PropertyException { + new BeanConfigurator().configureFromProperties(this, props); + + Set<Integer> set = new HashSet<>(); + for (String text : errorCodeStrings.split(",")) { + text = text.trim(); + if (text.isEmpty()) { + continue; + } + + try { + set.add(Integer.valueOf(text)); + + } catch (NumberFormatException e) { + throw new PropertyException(TRANSIENT_ERROR_CODES, "errorCodeStrings", e); + } + } + + transientErrorCodes = Collections.unmodifiableSet(set); + } + + /** + * Determines if an error is transient. + * + * @param errorCode error code to check + * @return {@code true} if the error is transient, {@code false} otherwise + */ + public boolean isTransient(int errorCode) { + return transientErrorCodes.contains(errorCode); + } +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java deleted file mode 100644 index d5e07a30..00000000 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Properties; -import java.util.UUID; -import org.apache.commons.dbcp2.BasicDataSource; -import org.apache.commons.dbcp2.BasicDataSourceFactory; -import org.onap.policy.common.utils.properties.exception.PropertyException; -import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi; -import org.onap.policy.drools.features.PolicyEngineFeatureApi; -import org.onap.policy.drools.persistence.SystemPersistenceConstants; -import org.onap.policy.drools.system.PolicyEngine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DistributedLockingFeature implements PolicyEngineFeatureApi, PolicyResourceLockFeatureApi { - - /** - * Logger instance. - */ - private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class); - - /** - * Properties Configuration Name. - */ - public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; - - /** - * Properties for locking feature. - */ - private DistributedLockingProperties lockProps; - - /** - * Data source used to connect to the DB containing locks. - */ - private BasicDataSource dataSource; - - /** - * UUID. - */ - private static final UUID uuid = UUID.randomUUID(); - - @Override - public int getSequenceNumber() { - return 1000; - } - - @Override - public OperResult beforeLock(String resourceId, String owner, int holdSec) { - - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.lock(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeRefresh(String resourceId, String owner, int holdSec) { - - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.refresh(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeUnlock(String resourceId, String owner) { - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeIsLockedBy(String resourceId, String owner) { - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeIsLocked(String resourceId) { - TargetLock lock = new TargetLock(resourceId, uuid, "dummyOwner", dataSource); - - return (lock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public boolean afterStart(PolicyEngine engine) { - - try { - this.lockProps = new DistributedLockingProperties(SystemPersistenceConstants.getManager() - .getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME)); - this.dataSource = makeDataSource(); - } catch (PropertyException e) { - logger.error("DistributedLockingFeature feature properies have not been loaded", e); - throw new DistributedLockingFeatureException(e); - } catch (InterruptedException e) { - logger.error("DistributedLockingFeature failed to create data source", e); - Thread.currentThread().interrupt(); - throw new DistributedLockingFeatureException(e); - } catch (Exception e) { - logger.error("DistributedLockingFeature failed to create data source", e); - throw new DistributedLockingFeatureException(e); - } - - cleanLockTable(); - - return false; - } - - /** - * Make data source. - * - * @return a new, pooled data source - * @throws Exception exception - */ - protected BasicDataSource makeDataSource() throws Exception { - Properties props = new Properties(); - props.put("driverClassName", lockProps.getDbDriver()); - props.put("url", lockProps.getDbUrl()); - props.put("username", lockProps.getDbUser()); - props.put("password", lockProps.getDbPwd()); - props.put("testOnBorrow", "true"); - props.put("poolPreparedStatements", "true"); - - // additional properties are listed in the GenericObjectPool API - - return BasicDataSourceFactory.createDataSource(props); - } - - /** - * This method kills the heartbeat thread and calls refreshLockTable which removes - * any records from the db where the current host is the owner. - */ - @Override - public boolean beforeShutdown(PolicyEngine engine) { - cleanLockTable(); - return false; - } - - /** - * This method removes all records owned by the current host from the db. - */ - private void cleanLockTable() { - - try (Connection conn = dataSource.getConnection(); - PreparedStatement statement = conn.prepareStatement( - "DELETE FROM pooling.locks WHERE host = ? OR expirationTime < now()") - ) { - - statement.setString(1, uuid.toString()); - statement.executeUpdate(); - - } catch (SQLException e) { - logger.error("error in refreshLockTable()", e); - } - - } -} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java deleted file mode 100644 index 0ed5930d..00000000 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import java.util.Properties; -import org.onap.policy.common.utils.properties.BeanConfigurator; -import org.onap.policy.common.utils.properties.Property; -import org.onap.policy.common.utils.properties.exception.PropertyException; - - -public class DistributedLockingProperties { - - /** - * Feature properties all begin with this prefix. - */ - public static final String PREFIX = "distributed.locking."; - - public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; - public static final String DB_URL = "javax.persistence.jdbc.url"; - public static final String DB_USER = "javax.persistence.jdbc.user"; - public static final String DB_PWD = "javax.persistence.jdbc.password"; - - /** - * Properties from which this was constructed. - */ - private final Properties source; - - /** - * Database driver. - */ - @Property(name = DB_DRIVER) - private String dbDriver; - - /** - * Database url. - */ - @Property(name = DB_URL) - private String dbUrl; - - /** - * Database user. - */ - @Property(name = DB_USER) - private String dbUser; - - /** - * Database password. - */ - @Property(name = DB_PWD) - private String dbPwd; - - /** - * Constructs the object, populating fields from the properties. - * - * @param props properties from which to configure this - * @throws PropertyException if an error occurs - */ - public DistributedLockingProperties(Properties props) throws PropertyException { - source = props; - - new BeanConfigurator().configureFromProperties(this, props); - } - - - public Properties getSource() { - return source; - } - - - public String getDbDriver() { - return dbDriver; - } - - - public String getDbUrl() { - return dbUrl; - } - - - public String getDbUser() { - return dbUser; - } - - - public String getDbPwd() { - return dbPwd; - } - - - public void setDbDriver(String dbDriver) { - this.dbDriver = dbDriver; - } - - - public void setDbUrl(String dbUrl) { - this.dbUrl = dbUrl; - } - - - public void setDbUser(String dbUser) { - this.dbUser = dbUser; - } - - - public void setDbPwd(String dbPwd) { - this.dbPwd = dbPwd; - } - -} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java deleted file mode 100644 index 42e1f92f..00000000 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java +++ /dev/null @@ -1,282 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.UUID; -import org.apache.commons.dbcp2.BasicDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TargetLock { - - private static final Logger logger = LoggerFactory.getLogger(TargetLock.class); - - /** - * The Target resource we want to lock. - */ - private String resourceId; - - /** - * Data source used to connect to the DB containing locks. - */ - private BasicDataSource dataSource; - - /** - * UUID . - */ - private UUID uuid; - - /** - * Owner. - */ - private String owner; - - /** - * Constructs a TargetLock object. - * - * @param resourceId ID of the entity we want to lock - * @param dataSource used to connect to the DB containing locks - */ - public TargetLock(String resourceId, UUID uuid, String owner, BasicDataSource dataSource) { - this.resourceId = resourceId; - this.uuid = uuid; - this.owner = owner; - this.dataSource = dataSource; - } - - /** - * Obtain a lock. - * @param holdSec the amount of time, in seconds, that the lock should be held - */ - public boolean lock(int holdSec) { - - return grabLock(holdSec); - } - - /** - * Refresh a lock. - * - * @param holdSec the amount of time, in seconds, that the lock should be held - * @return {@code true} if the lock was refreshed, {@code false} if the resource is - * not currently locked by the given owner - */ - public boolean refresh(int holdSec) { - return updateLock(holdSec); - } - - /** - * Unlock a resource by deleting it's associated record in the db. - */ - public boolean unlock() { - return deleteLock(); - } - - /** - * "Grabs" lock by attempting to insert a new record in the db. - * If the insert fails due to duplicate key error resource is already locked - * so we call secondGrab. - * @param holdSec the amount of time, in seconds, that the lock should be held - */ - private boolean grabLock(int holdSec) { - - // try to insert a record into the table(thereby grabbing the lock) - try (Connection conn = dataSource.getConnection(); - - PreparedStatement statement = conn.prepareStatement( - "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " - + "values (?, ?, ?, timestampadd(second, ?, now()))")) { - - int index = 1; - statement.setString(index++, this.resourceId); - statement.setString(index++, this.uuid.toString()); - statement.setString(index++, this.owner); - statement.setInt(index++, holdSec); - statement.executeUpdate(); - } - - catch (SQLException e) { - logger.error("error in TargetLock.grabLock()", e); - return secondGrab(holdSec); - } - - return true; - } - - /** - * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired. - * If that fails, it attempts to insert a new record again - * @param holdSec the amount of time, in seconds, that the lock should be held - */ - private boolean secondGrab(int holdSec) { - - try (Connection conn = dataSource.getConnection(); - - PreparedStatement updateStatement = conn.prepareStatement( - "UPDATE pooling.locks SET host = ?, owner = ?, " - + "expirationTime = timestampadd(second, ?, now()) " - + "WHERE resourceId = ? AND expirationTime < now()"); - - PreparedStatement insertStatement = conn.prepareStatement( - "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " - + "values (?, ?, ?, timestampadd(second, ?, now()))");) { - - int index = 1; - updateStatement.setString(index++, this.uuid.toString()); - updateStatement.setString(index++, this.owner); - updateStatement.setInt(index++, holdSec); - updateStatement.setString(index++, this.resourceId); - - // The lock was expired and we grabbed it. - // return true - if (updateStatement.executeUpdate() == 1) { - return true; - } - - // If our update does not return 1 row, the lock either has not expired - // or it was removed. Try one last grab - else { - index = 1; - insertStatement.setString(index++, this.resourceId); - insertStatement.setString(index++, this.uuid.toString()); - insertStatement.setString(index++, this.owner); - insertStatement.setInt(index++, holdSec); - - // If our insert returns 1 we successfully grabbed the lock - return (insertStatement.executeUpdate() == 1); - } - - } catch (SQLException e) { - logger.error("error in TargetLock.secondGrab()", e); - return false; - } - - } - - /** - * Updates the DB record associated with the lock. - * - * @param holdSec the amount of time, in seconds, that the lock should be held - * @return {@code true} if the record was updated, {@code false} otherwise - */ - private boolean updateLock(int holdSec) { - - try (Connection conn = dataSource.getConnection(); - - PreparedStatement updateStatement = conn.prepareStatement( - "UPDATE pooling.locks SET host = ?, owner = ?, " - + "expirationTime = timestampadd(second, ?, now()) " - + "WHERE resourceId = ? AND owner = ? AND expirationTime >= now()")) { - - int index = 1; - updateStatement.setString(index++, this.uuid.toString()); - updateStatement.setString(index++, this.owner); - updateStatement.setInt(index++, holdSec); - updateStatement.setString(index++, this.resourceId); - updateStatement.setString(index++, this.owner); - - // refresh succeeded iff a record was updated - return (updateStatement.executeUpdate() == 1); - - } catch (SQLException e) { - logger.error("error in TargetLock.refreshLock()", e); - return false; - } - - } - - /** - *To remove a lock we simply delete the record from the db . - */ - private boolean deleteLock() { - - try (Connection conn = dataSource.getConnection(); - - PreparedStatement deleteStatement = conn.prepareStatement( - "DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?")) { - - deleteStatement.setString(1, this.resourceId); - deleteStatement.setString(2, this.owner); - deleteStatement.setString(3, this.uuid.toString()); - - return (deleteStatement.executeUpdate() == 1); - - } catch (SQLException e) { - logger.error("error in TargetLock.deleteLock()", e); - return false; - } - - } - - /** - * Is the lock active. - */ - public boolean isActive() { - try (Connection conn = dataSource.getConnection(); - - PreparedStatement selectStatement = conn.prepareStatement( - "SELECT * FROM pooling.locks " - + "WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= now()")) { - - selectStatement.setString(1, this.resourceId); - selectStatement.setString(2, this.uuid.toString()); - selectStatement.setString(3, this.owner); - try (ResultSet result = selectStatement.executeQuery()) { - - // This will return true if the - // query returned at least one row - return result.first(); - } - - } - - catch (SQLException e) { - logger.error("error in TargetLock.isActive()", e); - return false; - } - - } - - /** - * Is the resource locked. - */ - public boolean isLocked() { - - try (Connection conn = dataSource.getConnection(); - PreparedStatement selectStatement = conn - .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= now()")) { - - selectStatement.setString(1, this.resourceId); - try (ResultSet result = selectStatement.executeQuery()) { - // This will return true if the - // query returned at least one row - return result.first(); - } - } catch (SQLException e) { - logger.error("error in TargetLock.isActive()", e); - return false; - } - } - -} diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi deleted file mode 100644 index 19bdf505..00000000 --- a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi +++ /dev/null @@ -1 +0,0 @@ -org.onap.policy.distributed.locking.DistributedLockingFeature
\ No newline at end of file diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi index 19bdf505..2a7c0547 100644 --- a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi +++ b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi @@ -1 +1 @@ -org.onap.policy.distributed.locking.DistributedLockingFeature
\ No newline at end of file +org.onap.policy.distributed.locking.DistributedLockManager
\ No newline at end of file diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java index 7ba2384a..cfd6a151 100644 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * feature-distributed-locking * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,13 +24,13 @@ import static org.junit.Assert.assertEquals; import org.junit.Test; import org.onap.policy.common.utils.test.ExceptionsTester; -import org.onap.policy.distributed.locking.DistributedLockingFeatureException; +import org.onap.policy.distributed.locking.DistributedLockManagerException; -public class DistributedLockingFeatureExceptionTest extends ExceptionsTester { +public class DistributedLockManagerExceptionTest extends ExceptionsTester { @Test public void test() { - assertEquals(1, test(DistributedLockingFeatureException.class)); + assertEquals(1, test(DistributedLockManagerException.class)); } } diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java new file mode 100644 index 00000000..59b56224 --- /dev/null +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java @@ -0,0 +1,1818 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.dbcp2.BasicDataSource; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.utils.services.OrderedServiceImpl; +import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.features.PolicyEngineFeatureApi; +import org.onap.policy.drools.persistence.SystemPersistenceConstants; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.powermock.reflect.Whitebox; + +public class DistributedLockManagerTest { + private static final long EXPIRE_SEC = 900L; + private static final long RETRY_SEC = 60L; + private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService"; + private static final String OTHER_HOST = "other-host"; + private static final String OTHER_OWNER = "other-owner"; + private static final String EXPECTED_EXCEPTION = "expected exception"; + private static final String DB_CONNECTION = + "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling"; + private static final String DB_USER = "user"; + private static final String DB_PASSWORD = "password"; + private static final String OWNER_KEY = "my key"; + private static final String RESOURCE = "my resource"; + private static final String RESOURCE2 = "my resource #2"; + private static final String RESOURCE3 = "my resource #3"; + private static final String RESOURCE4 = "my resource #4"; + private static final String RESOURCE5 = "my resource #5"; + private static final int HOLD_SEC = 100; + private static final int HOLD_SEC2 = 120; + private static final int MAX_THREADS = 5; + private static final int MAX_LOOPS = 100; + private static final int TRANSIENT = 500; + private static final int PERMANENT = 600; + + // number of execute() calls before the first lock attempt + private static final int PRE_LOCK_EXECS = 1; + + // number of execute() calls before the first schedule attempt + private static final int PRE_SCHED_EXECS = 1; + + private static Connection conn = null; + private static ScheduledExecutorService saveExec; + private static ScheduledExecutorService realExec; + + @Mock + private ScheduledExecutorService exsvc; + + @Mock + private LockCallback callback; + + @Mock + private BasicDataSource datasrc; + + @Mock + private PolicyEngine engine; + + private DistributedLock lock; + + private AtomicInteger nactive; + private AtomicInteger nsuccesses; + private DistributedLockManager feature; + + + /** + * Configures the location of the property files and creates the DB. + * + * @throws SQLException if the DB cannot be created + */ + @BeforeClass + public static void setUpBeforeClass() throws SQLException { + SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources"); + + conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); + + try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks " + + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), " + + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) { + createStmt.executeUpdate(); + } + + saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD); + + realExec = Executors.newScheduledThreadPool(3); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); + } + + /** + * Restores static fields. + */ + @AfterClass + public static void tearDownAfterClass() throws SQLException { + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec); + realExec.shutdown(); + conn.close(); + } + + /** + * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute + * tasks. + * + * @throws SQLException if the lock records cannot be deleted from the DB + */ + @Before + public void setUp() throws SQLException { + MockitoAnnotations.initMocks(this); + + nactive = new AtomicInteger(0); + nsuccesses = new AtomicInteger(0); + + cleanDb(); + + when(engine.getExecutorService()).thenReturn(exsvc); + + feature = new MyLockingFeature(true); + } + + @After + public void tearDown() throws SQLException { + shutdownFeature(); + cleanDb(); + } + + private void cleanDb() throws SQLException { + try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) { + stmt.executeUpdate(); + } + } + + private void shutdownFeature() { + if (feature != null) { + feature.afterStop(engine); + feature = null; + } + } + + /** + * Tests that the feature is found in the expected service sets. + */ + @Test + public void testServiceApis() { + assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream() + .anyMatch(obj -> obj instanceof DistributedLockManager)); + } + + /** + * Tests constructor() when properties are invalid. + */ + @Test + public void testDistributedLockManagerInvalidProperties() { + // use properties containing an invalid value + Properties props = new Properties(); + props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc"); + + feature = new MyLockingFeature(false) { + @Override + protected Properties getProperties(String fileName) { + return props; + } + }; + + assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class); + } + + @Test + public void testGetSequenceNumber() { + assertEquals(1000, feature.getSequenceNumber()); + } + + @Test + public void testStartableApi() { + assertTrue(feature.isAlive()); + assertTrue(feature.start()); + assertTrue(feature.stop()); + feature.shutdown(); + + // above should have had no effect + assertTrue(feature.isAlive()); + + feature.afterStop(engine); + assertFalse(feature.isAlive()); + } + + @Test + public void testLockApi() { + assertFalse(feature.isLocked()); + assertTrue(feature.lock()); + assertTrue(feature.unlock()); + } + + @Test + public void testBeforeCreateLockManager() { + assertSame(feature, feature.beforeCreateLockManager(engine, new Properties())); + } + + /** + * Tests beforeCreate(), when getProperties() throws a runtime exception. + */ + @Test + public void testBeforeCreateLockManagerEx() { + shutdownFeature(); + + feature = new MyLockingFeature(false) { + @Override + protected Properties getProperties(String fileName) { + throw new IllegalArgumentException(EXPECTED_EXCEPTION); + } + }; + + assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties())) + .isInstanceOf(DistributedLockManagerException.class); + } + + @Test + public void testAfterStart() { + // verify that cleanup & expire check are both added to the queue + verify(exsvc).execute(any()); + verify(exsvc).schedule(any(Runnable.class), anyLong(), any()); + } + + /** + * Tests afterStart(), when thread pool throws a runtime exception. + */ + @Test + public void testAfterStartExInThreadPool() { + shutdownFeature(); + + feature = new MyLockingFeature(false); + + when(exsvc.schedule(any(Runnable.class), anyLong(), any())) + .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)); + + assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class); + } + + @Test + public void testDeleteExpiredDbLocks() throws SQLException { + // add records: two expired, one not + insertRecord(RESOURCE, feature.getUuidString(), -1); + insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2); + insertRecord(RESOURCE3, OTHER_OWNER, 0); + insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC); + + // get the clean-up function and execute it + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).execute(captor.capture()); + + long tbegin = System.currentTimeMillis(); + Runnable action = captor.getValue(); + action.run(); + + assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin)); + assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin)); + assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin)); + assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin)); + + assertEquals(2, getRecordCount()); + } + + /** + * Tests deleteExpiredDbLocks(), when getConnection() throws an exception. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDeleteExpiredDbLocksEx() throws SQLException { + feature = new InvalidDbLockingFeature(TRANSIENT); + + // get the clean-up function and execute it + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).execute(captor.capture()); + + Runnable action = captor.getValue(); + + // should not throw an exception + action.run(); + } + + @Test + public void testAfterStop() { + shutdownFeature(); + + feature = new DistributedLockManager(); + + // shutdown without calling afterStart() + + shutdownFeature(); + } + + /** + * Tests afterStop(), when the data source throws an exception when close() is called. + * + * @throws SQLException if an error occurs + */ + @Test + public void testAfterStopEx() throws SQLException { + shutdownFeature(); + + // use a data source that throws an exception when closed + feature = new InvalidDbLockingFeature(TRANSIENT); + + shutdownFeature(); + } + + @Test + public void testCreateLock() throws SQLException { + verify(exsvc).execute(any()); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isWaiting()); + + verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any()); + + // this lock should fail + LockCallback callback2 = mock(LockCallback.class); + DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false); + assertTrue(lock2.isUnavailable()); + verify(callback2, never()).lockAvailable(lock2); + verify(callback2).lockUnavailable(lock2); + + // this should fail, too + LockCallback callback3 = mock(LockCallback.class); + DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false); + assertTrue(lock3.isUnavailable()); + verify(callback3, never()).lockAvailable(lock3); + verify(callback3).lockUnavailable(lock3); + + // no change to first + assertTrue(lock.isWaiting()); + + // no callbacks to the first lock + verify(callback, never()).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + assertTrue(lock.isWaiting()); + assertEquals(0, getRecordCount()); + + runLock(0, 0); + assertTrue(lock.isActive()); + assertEquals(1, getRecordCount()); + + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // this should succeed + DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock4.isWaiting()); + + // after running checker, original records should still remain + runChecker(0, 0, EXPIRE_SEC); + assertEquals(1, getRecordCount()); + verify(callback, never()).lockUnavailable(lock); + } + + /** + * Tests lock() when the feature is not the latest instance. + */ + @Test + public void testCreateLockNotLatestInstance() { + DistributedLockManager.setLatestInstance(null); + + Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback).lockUnavailable(lock); + } + + @Test + public void testCheckExpired() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false); + runLock(1, 0); + + LockCallback callback3 = mock(LockCallback.class); + final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false); + runLock(2, 0); + + LockCallback callback4 = mock(LockCallback.class); + final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false); + runLock(3, 0); + + LockCallback callback5 = mock(LockCallback.class); + final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false); + runLock(4, 0); + + assertEquals(5, getRecordCount()); + + // expire one record + updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1); + + // change host of another record + updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC); + + // change uuid of another record + updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC); + + + // run the checker + runChecker(0, 0, EXPIRE_SEC); + + + // check lock states + assertTrue(lock.isUnavailable()); + assertTrue(lock2.isActive()); + assertTrue(lock3.isUnavailable()); + assertTrue(lock4.isActive()); + assertTrue(lock5.isUnavailable()); + + // allow callbacks + runLock(5, 2); + runLock(6, 1); + runLock(7, 0); + verify(callback).lockUnavailable(lock); + verify(callback3).lockUnavailable(lock3); + verify(callback5).lockUnavailable(lock5); + + verify(callback2, never()).lockUnavailable(lock2); + verify(callback4, never()).lockUnavailable(lock4); + + + // another check should have been scheduled, with the normal interval + runChecker(1, 0, EXPIRE_SEC); + } + + /** + * Tests checkExpired(), when schedule() throws an exception. + */ + @Test + public void testCheckExpiredExecRejected() { + // arrange for execution to be rejected + when(exsvc.schedule(any(Runnable.class), anyLong(), any())) + .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION)); + + runChecker(0, 0, EXPIRE_SEC); + } + + /** + * Tests checkExpired(), when getConnection() throws an exception. + */ + @Test + public void testCheckExpiredSqlEx() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(TRANSIENT); + + runChecker(0, 0, EXPIRE_SEC); + + // it should have scheduled another check, sooner + runChecker(0, 0, RETRY_SEC); + } + + @Test + public void testExpireLocks() throws SQLException { + AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null); + + feature = new MyLockingFeature(true) { + @Override + protected BasicDataSource makeDataSource() throws Exception { + // get the real data source + BasicDataSource src2 = super.makeDataSource(); + + when(datasrc.getConnection()).thenAnswer(answer -> { + DistributedLock lck = freeLock.getAndSet(null); + if (lck != null) { + // free it + lck.free(); + + // run its doUnlock + runLock(4, 0); + } + + return src2.getConnection(); + }); + + return datasrc; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false); + runLock(1, 0); + + LockCallback callback3 = mock(LockCallback.class); + final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false); + // don't run doLock for lock3 - leave it in the waiting state + + LockCallback callback4 = mock(LockCallback.class); + final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false); + runLock(3, 0); + + assertEquals(3, getRecordCount()); + + // expire one record + updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1); + + // arrange to free lock4 while the checker is running + freeLock.set(lock4); + + // run the checker + runChecker(0, 0, EXPIRE_SEC); + + + // check lock states + assertTrue(lock.isUnavailable()); + assertTrue(lock2.isActive()); + assertTrue(lock3.isWaiting()); + assertTrue(lock4.isUnavailable()); + + runLock(5, 0); + verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any()); + + verify(callback).lockUnavailable(lock); + verify(callback2, never()).lockUnavailable(lock2); + verify(callback3, never()).lockUnavailable(lock3); + verify(callback4, never()).lockUnavailable(lock4); + } + + @Test + public void testDistributedLockNoArgs() { + DistributedLock lock = new DistributedLock(); + assertNull(lock.getResourceId()); + assertNull(lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(0, lock.getHoldSec()); + } + + @Test + public void testDistributedLock() { + assertThatIllegalArgumentException() + .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false)) + .withMessageContaining("holdSec"); + + // should generate no exception + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + } + + @Test + public void testDistributedLockSerializable() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock = roundTrip(lock); + + assertTrue(lock.isWaiting()); + + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + } + + @Test + public void testGrant() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertFalse(lock.isActive()); + + // execute the doLock() call + runLock(0, 0); + + assertTrue(lock.isActive()); + + // the callback for the lock should have been run in the foreground thread + verify(callback).lockAvailable(lock); + } + + /** + * Tests grant() when the lock is already unavailable. + */ + @Test + public void testDistributedLockGrantUnavailable() { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.setState(LockState.UNAVAILABLE); + lock.grant(); + + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + @Test + public void testDistributedLockDeny() { + // get a lock + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // get another lock - should fail + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertTrue(lock.isUnavailable()); + + // the callback for the second lock should have been run in the foreground thread + verify(callback).lockUnavailable(lock); + + // should only have a request for the first lock + verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any()); + } + + @Test + public void testDistributedLockFree() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + + // run both requests associated with the lock + runLock(0, 1); + runLock(1, 0); + + // should not have changed state + assertTrue(lock.isUnavailable()); + + // attempt to free it again + assertFalse(lock.free()); + + // should not have queued anything else + verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any()); + + // new lock should succeed + DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock2 != lock); + assertTrue(lock2.isWaiting()); + } + + /** + * Tests that free() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockFreeSerialized() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + feature = new MyLockingFeature(true); + + lock = roundTrip(lock); + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests free() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockFreeNoFeature() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + DistributedLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests the case where the lock is freed and doUnlock called between the call to + * isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockFreeUnlocked() { + feature = new FreeWithFreeLockingFeature(true); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests the case where the lock is freed, but doUnlock is not completed, between the + * call to isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockFreeLockFreed() { + feature = new FreeWithFreeLockingFeature(false); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + @Test + public void testDistributedLockExtend() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // lock2 should be denied - called back by this thread + DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // lock2 will still be denied - called back by this thread + lock2.extend(HOLD_SEC, callback); + verify(callback, times(2)).lockUnavailable(lock2); + + // force lock2 to be active - should still be denied + Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); + lock2.extend(HOLD_SEC, callback); + verify(callback, times(3)).lockUnavailable(lock2); + + assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback)) + .withMessageContaining("holdSec"); + + // execute doLock() + runLock(0, 0); + assertTrue(lock.isActive()); + + // now extend the first lock + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + assertTrue(lock.isWaiting()); + + // execute doExtend() + runLock(1, 0); + lock.extend(HOLD_SEC2, callback2); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + verify(callback2).lockAvailable(lock); + verify(callback2, never()).lockUnavailable(lock); + } + + /** + * Tests that extend() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockExtendSerialized() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // run doLock + runLock(0, 0); + assertTrue(lock.isActive()); + + feature = new MyLockingFeature(true); + + lock = roundTrip(lock); + assertTrue(lock.isActive()); + + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isWaiting()); + + // run doExtend (in new feature) + runLock(0, 0); + assertTrue(lock.isActive()); + + verify(scallback).lockAvailable(lock); + verify(scallback, never()).lockUnavailable(lock); + } + + /** + * Tests extend() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockExtendNoFeature() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // run doLock + runLock(0, 0); + assertTrue(lock.isActive()); + + DistributedLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + assertTrue(lock.isActive()); + + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isUnavailable()); + + verify(scallback, never()).lockAvailable(lock); + verify(scallback).lockUnavailable(lock); + } + + /** + * Tests the case where the lock is freed and doUnlock called between the call to + * isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockExtendUnlocked() { + feature = new FreeWithFreeLockingFeature(true); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + lock.extend(HOLD_SEC2, callback); + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + } + + /** + * Tests the case where the lock is freed, but doUnlock is not completed, between the + * call to isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockExtendLockFreed() { + feature = new FreeWithFreeLockingFeature(false); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + lock.extend(HOLD_SEC2, callback); + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + } + + @Test + public void testDistributedLockScheduleRequest() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + verify(callback).lockAvailable(lock); + } + + @Test + public void testDistributedLockRescheduleRequest() throws SQLException { + // use a data source that throws an exception when getConnection() is called + InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT); + feature = invfeat; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should fail and reschedule + runLock(0, 0); + + // should still be waiting + assertTrue(lock.isWaiting()); + verify(callback, never()).lockUnavailable(lock); + + // free the lock while doLock is executing + invfeat.freeLock = true; + + // try scheduled request - should just invoke doUnlock + runSchedule(0, 0); + + // should still be waiting + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockUnavailable(lock); + + // should have scheduled a retry of doUnlock + verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + public void testDistributedLockGetNextRequest() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + /* + * run doLock. This should cause getNextRequest() to be called twice, once with a + * request in the queue, and the second time with request=null. + */ + runLock(0, 0); + } + + /** + * Tests getNextRequest(), where the same request is still in the queue the second + * time it's called. + */ + @Test + public void testDistributedLockGetNextRequestSameRequest() { + // force reschedule to be invoked + feature = new InvalidDbLockingFeature(TRANSIENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + /* + * run doLock. This should cause getNextRequest() to be called twice, once with a + * request in the queue, and the second time with the same request again. + */ + runLock(0, 0); + + verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + public void testDistributedLockDoRequest() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertTrue(lock.isWaiting()); + + // run doLock via doRequest + runLock(0, 0); + + assertTrue(lock.isActive()); + } + + /** + * Tests doRequest(), when doRequest() is already running within another thread. + */ + @Test + public void testDistributedLockDoRequestBusy() { + /* + * this feature will invoke a request in a background thread while it's being run + * in a foreground thread. + */ + AtomicBoolean running = new AtomicBoolean(false); + AtomicBoolean returned = new AtomicBoolean(false); + + feature = new MyLockingFeature(true) { + @Override + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + + @Override + protected boolean doDbInsert(Connection conn) throws SQLException { + if (running.get()) { + // already inside the thread - don't recurse any further + return super.doDbInsert(conn); + } + + running.set(true); + + Thread thread = new Thread(() -> { + // run doLock from within the new thread + runLock(0, 0); + }); + thread.setDaemon(true); + thread.start(); + + // wait for the background thread to complete before continuing + try { + thread.join(5000); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + + returned.set(!thread.isAlive()); + + return super.doDbInsert(conn); + } + }; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // run doLock + runLock(0, 0); + + assertTrue(returned.get()); + } + + /** + * Tests doRequest() when an exception occurs while the lock is in the WAITING state. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoRequestRunExWaiting() throws SQLException { + // throw run-time exception + when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION)); + + // use a data source that throws an exception when getConnection() is called + feature = new MyLockingFeature(true) { + @Override + protected BasicDataSource makeDataSource() throws Exception { + return datasrc; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should NOT reschedule + runLock(0, 0); + + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + + verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any()); + } + + /** + * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE + * state. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoRequestRunExUnavailable() throws SQLException { + // throw run-time exception + when(datasrc.getConnection()).thenAnswer(answer -> { + lock.free(); + throw new IllegalStateException(EXPECTED_EXCEPTION); + }); + + // use a data source that throws an exception when getConnection() is called + feature = new MyLockingFeature(true) { + @Override + protected BasicDataSource makeDataSource() throws Exception { + return datasrc; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should NOT reschedule + runLock(0, 0); + + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockUnavailable(lock); + + verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any()); + } + + /** + * Tests doRequest() when the retry count gets exhausted. + */ + @Test + public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(TRANSIENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should fail and reschedule + runLock(0, 0); + + // should still be waiting + assertTrue(lock.isWaiting()); + verify(callback, never()).lockUnavailable(lock); + + // try again, via SCHEDULER - first retry fails + runSchedule(0, 0); + + // should still be waiting + assertTrue(lock.isWaiting()); + verify(callback, never()).lockUnavailable(lock); + + // try again, via SCHEDULER - final retry fails + runSchedule(1, 0); + assertTrue(lock.isUnavailable()); + + // now callback should have been called + verify(callback).lockUnavailable(lock); + } + + /** + * Tests doRequest() when a non-transient DB exception is thrown. + */ + @Test + public void testDistributedLockDoRequestNotTransient() { + /* + * use a data source that throws a PERMANENT exception when getConnection() is + * called + */ + feature = new InvalidDbLockingFeature(PERMANENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should fail + runLock(0, 0); + + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + + // should not have scheduled anything new + verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any()); + verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + public void testDistributedLockDoLock() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should simply do an insert + long tbegin = System.currentTimeMillis(); + runLock(0, 0); + + assertEquals(1, getRecordCount()); + assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin)); + verify(callback).lockAvailable(lock); + } + + /** + * Tests doLock() when the lock is freed before doLock runs. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoLockFreed() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + lock.setState(LockState.UNAVAILABLE); + + // invoke doLock - should do nothing + runLock(0, 0); + + assertEquals(0, getRecordCount()); + + verify(callback, never()).lockAvailable(lock); + } + + /** + * Tests doLock() when a DB exception is thrown. + */ + @Test + public void testDistributedLockDoLockEx() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(PERMANENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should simply do an insert + runLock(0, 0); + + // lock should have failed due to exception + verify(callback).lockUnavailable(lock); + } + + /** + * Tests doLock() when an (expired) record already exists, thus requiring doUpdate() + * to be called. + */ + @Test + public void testDistributedLockDoLockNeedingUpdate() throws SQLException { + // insert an expired record + insertRecord(RESOURCE, feature.getUuidString(), 0); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should simply do an update + runLock(0, 0); + verify(callback).lockAvailable(lock); + } + + /** + * Tests doLock() when a locked record already exists. + */ + @Test + public void testDistributedLockDoLockAlreadyLocked() throws SQLException { + // insert an expired record + insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock + runLock(0, 0); + + // lock should have failed because it's already locked + verify(callback).lockUnavailable(lock); + } + + @Test + public void testDistributedLockDoUnlock() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock() + runLock(0, 0); + + lock.free(); + + // invoke doUnlock() + long tbegin = System.currentTimeMillis(); + runLock(1, 0); + + assertEquals(0, getRecordCount()); + assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin)); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback, times(1)).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + } + + /** + * Tests doUnlock() when a DB exception is thrown. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoUnlockEx() throws SQLException { + feature = new InvalidDbLockingFeature(PERMANENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // do NOT invoke doLock() - it will fail without a DB connection + + lock.free(); + + // invoke doUnlock() + runLock(1, 0); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback, never()).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + } + + @Test + public void testDistributedLockDoExtend() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // call doExtend() + long tbegin = System.currentTimeMillis(); + runLock(1, 0); + + assertEquals(1, getRecordCount()); + assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin)); + + assertTrue(lock.isActive()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have succeeded + verify(callback2).lockAvailable(lock); + verify(callback2, never()).lockUnavailable(lock); + } + + /** + * Tests doExtend() when the lock is freed before doExtend runs. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendFreed() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.extend(HOLD_SEC2, callback); + + lock.setState(LockState.UNAVAILABLE); + + // invoke doExtend - should do nothing + runLock(1, 0); + + assertEquals(0, getRecordCount()); + + verify(callback, never()).lockAvailable(lock); + } + + /** + * Tests doExtend() when the lock record is missing from the DB, thus requiring an + * insert. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendInsertNeeded() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // delete the record so it's forced to re-insert it + cleanDb(); + + // call doExtend() + long tbegin = System.currentTimeMillis(); + runLock(1, 0); + + assertEquals(1, getRecordCount()); + assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin)); + + assertTrue(lock.isActive()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have succeeded + verify(callback2).lockAvailable(lock); + verify(callback2, never()).lockUnavailable(lock); + } + + /** + * Tests doExtend() when both update and insert fail. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException { + /* + * this feature will create a lock that returns false when doDbUpdate() is + * invoked, or when doDbInsert() is invoked a second time + */ + feature = new MyLockingFeature(true) { + @Override + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + private int ntimes = 0; + + @Override + protected boolean doDbInsert(Connection conn) throws SQLException { + if (ntimes++ > 0) { + return false; + } + + return super.doDbInsert(conn); + } + + @Override + protected boolean doDbUpdate(Connection conn) { + return false; + } + }; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // call doExtend() + runLock(1, 0); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have failed + verify(callback2, never()).lockAvailable(lock); + verify(callback2).lockUnavailable(lock); + } + + /** + * Tests doExtend() when an exception occurs. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendEx() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + /* + * delete the record and insert one with a different owner, which will cause + * doDbInsert() to throw an exception + */ + cleanDb(); + insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // call doExtend() + runLock(1, 0); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have failed + verify(callback2, never()).lockAvailable(lock); + verify(callback2).lockUnavailable(lock); + } + + @Test + public void testDistributedLockToString() { + String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString(); + assertNotNull(text); + assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback"); + } + + @Test + public void testMakeThreadPool() { + // use a REAL feature to test this + feature = new DistributedLockManager(); + + // this should create a thread pool + feature.beforeCreateLockManager(engine, new Properties()); + feature.afterStart(engine); + + shutdownFeature(); + } + + /** + * Performs a multi-threaded test of the locking facility. + * + * @throws InterruptedException if the current thread is interrupted while waiting for + * the background threads to complete + */ + @Test + public void testMultiThreaded() throws InterruptedException { + feature = new DistributedLockManager(); + feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties()); + feature.afterStart(PolicyEngineConstants.getManager()); + + List<MyThread> threads = new ArrayList<>(MAX_THREADS); + for (int x = 0; x < MAX_THREADS; ++x) { + threads.add(new MyThread()); + } + + threads.forEach(Thread::start); + + for (MyThread thread : threads) { + thread.join(6000); + assertFalse(thread.isAlive()); + } + + for (MyThread thread : threads) { + if (thread.err != null) { + throw thread.err; + } + } + + assertTrue(nsuccesses.get() > 0); + } + + private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock); + } + + private DistributedLock roundTrip(DistributedLock lock) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(lock); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + return (DistributedLock) ois.readObject(); + } + } + + /** + * Runs the checkExpired() action. + * + * @param nskip number of actions in the work queue to skip + * @param nadditional number of additional actions that appear in the work queue + * <i>after</i> the checkExpired action + * @param schedSec number of seconds for which the checker should have been scheduled + */ + private void runChecker(int nskip, int nadditional, long schedSec) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS)); + Runnable action = captor.getAllValues().get(nskip); + action.run(); + } + + /** + * Runs a lock action (e.g., doLock, doUnlock). + * + * @param nskip number of actions in the work queue to skip + * @param nadditional number of additional actions that appear in the work queue + * <i>after</i> the desired action + */ + void runLock(int nskip, int nadditional) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture()); + + Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip); + action.run(); + } + + /** + * Runs a scheduled action (e.g., "retry" action). + * + * @param nskip number of actions in the work queue to skip + * @param nadditional number of additional actions that appear in the work queue + * <i>after</i> the desired action + */ + void runSchedule(int nskip, int nadditional) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any()); + + Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip); + action.run(); + } + + /** + * Gets a count of the number of lock records in the DB. + * + * @return the number of lock records in the DB + * @throws SQLException if an error occurs accessing the DB + */ + private int getRecordCount() throws SQLException { + try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks"); + ResultSet result = stmt.executeQuery()) { + + if (result.next()) { + return result.getInt(1); + + } else { + return 0; + } + } + } + + /** + * Determines if there is a record for the given resource whose expiration time is in + * the expected range. + * + * @param resourceId ID of the resource of interest + * @param uuidString UUID string of the owner + * @param holdSec seconds for which the lock was to be held + * @param tbegin earliest time, in milliseconds, at which the record could have been + * inserted into the DB + * @return {@code true} if a record is found, {@code false} otherwise + * @throws SQLException if an error occurs accessing the DB + */ + private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException { + try (PreparedStatement stmt = + conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks" + + " WHERE resourceId=? AND host=? AND owner=?")) { + + stmt.setString(1, resourceId); + stmt.setString(2, feature.getHostName()); + stmt.setString(3, uuidString); + + try (ResultSet result = stmt.executeQuery()) { + if (result.next()) { + int remaining = result.getInt(1); + long maxDiff = System.currentTimeMillis() - tbegin; + return (remaining >= 0 && holdSec - remaining <= maxDiff); + + } else { + return false; + } + } + } + } + + /** + * Inserts a record into the DB. + * + * @param resourceId ID of the resource of interest + * @param uuidString UUID string of the owner + * @param expireOffset offset, in seconds, from "now", at which the lock should expire + * @throws SQLException if an error occurs accessing the DB + */ + private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException { + this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset); + } + + private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset) + throws SQLException { + try (PreparedStatement stmt = + conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " + + "values (?, ?, ?, timestampadd(second, ?, now()))")) { + + stmt.setString(1, resourceId); + stmt.setString(2, hostName); + stmt.setString(3, uuidString); + stmt.setInt(4, expireOffset); + + assertEquals(1, stmt.executeUpdate()); + } + } + + /** + * Updates a record in the DB. + * + * @param resourceId ID of the resource of interest + * @param newUuid UUID string of the <i>new</i> owner + * @param expireOffset offset, in seconds, from "now", at which the lock should expire + * @throws SQLException if an error occurs accessing the DB + */ + private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException { + try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?," + + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) { + + stmt.setString(1, newHost); + stmt.setString(2, newUuid); + stmt.setInt(3, expireOffset); + stmt.setString(4, resourceId); + + assertEquals(1, stmt.executeUpdate()); + } + } + + /** + * Feature that uses <i>exsvc</i> to execute requests. + */ + private class MyLockingFeature extends DistributedLockManager { + + public MyLockingFeature(boolean init) { + shutdownFeature(); + + exsvc = mock(ScheduledExecutorService.class); + when(engine.getExecutorService()).thenReturn(exsvc); + + if (init) { + beforeCreateLockManager(engine, new Properties()); + afterStart(engine); + } + } + } + + /** + * Feature whose data source all throws exceptions. + */ + private class InvalidDbLockingFeature extends MyLockingFeature { + private int errcode; + private boolean freeLock = false; + + public InvalidDbLockingFeature(int errcode) { + // pass "false" because we have to set the error code BEFORE calling + // afterStart() + super(false); + + this.errcode = errcode; + + this.beforeCreateLockManager(engine, new Properties()); + this.afterStart(engine); + } + + @Override + protected BasicDataSource makeDataSource() throws Exception { + when(datasrc.getConnection()).thenAnswer(answer -> { + if (freeLock) { + freeLock = false; + lock.free(); + } + + throw new SQLException(EXPECTED_EXCEPTION, "", errcode); + }); + + doThrow(new SQLException(EXPECTED_EXCEPTION, "", errcode)).when(datasrc).close(); + + return datasrc; + } + } + + /** + * Feature whose locks free themselves while free() is already running. + */ + private class FreeWithFreeLockingFeature extends MyLockingFeature { + private boolean relock; + + public FreeWithFreeLockingFeature(boolean relock) { + super(true); + this.relock = relock; + } + + @Override + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + private boolean checked = false; + + @Override + public boolean isUnavailable() { + if (checked) { + return super.isUnavailable(); + } + + checked = true; + + // release and relock + free(); + + if (relock) { + // run doUnlock + runLock(1, 0); + + // relock it + createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false); + } + + return false; + } + }; + } + } + + /** + * Thread used with the multi-threaded test. It repeatedly attempts to get a lock, + * extend it, and then unlock it. + */ + private class MyThread extends Thread { + AssertionError err = null; + + public MyThread() { + setDaemon(true); + } + + @Override + public void run() { + try { + for (int x = 0; x < MAX_LOOPS; ++x) { + makeAttempt(); + } + + } catch (AssertionError e) { + err = e; + } + } + + private void makeAttempt() { + try { + Semaphore sem = new Semaphore(0); + + LockCallback cb = new LockCallback() { + @Override + public void lockAvailable(Lock lock) { + sem.release(); + } + + @Override + public void lockUnavailable(Lock lock) { + sem.release(); + } + }; + + Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false); + + // wait for callback, whether available or unavailable + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + if (!lock.isActive()) { + return; + } + + nsuccesses.incrementAndGet(); + + assertEquals(1, nactive.incrementAndGet()); + + lock.extend(HOLD_SEC2, cb); + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + assertTrue(lock.isActive()); + + // decrement BEFORE free() + nactive.decrementAndGet(); + + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("interrupted", e); + } + } + } +} diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java new file mode 100644 index 00000000..5f76d657 --- /dev/null +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Properties; +import java.util.TreeSet; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.utils.properties.exception.PropertyException; + +public class DistributedLockPropertiesTest { + + private Properties props; + + /** + * Populates {@link #props}. + */ + @Before + public void setUp() { + props = new Properties(); + + props.setProperty(DistributedLockProperties.DB_DRIVER, "my driver"); + props.setProperty(DistributedLockProperties.DB_URL, "my url"); + props.setProperty(DistributedLockProperties.DB_USER, "my user"); + props.setProperty(DistributedLockProperties.DB_PASS, "my pass"); + props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,-20,,,30"); + props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "100"); + props.setProperty(DistributedLockProperties.RETRY_SEC, "200"); + props.setProperty(DistributedLockProperties.MAX_RETRIES, "300"); + } + + @Test + public void test() throws PropertyException { + DistributedLockProperties dlp = new DistributedLockProperties(props); + + assertEquals("my driver", dlp.getDbDriver()); + assertEquals("my url", dlp.getDbUrl()); + assertEquals("my user", dlp.getDbUser()); + assertEquals("my pass", dlp.getDbPwd()); + assertEquals("10,-20,,,30", dlp.getErrorCodeStrings()); + assertEquals("[-20, 10, 30]", new TreeSet<>(dlp.getTransientErrorCodes()).toString()); + assertEquals(100, dlp.getExpireCheckSec()); + assertEquals(200, dlp.getRetrySec()); + assertEquals(300, dlp.getMaxRetries()); + + assertTrue(dlp.isTransient(10)); + assertTrue(dlp.isTransient(-20)); + assertTrue(dlp.isTransient(30)); + + assertFalse(dlp.isTransient(-10)); + + // invalid value + props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,abc,30"); + + assertThatThrownBy(() -> new DistributedLockProperties(props)).isInstanceOf(PropertyException.class) + .hasMessageContaining(DistributedLockProperties.TRANSIENT_ERROR_CODES); + } +} diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java deleted file mode 100644 index 68a5a31b..00000000 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.sql.SQLException; -import org.apache.commons.dbcp2.BasicDataSource; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.onap.policy.common.utils.properties.exception.PropertyException; -import org.onap.policy.drools.persistence.SystemPersistenceConstants; - -/** - * Partially tests DistributedLockingFeature; most of the methods are tested via - * {@link TargetLockTest}. - */ -public class DistributedLockingFeatureTest { - private static final String EXPECTED = "expected exception"; - - private BasicDataSource dataSrc; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources"); - } - - @Before - public void setUp() throws Exception { - dataSrc = mock(BasicDataSource.class); - } - - @Test - public void testGetSequenceNumber() { - assertEquals(1000, new DistributedLockingFeature().getSequenceNumber()); - } - - @Test(expected = DistributedLockingFeatureException.class) - public void testAfterStart_PropEx() { - new DistributedLockingFeatureImpl(new PropertyException("prop", "val")).afterStart(null); - } - - @Test(expected = DistributedLockingFeatureException.class) - public void testAfterStart_InterruptEx() { - new DistributedLockingFeatureImpl(new InterruptedException(EXPECTED)).afterStart(null); - } - - @Test(expected = DistributedLockingFeatureException.class) - public void testAfterStart_OtherEx() { - new DistributedLockingFeatureImpl(new RuntimeException(EXPECTED)).afterStart(null); - } - - @Test - public void testCleanLockTable() throws Exception { - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - - new DistributedLockingFeatureImpl().afterStart(null); - } - - /** - * Feature that overrides {@link #makeDataSource()}. - */ - private class DistributedLockingFeatureImpl extends DistributedLockingFeature { - /** - * Exception to throw when {@link #makeDataSource()} is invoked. - */ - private final Exception makeEx; - - public DistributedLockingFeatureImpl() { - makeEx = null; - } - - public DistributedLockingFeatureImpl(Exception ex) { - this.makeEx = ex; - } - - @Override - protected BasicDataSource makeDataSource() throws Exception { - if (makeEx != null) { - throw makeEx; - } - - return dataSrc; - } - } -} diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java deleted file mode 100644 index 84eba6b6..00000000 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import org.apache.commons.dbcp2.BasicDataSource; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult; -import org.onap.policy.drools.persistence.SystemPersistenceConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TargetLockTest { - private static final Logger logger = LoggerFactory.getLogger(TargetLockTest.class); - private static final int MAX_AGE_SEC = 4 * 60; - private static final String DB_CONNECTION = - "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling"; - private static final String DB_USER = "user"; - private static final String DB_PASSWORD = "password"; - private static final String EXPECTED = "expected exception"; - private static final String MY_RESOURCE = "my-resource-id"; - private static final String MY_OWNER = "my-owner"; - private static final UUID MY_UUID = UUID.randomUUID(); - private static Connection conn = null; - private static DistributedLockingFeature distLockFeat; - - /** - * Setup the database. - */ - @BeforeClass - public static void setup() { - getDbConnection(); - createTable(); - SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources"); - distLockFeat = new DistributedLockingFeature(); - distLockFeat.afterStart(null); - } - - /** - * Cleanup the lock. - */ - @AfterClass - public static void cleanUp() { - distLockFeat.beforeShutdown(null); - try { - conn.close(); - } catch (SQLException e) { - logger.error("Error in TargetLockTest.cleanUp()", e); - } - } - - /** - * Wipe the database. - */ - @Before - public void wipeDb() { - try (PreparedStatement lockDelete = conn.prepareStatement("DELETE FROM pooling.locks"); ) { - lockDelete.executeUpdate(); - } catch (SQLException e) { - logger.error("Error in TargetLockTest.wipeDb()", e); - throw new RuntimeException(e); - } - } - - @Test - public void testGrabLockSuccess() throws InterruptedException, ExecutionException { - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - // attempt to grab expiredLock - try (PreparedStatement updateStatement = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) { - updateStatement.setString(1, "resource1"); - updateStatement.executeUpdate(); - - } catch (SQLException e) { - logger.error("Error in TargetLockTest.testGrabLockSuccess()", e); - throw new RuntimeException(e); - } - - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - // cannot re-lock - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - } - - @Test - public void testExpiredLocks() throws Exception { - - // grab lock - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - - // force lock to expire - try (PreparedStatement lockExpire = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -?, now())"); ) { - lockExpire.setInt(1, MAX_AGE_SEC + 1); - lockExpire.executeUpdate(); - } - - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC)); - } - - @Test - public void testGrabLockFail() throws InterruptedException, ExecutionException { - - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC)); - } - - @Test - public void testSecondGrab_UpdateOk() throws Exception { - PreparedStatement grabLockInsert = mock(PreparedStatement.class); - when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED)); - - PreparedStatement secondGrabUpdate = mock(PreparedStatement.class); - when(secondGrabUpdate.executeUpdate()).thenReturn(1); - - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate); - - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - - assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC)); - } - - @Test - public void testSecondGrab_UpdateFail_InsertOk() throws Exception { - PreparedStatement grabLockInsert = mock(PreparedStatement.class); - when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED)); - - PreparedStatement secondGrabUpdate = mock(PreparedStatement.class); - when(secondGrabUpdate.executeUpdate()).thenReturn(0); - - PreparedStatement secondGrabInsert = mock(PreparedStatement.class); - when(secondGrabInsert.executeUpdate()).thenReturn(1); - - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert); - - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - - assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC)); - } - - @Test - public void testSecondGrab_UpdateFail_InsertFail() throws Exception { - PreparedStatement grabLockInsert = mock(PreparedStatement.class); - when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED)); - - PreparedStatement secondGrabUpdate = mock(PreparedStatement.class); - when(secondGrabUpdate.executeUpdate()).thenReturn(0); - - PreparedStatement secondGrabInsert = mock(PreparedStatement.class); - when(secondGrabInsert.executeUpdate()).thenReturn(0); - - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert); - - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC)); - } - - @Test - public void testUpdateLock() throws Exception { - // not locked yet - refresh should fail - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC)); - - // now lock it - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - // refresh should work now - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC)); - - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - // expire the lock - try (PreparedStatement updateStatement = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) { - updateStatement.setString(1, "resource1"); - updateStatement.executeUpdate(); - } - - // refresh should fail now - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC)); - - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - // test exception case - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).refresh(MAX_AGE_SEC)); - } - - @Test - public void testUnlock() throws Exception { - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1")); - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC)); - - // test exception case - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).unlock()); - } - - @Test - public void testIsActive() throws Exception { - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner2")); - - // isActive on expiredLock - try (PreparedStatement updateStatement = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -5, now()) WHERE resourceId = ?"); ) { - updateStatement.setString(1, "resource1"); - updateStatement.executeUpdate(); - } - - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - // Unlock record, next isActive attempt should fail - distLockFeat.beforeUnlock("resource1", "owner1"); - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - // test exception case for outer "try" - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive()); - - // test exception case for inner "try" - PreparedStatement stmt = mock(PreparedStatement.class); - when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED)); - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(stmt); - dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive()); - } - - @Test - public void unlockBeforeLock() { - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1")); - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1")); - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1")); - } - - @Test - public void testIsLocked() throws Exception { - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLocked("resource1")); - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLocked("resource1")); - - // test exception case for outer "try" - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked()); - - // test exception case for inner "try" - PreparedStatement stmt = mock(PreparedStatement.class); - when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED)); - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(stmt); - dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked()); - } - - private static void getDbConnection() { - try { - conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); - } catch (SQLException e) { - logger.error("Error in TargetLockTest.getDBConnection()", e); - } - } - - private static void createTable() { - String createString = - "create table if not exists pooling.locks " - + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), " - + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))"; - try (PreparedStatement createStmt = conn.prepareStatement(createString); ) { - createStmt.executeUpdate(); - - } catch (SQLException e) { - logger.error("Error in TargetLockTest.createTable()", e); - } - } -} diff --git a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties index d1a07e82..061cc608 100644 --- a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties +++ b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties @@ -2,14 +2,14 @@ # ============LICENSE_START======================================================= # feature-distributed-locking # ================================================================================ -# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,5 +22,8 @@ javax.persistence.jdbc.driver=org.h2.Driver javax.persistence.jdbc.url=jdbc:h2:mem:pooling javax.persistence.jdbc.user=user javax.persistence.jdbc.password=password -distributed.locking.lock.aging=150 -distributed.locking.heartbeat.interval=500
\ No newline at end of file + +distributed.locking.transient.error.codes=500 +distributed.locking.expire.check.seconds=900 +distributed.locking.retry.seconds=60 +distributed.locking.max.retries=2 diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java new file mode 100644 index 00000000..0a4d327b --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java @@ -0,0 +1,71 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.lock; + + +/** + * Lock implementation whose operations always fail. + */ +public class AlwaysFailLock extends LockImpl { + private static final long serialVersionUID = 1L; + + /** + * Constructs the object. + */ + public AlwaysFailLock() { + super(); + } + + /** + * Constructs the object. + * + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held once + * it has been granted, after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or subsequently + * lost; must not be {@code null} + */ + public AlwaysFailLock(String resourceId, String ownerKey, int holdSec, LockCallback callback) { + super(LockState.UNAVAILABLE, resourceId, ownerKey, holdSec, callback); + } + + /** + * Always returns false. + */ + @Override + public boolean free() { + return false; + } + + /** + * Always fails and invokes {@link LockCallback#lockUnavailable(Lock)}. + */ + @Override + public void extend(int holdSec, LockCallback callback) { + synchronized (this) { + setHoldSec(holdSec); + setCallback(callback); + } + + notifyUnavailable(); + } +} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java index de62b24a..b2ed9c7f 100644 --- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java +++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,145 +20,67 @@ package org.onap.policy.drools.core.lock; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map.Entry; -import org.onap.policy.drools.utils.Pair; - /** - * Lock that is held for a resource. This not only identifies the current owner of the - * lock, but it also includes a queue of requesters. An item is associated with each - * requester that is waiting in the queue. Note: this class is <b>not</b> thread-safe. - * - * @param <T> type of item to be associated with a request + * Lock held on a resource. */ -public class Lock<T> { +public interface Lock { /** - * Result returned by <i>removeRequester()</i>. + * Frees/release the lock. + * + * <p/> + * Note: client code may choose to invoke this method <i>before</i> the lock has been + * granted. + * + * @return {@code true} if the request was accepted, {@code false} if the lock is + * unavailable */ - public enum RemoveResult { - /** - * The requester was the owner of the lock, and the lock is no longer needed, - * because there were no other requesters waiting to get the lock. - */ - UNLOCKED, - - /** - * The requester was the owner of the lock, and has been replaced with the next - * requester waiting in the queue. - */ - RELOCKED, - - /** - * The requester had been waiting in the queue, and has now been removed. - */ - REMOVED, - - /** - * The requester was not the owner, nor was it waiting in the queue. - */ - NOT_FOUND - } + boolean free(); /** - * The last owner to grab the lock, never {@code null}. + * Determines if the lock is active. + * + * @return {@code true} if the lock is <b>ACTIVE</b>, {@code false} otherwise */ - private String owner; + boolean isActive(); /** - * Requesters waiting to get the lock. Maps the requester (i.e., owner for which the - * request is being made) to its associated item. Uses a Linked map so that the order - * of the requesters is maintained. We don't expect many requesters for any given - * lock, thus we'll start with a small hash size. + * Determines if the lock is unavailable. Once a lock object becomes unavailable, it + * will never become active again. + * + * @return {@code true} if the lock is <b>UNAVAILABLE</b>, {@code false} otherwise */ - private LinkedHashMap<String, T> requester2item = new LinkedHashMap<>(5); + boolean isUnavailable(); /** - * Constructor. - * - * @param owner the current owner of this lock + * Determines if this object is waiting for a lock to be granted or denied. This + * applies when the lock is first created, or after {@link #extend(int, LockCallback)} + * has been invoked. + * + * @return {@code true} if the lock is <b>WAITING</b>, {@code false} otherwise */ - public Lock(String owner) { - this.owner = owner; - } + boolean isWaiting(); /** - * Get owner. - * - * @return the current owner of the lock, or the last owner of the lock, if the lock - * is not currently owned. (This will never be {@code null}.) + * Gets the ID of the resource to which the lock applies. + * + * @return the ID of the resource to which the lock applies */ - public String getOwner() { - return owner; - } + String getResourceId(); /** - * Adds a new requester to the queue of requesters. - * - * @param requester the requester - * @param item to be associated with the requester, must not be {@code null} - * @return {@code true} if the requester was added, {@code false} if it already owns - * the lock or is already in the queue - * @throws IllegalArgumentException if the item is null + * Gets the lock's owner key. + * + * @return the lock's owner key */ - public boolean add(String requester, T item) { - if (item == null) { - throw SimpleLockManager.makeNullArgException("lock requester item is null"); - } - - if (requester.equals(owner)) { - // requester already owns the lock - return false; - } - - T prev = requester2item.putIfAbsent(requester, item); - - // if there's a previous value, then that means this requester is already - // waiting for a lock on this resource. In that case, we return false - return (prev == null); - } + String getOwnerKey(); /** - * Removes a requester from the lock. The requester may currently own the lock, or it - * may be in the queue waiting for the lock. Note: as this is agnostic to the type of - * item associated with the requester, it is unable to notify the new owner that it's - * the new owner; that is left up to the code that invokes this method. - * - * @param requester the requester - * @param newOwner the new owner info is placed here, if the result is <i>RELOCKED</i> - * @return the result + * Extends a lock an additional amount of time from now. The callback will always be + * invoked, and may be invoked <i>before</i> this method returns. + * + * @param holdSec the additional amount of time to hold the lock, in seconds + * @param callback callback to be invoked when the extension completes */ - public RemoveResult removeRequester(String requester, Pair<String, T> newOwner) { - - if (!requester.equals(owner)) { - // requester does not currently own the lock - remove it from the - // queue - T ent = requester2item.remove(requester); - - // if there was an entry in the queue, then return true to indicate - // that it was removed. Otherwise, return false - return (ent != null ? RemoveResult.REMOVED : RemoveResult.NOT_FOUND); - } - - /* - * requester was the owner - find something to take over - */ - Iterator<Entry<String, T>> it = requester2item.entrySet().iterator(); - if (!it.hasNext()) { - // no one to take over the lock - it's now unlocked - return RemoveResult.UNLOCKED; - } - - // there's another requester to take over - Entry<String, T> ent = it.next(); - it.remove(); - - owner = ent.getKey(); - - newOwner.first(owner); - newOwner.second(ent.getValue()); - - return RemoveResult.RELOCKED; - } + void extend(int holdSec, LockCallback callback); } diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiConstants.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockCallback.java index 8510e3d9..fae1cb43 100644 --- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiConstants.java +++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockCallback.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * api-resource-locks + * ONAP * ================================================================================ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ @@ -20,20 +20,25 @@ package org.onap.policy.drools.core.lock; -import lombok.Getter; -import org.onap.policy.common.utils.services.OrderedServiceImpl; - -public class PolicyResourceLockFeatureApiConstants { +/** + * Callback invoked when a lock is granted or lost. + * + * <p/> + * Note: these methods may or may not be invoked by the thread that requested the lock. + */ +public interface LockCallback { /** - * 'FeatureAPI.impl.getList()' returns an ordered list of objects implementing the - * 'FeatureAPI' interface. + * Called to indicate that a lock has been granted. + * + * @param lock lock that has been granted */ - @Getter - private static final OrderedServiceImpl<PolicyResourceLockFeatureApi> impl = - new OrderedServiceImpl<>(PolicyResourceLockFeatureApi.class); + void lockAvailable(Lock lock); - private PolicyResourceLockFeatureApiConstants() { - // do nothing - } + /** + * Called to indicate that a lock is permanently unavailable (e.g., lost, expired). + * + * @param lock lock that has been lost + */ + void lockUnavailable(Lock lock); } diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java new file mode 100644 index 00000000..9596dbe8 --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java @@ -0,0 +1,158 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.lock; + +import java.io.Serializable; +import lombok.Getter; +import lombok.NonNull; +import lombok.Setter; +import lombok.ToString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Lock implementation. + */ +@Getter +@Setter +@ToString(exclude = {"callback"}) +public class LockImpl implements Lock, Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger logger = LoggerFactory.getLogger(LockImpl.class); + + private LockState state; + private final String resourceId; + private final String ownerKey; + private transient LockCallback callback; + private int holdSec; + + /** + * Constructs the object. + */ + public LockImpl() { + this.state = LockState.UNAVAILABLE; + this.resourceId = null; + this.ownerKey = null; + this.callback = null; + this.holdSec = 0; + } + + /** + * Constructs the object. + * + * @param state the initial lock state + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held once + * it has been granted, after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or subsequently + * lost; must not be {@code null} + */ + public LockImpl(@NonNull LockState state, @NonNull String resourceId, @NonNull String ownerKey, int holdSec, + @NonNull LockCallback callback) { + + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + this.state = state; + this.resourceId = resourceId; + this.ownerKey = ownerKey; + this.callback = callback; + this.holdSec = holdSec; + } + + @Override + public boolean isActive() { + return (getState() == LockState.ACTIVE); + } + + @Override + public boolean isUnavailable() { + return (getState() == LockState.UNAVAILABLE); + } + + @Override + public boolean isWaiting() { + return (getState() == LockState.WAITING); + } + + /** + * This method always succeeds, unless the lock is already unavailable. + */ + @Override + public synchronized boolean free() { + if (isUnavailable()) { + return false; + } + + logger.info("releasing lock: {}", this); + setState(LockState.UNAVAILABLE); + + return true; + } + + /** + * This method always succeeds, unless the lock is already unavailable. + */ + @Override + public void extend(int holdSec, LockCallback callback) { + synchronized (this) { + if (isUnavailable()) { + return; + } + + logger.info("lock granted: {}", this); + setState(LockState.ACTIVE); + setHoldSec(holdSec); + setCallback(callback); + } + + notifyAvailable(); + } + + /** + * Invokes the {@link LockCallback#lockAvailable(Lock)}, <i>from the current + * thread</i>. Note: subclasses may choose to invoke the callback from other threads. + */ + public void notifyAvailable() { + try { + callback.lockAvailable(this); + + } catch (RuntimeException e) { + logger.warn("lock callback threw an exception", e); + } + } + + /** + * Invokes the {@link LockCallback#lockUnavailable(Lock)}, <i>from the current + * thread</i>. Note: subclasses may choose to invoke the callback from other threads. + */ + public void notifyUnavailable() { + try { + callback.lockUnavailable(this); + + } catch (RuntimeException e) { + logger.warn("lock callback threw an exception", e); + } + } +} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java new file mode 100644 index 00000000..41699ce6 --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java @@ -0,0 +1,43 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.lock; + +/** + * States of a Lock. + */ + +public enum LockState { + + /** + * Waiting for the lock request to complete. + */ + WAITING, + + /** + * This lock currently holds the resource. + */ + ACTIVE, + + /** + * The resource is no longer available to the lock. + */ + UNAVAILABLE +} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java deleted file mode 100644 index b7968486..00000000 --- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * api-resource-locks - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.core.lock; - -import org.onap.policy.common.utils.services.OrderedService; - -/** - * Resource locks. Each lock has an "owner", which is intended to be unique across a - * single instance of a running PolicyEngine. - * - * <p>This interface provides a way to invoke optional features at various points in the - * code. At appropriate points in the application, the code iterates through this list, - * invoking these optional methods. - * - * <p>Implementers may choose to implement a level of locking appropriate to the application. - * For instance, they may choose to implement an engine-wide locking scheme, or they may - * choose to implement a global locking scheme (e.g., through a shared DB). - */ -public interface PolicyResourceLockFeatureApi extends OrderedService { - - /** - * Result of a requested operation. - */ - public enum OperResult { - - /** - * The implementer accepted the request; no additional locking logic should be - * performed. - */ - OPER_ACCEPTED, - - /** - * The implementer denied the request; no additional locking logic should be - * performed. - */ - OPER_DENIED, - - - /** - * The implementer did not handle the request; additional locking logic <i>should - * be</i> performed. - */ - OPER_UNHANDLED - } - - /** - * This method is called before a lock is acquired on a resource. - * - * @param resourceId resource id - * @param owner owner - * @param holdSec the amount of time, in seconds, that the lock should be held - * @return the result, where <b>OPER_DENIED</b> indicates that the lock is currently - * held by another owner - */ - public default OperResult beforeLock(String resourceId, String owner, int holdSec) { - return OperResult.OPER_UNHANDLED; - } - - /** - * This method is called after a lock for a resource has been acquired or denied. - * - * @param resourceId resource id - * @param owner owner - * @param locked {@code true} if the lock was acquired, {@code false} if it was denied - * @return {@code true} if the implementer handled the request, {@code false} - * otherwise - */ - public default boolean afterLock(String resourceId, String owner, boolean locked) { - return false; - } - - /** - * This method is called before a lock is refreshed on a resource. It may be invoked - * repeatedly to extend the time that a lock is held. - * - * @param resourceId resource id - * @param owner owner - * @param holdSec the amount of time, in seconds, that the lock should be held - * @return the result, where <b>OPER_DENIED</b> indicates that the resource is not - * currently locked by the given owner - */ - public default OperResult beforeRefresh(String resourceId, String owner, int holdSec) { - return OperResult.OPER_UNHANDLED; - } - - /** - * This method is called after a lock for a resource has been refreshed (or after the - * refresh has been denied). - * - * @param resourceId resource id - * @param owner owner - * @param locked {@code true} if the lock was acquired, {@code false} if it was denied - * @return {@code true} if the implementer handled the request, {@code false} - * otherwise - */ - public default boolean afterRefresh(String resourceId, String owner, boolean locked) { - return false; - } - - /** - * This method is called before a lock on a resource is released. - * - * @param resourceId resource id - * @param owner owner - * @return the result, where <b>OPER_DENIED</b> indicates that the lock is not - * currently held by the given owner - */ - public default OperResult beforeUnlock(String resourceId, String owner) { - return OperResult.OPER_UNHANDLED; - } - - /** - * This method is called after a lock on a resource is released. - * - * @param resourceId resource id - * @param owner owner - * @param unlocked {@code true} if the lock was released, {@code false} if the owner - * did not have a lock on the resource - * @return {@code true} if the implementer handled the request, {@code false} - * otherwise - */ - public default boolean afterUnlock(String resourceId, String owner, boolean unlocked) { - return false; - } - - /** - * This method is called before a check is made to determine if a resource is locked. - * - * @param resourceId resource id - * @return the result, where <b>OPER_ACCEPTED</b> indicates that the resource is - * locked, while <b>OPER_DENIED</b> indicates that it is not - */ - public default OperResult beforeIsLocked(String resourceId) { - return OperResult.OPER_UNHANDLED; - } - - /** - * This method is called before a check is made to determine if a particular owner - * holds the lock on a resource. - * - * @param resourceId resource id - * @param owner owner - * @return the result, where <b>OPER_ACCEPTED</b> indicates that the resource is - * locked by the given owner, while <b>OPER_DENIED</b> indicates that it is - * not - */ - public default OperResult beforeIsLockedBy(String resourceId, String owner) { - return OperResult.OPER_UNHANDLED; - } -} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java index 0e73eac1..bbf7d229 100644 --- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java +++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java @@ -20,213 +20,36 @@ package org.onap.policy.drools.core.lock; -import java.util.List; -import java.util.function.Function; -import java.util.function.Supplier; -import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.onap.policy.common.capabilities.Lockable; +import org.onap.policy.common.capabilities.Startable; /** - * Manager of resource locks. Checks for API implementers. + * Manager of resource locks. */ -public class PolicyResourceLockManager extends SimpleLockManager { - - private static Logger logger = LoggerFactory.getLogger(PolicyResourceLockManager.class); - - /** - * Used by junit tests. - */ - protected PolicyResourceLockManager() { - super(); - } - - /** - * Get instance. - * - * @return the manager singleton - */ - public static PolicyResourceLockManager getInstance() { - return Singleton.instance; - } - - @Override - public boolean lock(String resourceId, String owner, int holdSec) { - if (resourceId == null) { - throw makeNullArgException(MSG_NULL_RESOURCE_ID); - } - - if (owner == null) { - throw makeNullArgException(MSG_NULL_OWNER); - } - - - return doBoolIntercept(impl -> impl.beforeLock(resourceId, owner, holdSec), () -> { - - // implementer didn't do the work - defer to the superclass - boolean locked = super.lock(resourceId, owner, holdSec); - - doIntercept(false, impl -> impl.afterLock(resourceId, owner, locked)); - - return locked; - }); - } - - @Override - public boolean refresh(String resourceId, String owner, int holdSec) { - if (resourceId == null) { - throw makeNullArgException(MSG_NULL_RESOURCE_ID); - } - - if (owner == null) { - throw makeNullArgException(MSG_NULL_OWNER); - } - - - return doBoolIntercept(impl -> impl.beforeRefresh(resourceId, owner, holdSec), () -> { - - // implementer didn't do the work - defer to the superclass - boolean refreshed = super.refresh(resourceId, owner, holdSec); - - doIntercept(false, impl -> impl.afterRefresh(resourceId, owner, refreshed)); - - return refreshed; - }); - } - - @Override - public boolean unlock(String resourceId, String owner) { - if (resourceId == null) { - throw makeNullArgException(MSG_NULL_RESOURCE_ID); - } - - if (owner == null) { - throw makeNullArgException(MSG_NULL_OWNER); - } - - - return doBoolIntercept(impl -> impl.beforeUnlock(resourceId, owner), () -> { - - // implementer didn't do the work - defer to the superclass - boolean unlocked = super.unlock(resourceId, owner); - - doIntercept(false, impl -> impl.afterUnlock(resourceId, owner, unlocked)); - - return unlocked; - }); - } - - /** - * Is locked. - * - * @throws IllegalArgumentException if the resourceId is {@code null} - */ - @Override - public boolean isLocked(String resourceId) { - if (resourceId == null) { - throw makeNullArgException(MSG_NULL_RESOURCE_ID); - } - - - return doBoolIntercept(impl -> impl.beforeIsLocked(resourceId), () -> - - // implementer didn't do the work - defer to the superclass - super.isLocked(resourceId) - ); - } +public interface PolicyResourceLockManager extends Startable, Lockable { /** - * Is locked by. + * Requests a lock on a resource. Typically, the lock is not immediately granted, + * though a "lock" object is always returned. Once the lock has been granted (or + * denied), the callback will be invoked to indicate the result. * - * @throws IllegalArgumentException if the resourceId or owner is {@code null} - */ - @Override - public boolean isLockedBy(String resourceId, String owner) { - if (resourceId == null) { - throw makeNullArgException(MSG_NULL_RESOURCE_ID); - } - - if (owner == null) { - throw makeNullArgException(MSG_NULL_OWNER); - } - - return doBoolIntercept(impl -> impl.beforeIsLockedBy(resourceId, owner), () -> - - // implementer didn't do the work - defer to the superclass - super.isLockedBy(resourceId, owner) - ); - } - - /** - * Applies a function to each implementer of the lock feature. Returns as soon as one - * of them returns a result other than <b>OPER_UNHANDLED</b>. If they all return - * <b>OPER_UNHANDLED</b>, then it returns the result of applying the default function. + * <p/> + * Notes: + * <dl> + * <li>The callback may be invoked <i>before</i> this method returns</li> + * <li>The implementation need not honor waitForLock={@code true}</li> + * </dl> * - * @param interceptFunc intercept function - * @param defaultFunc default function - * @return {@code true} if success, {@code false} otherwise - */ - private boolean doBoolIntercept(Function<PolicyResourceLockFeatureApi, OperResult> interceptFunc, - Supplier<Boolean> defaultFunc) { - - OperResult result = doIntercept(OperResult.OPER_UNHANDLED, interceptFunc); - if (result != OperResult.OPER_UNHANDLED) { - return (result == OperResult.OPER_ACCEPTED); - } - - return defaultFunc.get(); - } - - /** - * Applies a function to each implementer of the lock feature. Returns as soon as one - * of them returns a non-null value. - * - * @param continueValue if the implementer returns this value, then it continues to - * check addition implementers - * @param func function to be applied to the implementers - * @return first non-null value returned by an implementer, <i>continueValue</i> if - * they all returned <i>continueValue</i> - */ - private <T> T doIntercept(T continueValue, Function<PolicyResourceLockFeatureApi, T> func) { - - for (PolicyResourceLockFeatureApi impl : getImplementers()) { - try { - T result = func.apply(impl); - if (result != continueValue) { - return result; - } - - } catch (RuntimeException e) { - logger.warn("lock feature {} threw an exception", impl, e); - } - } - - return continueValue; - } - - // these may be overridden by junit tests - - /** - * Get implementers. - * - * @return the list of feature implementers - */ - protected List<PolicyResourceLockFeatureApi> getImplementers() { - return PolicyResourceLockFeatureApiConstants.getImpl().getList(); - } - - /** - * Initialization-on-demand holder idiom. - */ - private static class Singleton { - - private static final PolicyResourceLockManager instance = new PolicyResourceLockManager(); - - /** - * Not invoked. - */ - private Singleton() { - super(); - } - } + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held once + * it has been granted, after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or subsequently + * lost; must not be {@code null} + * @param waitForLock {@code true} to wait for the lock, if it is currently locked, + * {@code false} otherwise + * @return a new lock + */ + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock); } diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java deleted file mode 100644 index 427fbbc6..00000000 --- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java +++ /dev/null @@ -1,384 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.core.lock; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import org.onap.policy.common.utils.time.CurrentTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Simple lock manager. Callbacks are ignored. Does not redirect to lock feature - * implementers. - */ -public class SimpleLockManager { - - protected static Logger logger = LoggerFactory.getLogger(SimpleLockManager.class); - - // messages used in exceptions - public static final String MSG_NULL_RESOURCE_ID = "null resourceId"; - public static final String MSG_NULL_OWNER = "null owner"; - - /** - * Used to access the current time. May be overridden by junit tests. - */ - private static CurrentTime currentTime = new CurrentTime(); - - /** - * Used to synchronize updates to {@link #resource2data} and {@link #locks}. - */ - private final Object locker = new Object(); - - /** - * Maps a resource to its lock data. Lock data is stored in both this and in - * {@link #locks}. - */ - private final Map<String, Data> resource2data = new HashMap<>(); - - /** - * Lock data, sorted by expiration time. Lock data is stored in both this and in - * {@link #resource2data}. Whenever a lock operation is performed, this structure is - * examined and any expired locks are removed; thus no timer threads are needed to - * remove expired locks. - */ - private final SortedSet<Data> locks = new TreeSet<>(); - - /** - * Constructor. - */ - public SimpleLockManager() { - super(); - } - - /** - * Attempts to lock a resource, rejecting the lock if it is already owned, even if - * it's the same owner; the same owner can use {@link #refresh(String, String, int) - * refresh()}, instead, to extend a lock on a resource. - * - * @param resourceId resource id - * @param owner owner - * @param holdSec the amount of time, in seconds, that the lock should be held - * @return {@code true} if locked, {@code false} if the resource is already locked by - * a different owner - * @throws IllegalArgumentException if the resourceId or owner is {@code null} - */ - public boolean lock(String resourceId, String owner, int holdSec) { - - if (resourceId == null) { - throw makeNullArgException(MSG_NULL_RESOURCE_ID); - } - - if (owner == null) { - throw makeNullArgException(MSG_NULL_OWNER); - } - - boolean locked = false; - - synchronized (locker) { - cleanUpLocks(); - - if (!resource2data.containsKey(resourceId)) { - Data data = new Data(owner, resourceId, currentTime.getMillis() + TimeUnit.SECONDS.toMillis(holdSec)); - resource2data.put(resourceId, data); - locks.add(data); - locked = true; - } - } - - logger.info("lock {} for resource {} owner {}", locked, resourceId, owner); - - return locked; - } - - /** - * Attempts to refresh a lock on a resource. - * - * @param resourceId resource id - * @param owner owner - * @param holdSec the amount of time, in seconds, that the lock should be held - * @return {@code true} if locked, {@code false} if the resource is not currently - * locked by the given owner - * @throws IllegalArgumentException if the resourceId or owner is {@code null} - */ - public boolean refresh(String resourceId, String owner, int holdSec) { - - if (resourceId == null) { - throw makeNullArgException(MSG_NULL_RESOURCE_ID); - } - - if (owner == null) { - throw makeNullArgException(MSG_NULL_OWNER); - } - - boolean refreshed = false; - - synchronized (locker) { - cleanUpLocks(); - - Data existingLock = resource2data.get(resourceId); - if (existingLock != null && existingLock.getOwner().equals(owner)) { - // MUST remove the existing lock from the set - locks.remove(existingLock); - - refreshed = true; - - Data data = new Data(owner, resourceId, currentTime.getMillis() + TimeUnit.SECONDS.toMillis(holdSec)); - resource2data.put(resourceId, data); - locks.add(data); - } - } - - logger.info("refresh lock {} for resource {} owner {}", refreshed, resourceId, owner); - - return refreshed; - } - - /** - * Unlocks a resource. - * - * @param resourceId resource id - * @param owner owner - * @return {@code true} if unlocked, {@code false} if the given owner does not - * currently hold a lock on the resource - * @throws IllegalArgumentException if the resourceId or owner is {@code null} - */ - public boolean unlock(String resourceId, String owner) { - if (resourceId == null) { - throw makeNullArgException(MSG_NULL_RESOURCE_ID); - } - - if (owner == null) { - throw makeNullArgException(MSG_NULL_OWNER); - } - - Data data; - - synchronized (locker) { - cleanUpLocks(); - - if ((data = resource2data.get(resourceId)) != null) { - if (owner.equals(data.getOwner())) { - resource2data.remove(resourceId); - locks.remove(data); - - } else { - data = null; - } - } - } - - boolean unlocked = (data != null); - logger.info("unlock resource {} owner {} = {}", resourceId, owner, unlocked); - - return unlocked; - } - - /** - * Determines if a resource is locked by anyone. - * - * @param resourceId resource id - * @return {@code true} if the resource is locked, {@code false} otherwise - * @throws IllegalArgumentException if the resourceId is {@code null} - */ - public boolean isLocked(String resourceId) { - - if (resourceId == null) { - throw makeNullArgException(MSG_NULL_RESOURCE_ID); - } - - boolean locked; - - synchronized (locker) { - cleanUpLocks(); - - locked = resource2data.containsKey(resourceId); - } - - logger.debug("resource {} isLocked = {}", resourceId, locked); - - return locked; - } - - /** - * Determines if a resource is locked by a particular owner. - * - * @param resourceId resource id - * @param owner owner - * @return {@code true} if the resource is locked, {@code false} otherwise - * @throws IllegalArgumentException if the resourceId or owner is {@code null} - */ - public boolean isLockedBy(String resourceId, String owner) { - - if (resourceId == null) { - throw makeNullArgException(MSG_NULL_RESOURCE_ID); - } - - if (owner == null) { - throw makeNullArgException(MSG_NULL_OWNER); - } - - Data data; - - synchronized (locker) { - cleanUpLocks(); - - data = resource2data.get(resourceId); - } - - boolean locked = (data != null && owner.equals(data.getOwner())); - logger.debug("resource {} isLockedBy {} = {}", resourceId, owner, locked); - - return locked; - } - - /** - * Releases expired locks. - */ - private void cleanUpLocks() { - long tcur = currentTime.getMillis(); - - synchronized (locker) { - Iterator<Data> it = locks.iterator(); - while (it.hasNext()) { - Data data = it.next(); - if (data.getExpirationMs() <= tcur) { - it.remove(); - resource2data.remove(data.getResource()); - } else { - break; - } - } - } - } - - /** - * Makes an exception for when an argument is {@code null}. - * - * @param msg exception message - * @return a new Exception - */ - public static IllegalArgumentException makeNullArgException(String msg) { - return new IllegalArgumentException(msg); - } - - /** - * Data for a single Lock. Sorts by expiration time, then resource, and - * then owner. - */ - protected static class Data implements Comparable<Data> { - - /** - * Owner of the lock. - */ - private final String owner; - - /** - * Resource that is locked. - */ - private final String resource; - - /** - * Time when the lock will expire, in milliseconds. - */ - private final long texpireMs; - - /** - * Constructor. - * - * @param resource resource - * @param owner owner - * @param texpireMs time expire in milliseconds - */ - public Data(String owner, String resource, long texpireMs) { - this.owner = owner; - this.resource = resource; - this.texpireMs = texpireMs; - } - - public String getOwner() { - return owner; - } - - public String getResource() { - return resource; - } - - public long getExpirationMs() { - return texpireMs; - } - - @Override - public int compareTo(Data data) { - int diff = Long.compare(texpireMs, data.texpireMs); - if (diff == 0) { - diff = resource.compareTo(data.resource); - } - if (diff == 0) { - diff = owner.compareTo(data.owner); - } - return diff; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((owner == null) ? 0 : owner.hashCode()); - result = prime * result + ((resource == null) ? 0 : resource.hashCode()); - result = prime * result + (int) (texpireMs ^ (texpireMs >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - Data other = (Data) obj; - if (owner == null) { - if (other.owner != null) { - return false; - } - } else if (!owner.equals(other.owner)) { - return false; - } - if (resource == null) { - if (other.resource != null) { - return false; - } - } else if (!resource.equals(other.resource)) { - return false; - } - return (texpireMs == other.texpireMs); - } - } -} diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java new file mode 100644 index 00000000..ce4ca5fd --- /dev/null +++ b/policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java @@ -0,0 +1,106 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.lock; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.junit.Before; +import org.junit.Test; + +public class AlwaysFailLockTest { + private static final String RESOURCE = "hello"; + private static final String OWNER_KEY = "world"; + private static final int HOLD_SEC = 10; + private static final int HOLD_SEC2 = 10; + + private LockCallback callback; + private AlwaysFailLock lock; + + /** + * Populates {@link #lock}. + */ + @Before + public void setUp() { + callback = mock(LockCallback.class); + + lock = new AlwaysFailLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback); + } + + @Test + public void testSerializable() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(lock); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + lock = (AlwaysFailLock) ois.readObject(); + } + + assertEquals(LockState.UNAVAILABLE, lock.getState()); + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + + // these fields are transient + assertNull(lock.getCallback()); + } + + @Test + public void testAlwaysFailLockNoArgs() { + // verify that no-arg constructor doesn't throw an exception + new AlwaysFailLock(); + } + + @Test + public void testAlwaysFailLock() { + assertTrue(lock.isUnavailable()); + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + assertSame(callback, lock.getCallback()); + } + + @Test + public void testFree() { + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + @Test + public void testExtend() { + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + assertEquals(HOLD_SEC2, lock.getHoldSec()); + assertSame(callback2, lock.getCallback()); + } +} diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java new file mode 100644 index 00000000..aab04dc4 --- /dev/null +++ b/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java @@ -0,0 +1,261 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.lock; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.junit.Before; +import org.junit.Test; + +public class LockImplTest { + private static final LockState STATE = LockState.WAITING; + private static final String RESOURCE = "hello"; + private static final String OWNER_KEY = "world"; + private static final int HOLD_SEC = 10; + private static final int HOLD_SEC2 = 20; + private static final String EXPECTED_EXCEPTION = "expected exception"; + + private LockCallback callback; + private LockImpl lock; + + /** + * Populates {@link #lock}. + */ + @Before + public void setUp() { + callback = mock(LockCallback.class); + + lock = new LockImpl(STATE, RESOURCE, OWNER_KEY, HOLD_SEC, callback); + } + + @Test + public void testSerializable() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(lock); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + lock = (LockImpl) ois.readObject(); + } + + assertEquals(STATE, lock.getState()); + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + + // these fields are transient + assertNull(lock.getCallback()); + } + + @Test + public void testLockImplNoArgs() { + // use no-arg constructor + lock = new LockImpl(); + assertEquals(LockState.UNAVAILABLE, lock.getState()); + assertNull(lock.getResourceId()); + assertNull(lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(0, lock.getHoldSec()); + } + + @Test + public void testLockImpl_testGetters() { + assertEquals(STATE, lock.getState()); + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertSame(callback, lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + + // test illegal args + assertThatThrownBy(() -> new LockImpl(null, RESOURCE, OWNER_KEY, HOLD_SEC, callback)) + .hasMessageContaining("state"); + assertThatThrownBy(() -> new LockImpl(STATE, null, OWNER_KEY, HOLD_SEC, callback)) + .hasMessageContaining("resourceId"); + assertThatThrownBy(() -> new LockImpl(STATE, RESOURCE, null, HOLD_SEC, callback)) + .hasMessageContaining("ownerKey"); + assertThatIllegalArgumentException().isThrownBy(() -> new LockImpl(STATE, RESOURCE, OWNER_KEY, -1, callback)) + .withMessageContaining("holdSec"); + assertThatThrownBy(() -> new LockImpl(STATE, RESOURCE, OWNER_KEY, HOLD_SEC, null)) + .hasMessageContaining("callback"); + } + + @Test + public void testFree() { + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + + // should fail this time + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + + // no call-backs should have been invoked + verify(callback, never()).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + @Test + public void testExtend() { + lock.setState(LockState.WAITING); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + assertTrue(lock.isActive()); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + assertSame(callback2, lock.getCallback()); + verify(callback2).lockAvailable(lock); + verify(callback2, never()).lockUnavailable(any()); + + // first call-back should never have been invoked + verify(callback, never()).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + + // extend again + LockCallback callback3 = mock(LockCallback.class); + lock.extend(HOLD_SEC, callback3); + assertEquals(HOLD_SEC, lock.getHoldSec()); + assertSame(callback3, lock.getCallback()); + assertTrue(lock.isActive()); + verify(callback3).lockAvailable(lock); + verify(callback3, never()).lockUnavailable(any()); + + // other call-backs should not have been invoked again + verify(callback, never()).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + + verify(callback2).lockAvailable(any()); + verify(callback2, never()).lockUnavailable(any()); + + assertTrue(lock.free()); + + // extend after free - should fail + lock.extend(HOLD_SEC2, callback); + assertTrue(lock.isUnavailable()); + + // call-backs should not have been invoked again + verify(callback, never()).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + + verify(callback2).lockAvailable(any()); + verify(callback2, never()).lockUnavailable(any()); + + verify(callback3).lockAvailable(lock); + verify(callback3, never()).lockUnavailable(any()); + } + + @Test + public void testNotifyAvailable() { + lock.notifyAvailable(); + + verify(callback).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + @Test + public void testNotifyAvailable_Ex() { + doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockAvailable(any()); + doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockUnavailable(any()); + + // should not throw an exception + lock.notifyAvailable(); + } + + @Test + public void testNotifyUnavailable() { + lock.notifyUnavailable(); + + verify(callback, never()).lockAvailable(any()); + verify(callback).lockUnavailable(any()); + } + + @Test + public void testNotifyUnavailable_Ex() { + doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockAvailable(any()); + doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockUnavailable(any()); + + // should not throw an exception + lock.notifyUnavailable(); + } + + @Test + public void testSetState_testIsActive_testIsWaiting_testIsUnavailable() { + lock.setState(LockState.WAITING); + assertEquals(LockState.WAITING, lock.getState()); + assertFalse(lock.isActive()); + assertFalse(lock.isUnavailable()); + assertTrue(lock.isWaiting()); + + lock.setState(LockState.ACTIVE); + assertEquals(LockState.ACTIVE, lock.getState()); + assertTrue(lock.isActive()); + assertFalse(lock.isUnavailable()); + assertFalse(lock.isWaiting()); + + lock.setState(LockState.UNAVAILABLE); + assertEquals(LockState.UNAVAILABLE, lock.getState()); + assertFalse(lock.isActive()); + assertTrue(lock.isUnavailable()); + assertFalse(lock.isWaiting()); + } + + @Test + public void testSetHoldSec() { + assertEquals(HOLD_SEC, lock.getHoldSec()); + + lock.setHoldSec(HOLD_SEC2); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + } + + @Test + public void testSetCallback() { + assertSame(callback, lock.getCallback()); + + LockCallback callback2 = mock(LockCallback.class); + lock.setCallback(callback2); + assertSame(callback2, lock.getCallback()); + } + + @Test + public void testToString() { + String text = lock.toString(); + + assertNotNull(text); + assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback").doesNotContain("succeed"); + } +} diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java deleted file mode 100644 index 5a88b02f..00000000 --- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.core.lock; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.drools.core.lock.Lock.RemoveResult; -import org.onap.policy.drools.utils.Pair; - -public class LockTest { - - private static final String OWNER = "my.owner"; - private static final String OWNER2 = "another.owner"; - private static final String OWNER3 = "third.owner"; - - private static final Integer ITEM2 = 10; - private static final Integer ITEM3 = 20; - - private Lock<Integer> lock; - private Pair<String, Integer> newOwner; - - @Before - public void setUp() { - lock = new Lock<>(OWNER); - newOwner = new Pair<>(null, null); - } - - - @Test - public void testLock() { - assertEquals(OWNER, lock.getOwner()); - } - - @Test - public void testGetOwner() { - assertEquals(OWNER, lock.getOwner()); - } - - @Test - public void testAdd() { - assertTrue(lock.add(OWNER2, ITEM2)); - assertTrue(lock.add(OWNER3, ITEM3)); - - // attempt to re-add owner2 with the same item - should fail - assertFalse(lock.add(OWNER2, ITEM2)); - - // attempt to re-add owner2 with a different item - should fail - assertFalse(lock.add(OWNER2, ITEM3)); - } - - @Test(expected = IllegalArgumentException.class) - public void testAdd_ArgEx() { - lock.add(OWNER2, null); - } - - @Test - public void testAdd_AlreadyOwner() { - assertFalse(lock.add(OWNER, ITEM2)); - } - - @Test - public void testAdd_AlreadyInQueue() { - lock.add(OWNER2, ITEM2); - - assertFalse(lock.add(OWNER2, ITEM2)); - } - - @Test - public void testRemoveRequester_Owner_QueueEmpty() { - assertEquals(RemoveResult.UNLOCKED, lock.removeRequester(OWNER, newOwner)); - } - - @Test - public void testRemoveRequester_Owner_QueueHasOneItem() { - lock.add(OWNER2, ITEM2); - - assertEquals(RemoveResult.RELOCKED, lock.removeRequester(OWNER, newOwner)); - assertEquals(OWNER2, newOwner.first()); - assertEquals(ITEM2, newOwner.second()); - - assertEquals(RemoveResult.UNLOCKED, lock.removeRequester(OWNER2, newOwner)); - } - - @Test - public void testRemoveRequester_Owner_QueueHasMultipleItems() { - lock.add(OWNER2, ITEM2); - lock.add(OWNER3, ITEM3); - - assertEquals(RemoveResult.RELOCKED, lock.removeRequester(OWNER, newOwner)); - assertEquals(OWNER2, newOwner.first()); - assertEquals(ITEM2, newOwner.second()); - - assertEquals(RemoveResult.RELOCKED, lock.removeRequester(OWNER2, newOwner)); - assertEquals(OWNER3, newOwner.first()); - assertEquals(ITEM3, newOwner.second()); - - assertEquals(RemoveResult.UNLOCKED, lock.removeRequester(OWNER3, newOwner)); - } - - @Test - public void testRemoveRequester_InQueue() { - lock.add(OWNER2, ITEM2); - - assertEquals(RemoveResult.REMOVED, lock.removeRequester(OWNER2, newOwner)); - } - - @Test - public void testRemoveRequester_NeitherOwnerNorInQueue() { - assertEquals(RemoveResult.NOT_FOUND, lock.removeRequester(OWNER2, newOwner)); - } - -} diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java deleted file mode 100644 index 999ae50f..00000000 --- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * api-resource-locks - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.core.lock; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult; - -public class PolicyResourceLockFeatureApiTest { - - private static final String RESOURCE_ID = "the resource"; - private static final String OWNER = "the owner"; - - private PolicyResourceLockFeatureApi api; - - /** - * set up. - */ - @Before - public void setUp() { - api = new PolicyResourceLockFeatureApi() { - @Override - public int getSequenceNumber() { - return 0; - } - }; - } - - @Test - public void testBeforeLock() { - assertEquals(OperResult.OPER_UNHANDLED, api.beforeLock(RESOURCE_ID, OWNER, 0)); - } - - @Test - public void testAfterLock() { - assertFalse(api.afterLock(RESOURCE_ID, OWNER, true)); - assertFalse(api.afterLock(RESOURCE_ID, OWNER, false)); - } - - @Test - public void testBeforeRefresh() { - assertEquals(OperResult.OPER_UNHANDLED, api.beforeRefresh(RESOURCE_ID, OWNER, 0)); - } - - @Test - public void testAfterRefresh() { - assertFalse(api.afterRefresh(RESOURCE_ID, OWNER, true)); - assertFalse(api.afterRefresh(RESOURCE_ID, OWNER, false)); - } - - @Test - public void testBeforeUnlock() { - assertEquals(OperResult.OPER_UNHANDLED, api.beforeUnlock(RESOURCE_ID, OWNER)); - } - - @Test - public void testAfterUnlock() { - assertFalse(api.afterUnlock(RESOURCE_ID, OWNER, true)); - assertFalse(api.afterUnlock(RESOURCE_ID, OWNER, false)); - } - - @Test - public void testBeforeIsLocked() { - assertEquals(OperResult.OPER_UNHANDLED, api.beforeIsLocked(RESOURCE_ID)); - } - - @Test - public void testBeforeIsLockedBy() { - assertEquals(OperResult.OPER_UNHANDLED, api.beforeIsLockedBy(RESOURCE_ID, OWNER)); - } -} diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java deleted file mode 100644 index f575ce49..00000000 --- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java +++ /dev/null @@ -1,595 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.core.lock; - -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult; - -public class PolicyResourceLockManagerTest { - - private static final int MAX_AGE_SEC = 4 * 60; - - private static final String NULL_RESOURCE_ID = "null resourceId"; - private static final String NULL_OWNER = "null owner"; - - private static final String RESOURCE_A = "resource.a"; - private static final String RESOURCE_B = "resource.b"; - private static final String RESOURCE_C = "resource.c"; - - private static final String OWNER1 = "owner.one"; - private static final String OWNER2 = "owner.two"; - private static final String OWNER3 = "owner.three"; - - private PolicyResourceLockFeatureApi impl1; - private PolicyResourceLockFeatureApi impl2; - private List<PolicyResourceLockFeatureApi> implList; - - private PolicyResourceLockManager mgr; - - /** - * Set up. - */ - @Before - public void setUp() { - impl1 = mock(PolicyResourceLockFeatureApi.class); - impl2 = mock(PolicyResourceLockFeatureApi.class); - - initImplementer(impl1); - initImplementer(impl2); - - // list of feature API implementers - implList = new LinkedList<>(Arrays.asList(impl1, impl2)); - - mgr = new PolicyResourceLockManager() { - @Override - protected List<PolicyResourceLockFeatureApi> getImplementers() { - return implList; - } - }; - } - - /** - * Initializes an implementer so it always returns {@code null}. - * - * @param impl implementer - */ - private void initImplementer(PolicyResourceLockFeatureApi impl) { - when(impl.beforeLock(anyString(), anyString(), anyInt())).thenReturn(OperResult.OPER_UNHANDLED); - when(impl.beforeRefresh(anyString(), anyString(), anyInt())).thenReturn(OperResult.OPER_UNHANDLED); - when(impl.beforeUnlock(anyString(), anyString())).thenReturn(OperResult.OPER_UNHANDLED); - when(impl.beforeIsLocked(anyString())).thenReturn(OperResult.OPER_UNHANDLED); - when(impl.beforeIsLockedBy(anyString(), anyString())).thenReturn(OperResult.OPER_UNHANDLED); - } - - @Test - public void testLock() throws Exception { - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - verify(impl1).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - verify(impl2).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - verify(impl1).afterLock(RESOURCE_A, OWNER1, true); - verify(impl2).afterLock(RESOURCE_A, OWNER1, true); - - assertTrue(mgr.isLocked(RESOURCE_A)); - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - assertFalse(mgr.isLocked(RESOURCE_B)); - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2)); - - // null callback - not locked yet - assertTrue(mgr.lock(RESOURCE_C, OWNER3, MAX_AGE_SEC)); - - // null callback - already locked - assertFalse(mgr.lock(RESOURCE_A, OWNER3, MAX_AGE_SEC)); - } - - @Test - public void testLock_ArgEx() { - assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(null, OWNER1, MAX_AGE_SEC)) - .withMessage(NULL_RESOURCE_ID); - - assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(RESOURCE_A, null, MAX_AGE_SEC)) - .withMessage(NULL_OWNER); - - // this should not throw an exception - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - } - - @Test - public void testLock_Acquired_BeforeIntercepted() { - // have impl1 intercept - when(impl1.beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_ACCEPTED); - - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - verify(impl1).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - verify(impl2, never()).beforeLock(anyString(), anyString(), anyInt()); - - verify(impl1, never()).afterLock(anyString(), anyString(), anyBoolean()); - verify(impl2, never()).afterLock(anyString(), anyString(), anyBoolean()); - } - - @Test - public void testLock_Denied_BeforeIntercepted() { - // have impl1 intercept - when(impl1.beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_DENIED); - - assertFalse(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - verify(impl1).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - verify(impl2, never()).beforeLock(anyString(), anyString(), anyInt()); - - verify(impl1, never()).afterLock(anyString(), anyString(), anyBoolean()); - verify(impl2, never()).afterLock(anyString(), anyString(), anyBoolean()); - } - - @Test - public void testLock_Acquired_AfterIntercepted() throws Exception { - - // impl1 intercepts during afterLock() - when(impl1.afterLock(RESOURCE_A, OWNER1, true)).thenReturn(true); - - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - // impl1 sees it, but impl2 does not - verify(impl1).afterLock(RESOURCE_A, OWNER1, true); - verify(impl2, never()).afterLock(anyString(), anyString(), anyBoolean()); - } - - @Test - public void testLock_Acquired() throws Exception { - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - verify(impl1).afterLock(RESOURCE_A, OWNER1, true); - verify(impl2).afterLock(RESOURCE_A, OWNER1, true); - } - - @Test - public void testLock_Denied_AfterIntercepted() throws Exception { - - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // impl1 intercepts during afterLock() - when(impl1.afterLock(RESOURCE_A, OWNER2, false)).thenReturn(true); - - // owner2 tries to lock - assertFalse(mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC)); - - // impl1 sees it, but impl2 does not - verify(impl1).afterLock(RESOURCE_A, OWNER2, false); - verify(impl2, never()).afterLock(RESOURCE_A, OWNER2, false); - } - - @Test - public void testLock_Denied() { - - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // owner2 tries to lock - mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC); - - verify(impl1).afterLock(RESOURCE_A, OWNER2, false); - verify(impl2).afterLock(RESOURCE_A, OWNER2, false); - } - - @Test - public void testRefresh() throws Exception { - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - verify(impl1).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC); - verify(impl2).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC); - verify(impl1).afterRefresh(RESOURCE_A, OWNER1, true); - verify(impl2).afterRefresh(RESOURCE_A, OWNER1, true); - - assertTrue(mgr.isLocked(RESOURCE_A)); - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - assertFalse(mgr.isLocked(RESOURCE_B)); - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2)); - - // different owner and resource - assertFalse(mgr.refresh(RESOURCE_C, OWNER3, MAX_AGE_SEC)); - - // different owner - assertFalse(mgr.refresh(RESOURCE_A, OWNER3, MAX_AGE_SEC)); - } - - @Test - public void testRefresh_ArgEx() { - assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(null, OWNER1, MAX_AGE_SEC)) - .withMessage(NULL_RESOURCE_ID); - - assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(RESOURCE_A, null, MAX_AGE_SEC)) - .withMessage(NULL_OWNER); - - // this should not throw an exception - mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC); - } - - @Test - public void testRefresh_Acquired_BeforeIntercepted() { - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - // have impl1 intercept - when(impl1.beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_ACCEPTED); - - assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - verify(impl1).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC); - verify(impl2, never()).beforeRefresh(anyString(), anyString(), anyInt()); - - verify(impl1, never()).afterRefresh(anyString(), anyString(), anyBoolean()); - verify(impl2, never()).afterRefresh(anyString(), anyString(), anyBoolean()); - } - - @Test - public void testRefresh_Denied_BeforeIntercepted() { - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - // have impl1 intercept - when(impl1.beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_DENIED); - - assertFalse(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - verify(impl1).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC); - verify(impl2, never()).beforeRefresh(anyString(), anyString(), anyInt()); - - verify(impl1, never()).afterRefresh(anyString(), anyString(), anyBoolean()); - verify(impl2, never()).afterRefresh(anyString(), anyString(), anyBoolean()); - } - - @Test - public void testRefresh_Acquired_AfterIntercepted() throws Exception { - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - // impl1 intercepts during afterRefresh() - when(impl1.afterRefresh(RESOURCE_A, OWNER1, true)).thenReturn(true); - - assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - // impl1 sees it, but impl2 does not - verify(impl1).afterRefresh(RESOURCE_A, OWNER1, true); - verify(impl2, never()).afterRefresh(anyString(), anyString(), anyBoolean()); - } - - @Test - public void testRefresh_Acquired() throws Exception { - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - verify(impl1).afterRefresh(RESOURCE_A, OWNER1, true); - verify(impl2).afterRefresh(RESOURCE_A, OWNER1, true); - } - - @Test - public void testRefresh_Denied_AfterIntercepted() throws Exception { - - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // impl1 intercepts during afterRefresh() - when(impl1.afterRefresh(RESOURCE_A, OWNER2, false)).thenReturn(true); - - // owner2 tries to lock - assertFalse(mgr.refresh(RESOURCE_A, OWNER2, MAX_AGE_SEC)); - - // impl1 sees it, but impl2 does not - verify(impl1).afterRefresh(RESOURCE_A, OWNER2, false); - verify(impl2, never()).afterRefresh(RESOURCE_A, OWNER2, false); - } - - @Test - public void testRefresh_Denied() { - - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // owner2 tries to lock - mgr.refresh(RESOURCE_A, OWNER2, MAX_AGE_SEC); - - verify(impl1).afterRefresh(RESOURCE_A, OWNER2, false); - verify(impl2).afterRefresh(RESOURCE_A, OWNER2, false); - } - - @Test - public void testUnlock() throws Exception { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - mgr.lock(RESOURCE_B, OWNER1, MAX_AGE_SEC); - - assertTrue(mgr.unlock(RESOURCE_A, OWNER1)); - - verify(impl1).beforeUnlock(RESOURCE_A, OWNER1); - verify(impl2).beforeUnlock(RESOURCE_A, OWNER1); - - verify(impl1).afterUnlock(RESOURCE_A, OWNER1, true); - verify(impl2).afterUnlock(RESOURCE_A, OWNER1, true); - } - - @Test - public void testUnlock_ArgEx() { - assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(null, OWNER1)).withMessage(NULL_RESOURCE_ID); - - assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(RESOURCE_A, null)).withMessage(NULL_OWNER); - } - - @Test - public void testUnlock_BeforeInterceptedTrue() { - - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // have impl1 intercept - when(impl1.beforeUnlock(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_ACCEPTED); - - assertTrue(mgr.unlock(RESOURCE_A, OWNER1)); - - verify(impl1).beforeUnlock(RESOURCE_A, OWNER1); - verify(impl2, never()).beforeUnlock(anyString(), anyString()); - - verify(impl1, never()).afterUnlock(anyString(), anyString(), anyBoolean()); - verify(impl2, never()).afterUnlock(anyString(), anyString(), anyBoolean()); - } - - @Test - public void testUnlock_BeforeInterceptedFalse() { - - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // have impl1 intercept - when(impl1.beforeUnlock(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_DENIED); - - assertFalse(mgr.unlock(RESOURCE_A, OWNER1)); - - verify(impl1).beforeUnlock(RESOURCE_A, OWNER1); - verify(impl2, never()).beforeUnlock(anyString(), anyString()); - - verify(impl1, never()).afterUnlock(anyString(), anyString(), anyBoolean()); - verify(impl2, never()).afterUnlock(anyString(), anyString(), anyBoolean()); - } - - @Test - public void testUnlock_Unlocked() { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - assertTrue(mgr.unlock(RESOURCE_A, OWNER1)); - - verify(impl1).beforeUnlock(RESOURCE_A, OWNER1); - verify(impl2).beforeUnlock(RESOURCE_A, OWNER1); - - verify(impl1).afterUnlock(RESOURCE_A, OWNER1, true); - verify(impl2).afterUnlock(RESOURCE_A, OWNER1, true); - } - - @Test - public void testUnlock_Unlocked_AfterIntercepted() { - // have impl1 intercept - when(impl1.afterUnlock(RESOURCE_A, OWNER1, true)).thenReturn(true); - - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - assertTrue(mgr.unlock(RESOURCE_A, OWNER1)); - - verify(impl1).beforeUnlock(RESOURCE_A, OWNER1); - verify(impl2).beforeUnlock(RESOURCE_A, OWNER1); - - verify(impl1).afterUnlock(RESOURCE_A, OWNER1, true); - verify(impl2, never()).afterUnlock(RESOURCE_A, OWNER1, true); - } - - @Test - public void testUnlock_NotUnlocked() { - assertFalse(mgr.unlock(RESOURCE_A, OWNER1)); - - verify(impl1).beforeUnlock(RESOURCE_A, OWNER1); - verify(impl2).beforeUnlock(RESOURCE_A, OWNER1); - - verify(impl1).afterUnlock(RESOURCE_A, OWNER1, false); - verify(impl2).afterUnlock(RESOURCE_A, OWNER1, false); - } - - @Test - public void testUnlock_NotUnlocked_AfterIntercepted() { - // have impl1 intercept - when(impl1.afterUnlock(RESOURCE_A, OWNER1, false)).thenReturn(true); - - assertFalse(mgr.unlock(RESOURCE_A, OWNER1)); - - verify(impl1).beforeUnlock(RESOURCE_A, OWNER1); - verify(impl2).beforeUnlock(RESOURCE_A, OWNER1); - - verify(impl1).afterUnlock(RESOURCE_A, OWNER1, false); - verify(impl2, never()).afterUnlock(RESOURCE_A, OWNER1, false); - } - - @Test - public void testIsLocked_True() { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - assertTrue(mgr.isLocked(RESOURCE_A)); - - verify(impl1).beforeIsLocked(RESOURCE_A); - verify(impl2).beforeIsLocked(RESOURCE_A); - } - - @Test - public void testIsLocked_False() { - assertFalse(mgr.isLocked(RESOURCE_A)); - - verify(impl1).beforeIsLocked(RESOURCE_A); - verify(impl2).beforeIsLocked(RESOURCE_A); - } - - @Test - public void testIsLocked_ArgEx() { - assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLocked(null)).withMessage(NULL_RESOURCE_ID); - } - - @Test - public void testIsLocked_BeforeIntercepted_True() { - - // have impl1 intercept - when(impl1.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_ACCEPTED);; - - assertTrue(mgr.isLocked(RESOURCE_A)); - - verify(impl1).beforeIsLocked(RESOURCE_A); - verify(impl2, never()).beforeIsLocked(RESOURCE_A); - } - - @Test - public void testIsLocked_BeforeIntercepted_False() { - - // lock it so we can verify that impl1 overrides the superclass isLocker() - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // have impl1 intercept - when(impl1.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_DENIED); - - assertFalse(mgr.isLocked(RESOURCE_A)); - - verify(impl1).beforeIsLocked(RESOURCE_A); - verify(impl2, never()).beforeIsLocked(RESOURCE_A); - } - - @Test - public void testIsLockedBy_True() { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - - verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1); - verify(impl2).beforeIsLockedBy(RESOURCE_A, OWNER1); - } - - @Test - public void testIsLockedBy_False() { - // different owner - mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC); - - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1)); - - verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1); - verify(impl2).beforeIsLockedBy(RESOURCE_A, OWNER1); - } - - @Test - public void testIsLockedBy_ArgEx() { - assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(null, OWNER1)) - .withMessage(NULL_RESOURCE_ID); - - assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(RESOURCE_A, null)).withMessage(NULL_OWNER); - } - - @Test - public void testIsLockedBy_BeforeIntercepted_True() { - - // have impl1 intercept - when(impl1.beforeIsLockedBy(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_ACCEPTED);; - - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - - verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1); - verify(impl2, never()).beforeIsLockedBy(RESOURCE_A, OWNER1); - } - - @Test - public void testIsLockedBy_BeforeIntercepted_False() { - - // lock it so we can verify that impl1 overrides the superclass isLocker() - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // have impl1 intercept - when(impl1.beforeIsLockedBy(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_DENIED); - - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1)); - - verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1); - verify(impl2, never()).beforeIsLockedBy(RESOURCE_A, OWNER1); - } - - @Test - public void testGetInstance() { - PolicyResourceLockManager inst = PolicyResourceLockManager.getInstance(); - assertNotNull(inst); - - // should return the same instance each time - assertEquals(inst, PolicyResourceLockManager.getInstance()); - assertEquals(inst, PolicyResourceLockManager.getInstance()); - } - - @Test - public void testDoIntercept_Empty() { - // clear the implementer list - implList.clear(); - - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - assertTrue(mgr.isLocked(RESOURCE_A)); - assertFalse(mgr.isLocked(RESOURCE_B)); - - verify(impl1, never()).beforeIsLocked(anyString()); - } - - @Test - public void testDoIntercept_Impl1() { - when(impl1.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_ACCEPTED);; - - assertTrue(mgr.isLocked(RESOURCE_A)); - - verify(impl1).beforeIsLocked(RESOURCE_A); - verify(impl2, never()).beforeIsLocked(anyString()); - } - - @Test - public void testDoIntercept_Impl2() { - when(impl2.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_ACCEPTED);; - - assertTrue(mgr.isLocked(RESOURCE_A)); - - verify(impl1).beforeIsLocked(RESOURCE_A); - verify(impl2).beforeIsLocked(RESOURCE_A); - } - - @Test - public void testDoIntercept_Ex() { - doThrow(new RuntimeException("expected exception")).when(impl1).beforeIsLocked(RESOURCE_A); - - assertFalse(mgr.isLocked(RESOURCE_A)); - - verify(impl1).beforeIsLocked(RESOURCE_A); - verify(impl2).beforeIsLocked(RESOURCE_A); - } -} diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java deleted file mode 100644 index 51cf68fc..00000000 --- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java +++ /dev/null @@ -1,527 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.core.lock; - -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.util.LinkedList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.onap.policy.common.utils.time.CurrentTime; -import org.onap.policy.common.utils.time.TestTime; -import org.onap.policy.drools.core.lock.SimpleLockManager.Data; -import org.powermock.reflect.Whitebox; - -public class SimpleLockManagerTest { - - // Note: this must be a multiple of four - private static final int MAX_AGE_SEC = 4 * 60; - private static final int MAX_AGE_MS = MAX_AGE_SEC * 1000; - - private static final String EXPECTED_EXCEPTION = "expected exception"; - - private static final String NULL_RESOURCE_ID = "null resourceId"; - private static final String NULL_OWNER = "null owner"; - - private static final String RESOURCE_A = "resource.a"; - private static final String RESOURCE_B = "resource.b"; - private static final String RESOURCE_C = "resource.c"; - private static final String RESOURCE_D = "resource.d"; - - private static final String OWNER1 = "owner.one"; - private static final String OWNER2 = "owner.two"; - private static final String OWNER3 = "owner.three"; - - /** - * Name of the "current time" field within the {@link SimpleLockManager} class. - */ - private static final String TIME_FIELD = "currentTime"; - - private static CurrentTime savedTime; - - private TestTime testTime; - private SimpleLockManager mgr; - - @BeforeClass - public static void setUpBeforeClass() { - savedTime = Whitebox.getInternalState(SimpleLockManager.class, TIME_FIELD); - } - - @AfterClass - public static void tearDownAfterClass() { - Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, savedTime); - } - - /** - * Set up. - */ - @Before - public void setUp() { - testTime = new TestTime(); - Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime); - - mgr = new SimpleLockManager(); - } - - @Test - public void testCurrentTime() { - assertNotNull(savedTime); - } - - @Test - public void testLock() throws Exception { - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - assertTrue(mgr.isLocked(RESOURCE_A)); - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - assertFalse(mgr.isLocked(RESOURCE_B)); - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2)); - - // different owner and resource - should succeed - assertTrue(mgr.lock(RESOURCE_C, OWNER3, MAX_AGE_SEC)); - - // different owner - already locked - assertFalse(mgr.lock(RESOURCE_A, OWNER3, MAX_AGE_SEC)); - } - - @Test - public void testLock_ExtendLock() throws Exception { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // sleep half of the cycle - testTime.sleep(MAX_AGE_MS / 2); - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - - // extend the lock - mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // verify still locked after sleeping the other half of the cycle - testTime.sleep(MAX_AGE_MS / 2 + 1); - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - - // and should release after another half cycle - testTime.sleep(MAX_AGE_MS / 2); - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1)); - } - - @Test - public void testLock_AlreadyLocked() throws Exception { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // same owner - assertFalse(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - // different owner - assertFalse(mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC)); - } - - @Test - public void testLock_ArgEx() { - assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(null, OWNER1, MAX_AGE_SEC)) - .withMessage(NULL_RESOURCE_ID); - - assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(RESOURCE_A, null, MAX_AGE_SEC)) - .withMessage(NULL_OWNER); - - // this should not throw an exception - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - } - - @Test - public void testRefresh() throws Exception { - // don't own the lock yet - cannot refresh - assertFalse(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - // now the lock is owned - assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - // refresh again - assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC + 1)); - - assertTrue(mgr.isLocked(RESOURCE_A)); - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - assertFalse(mgr.isLocked(RESOURCE_B)); - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2)); - - // different owner - assertFalse(mgr.refresh(RESOURCE_A, OWNER3, MAX_AGE_SEC)); - - // different resource - assertFalse(mgr.refresh(RESOURCE_C, OWNER1, MAX_AGE_SEC)); - } - - @Test - public void testRefresh_ExtendLock() throws Exception { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // sleep half of the cycle - testTime.sleep(MAX_AGE_MS / 2); - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - - // extend the lock - mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // verify still locked after sleeping the other half of the cycle - testTime.sleep(MAX_AGE_MS / 2 + 1); - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - - // and should release after another half cycle - testTime.sleep(MAX_AGE_MS / 2); - - // cannot refresh expired lock - assertFalse(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1)); - } - - @Test - public void testRefresh_AlreadyLocked() throws Exception { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // same owner - assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)); - - // different owner - assertFalse(mgr.refresh(RESOURCE_A, OWNER2, MAX_AGE_SEC)); - assertFalse(mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC)); - } - - @Test - public void testRefresh_ArgEx() { - assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(null, OWNER1, MAX_AGE_SEC)) - .withMessage(NULL_RESOURCE_ID); - - assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(RESOURCE_A, null, MAX_AGE_SEC)) - .withMessage(NULL_OWNER); - - // this should not throw an exception - mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC); - } - - @Test - public void testUnlock() throws Exception { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // unlock it - assertTrue(mgr.unlock(RESOURCE_A, OWNER1)); - } - - @Test - public void testUnlock_ArgEx() { - assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(null, OWNER1)).withMessage(NULL_RESOURCE_ID); - - assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(RESOURCE_A, null)).withMessage(NULL_OWNER); - } - - @Test - public void testUnlock_NotLocked() { - assertFalse(mgr.unlock(RESOURCE_A, OWNER1)); - } - - @Test - public void testUnlock_DiffOwner() { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - assertFalse(mgr.unlock(RESOURCE_A, OWNER2)); - } - - @Test - public void testIsLocked() { - assertFalse(mgr.isLocked(RESOURCE_A)); - - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - mgr.lock(RESOURCE_B, OWNER1, MAX_AGE_SEC); - - assertTrue(mgr.isLocked(RESOURCE_A)); - assertTrue(mgr.isLocked(RESOURCE_B)); - assertFalse(mgr.isLocked(RESOURCE_C)); - - // unlock from first resource - mgr.unlock(RESOURCE_A, OWNER1); - assertFalse(mgr.isLocked(RESOURCE_A)); - assertTrue(mgr.isLocked(RESOURCE_B)); - assertFalse(mgr.isLocked(RESOURCE_C)); - - // unlock from second resource - mgr.unlock(RESOURCE_B, OWNER1); - assertFalse(mgr.isLocked(RESOURCE_A)); - assertFalse(mgr.isLocked(RESOURCE_B)); - assertFalse(mgr.isLocked(RESOURCE_C)); - } - - @Test - public void testIsLocked_ArgEx() { - assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLocked(null)).withMessage(NULL_RESOURCE_ID); - } - - @Test - public void testIsLockedBy() { - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1)); - - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - assertFalse(mgr.isLockedBy(RESOURCE_B, OWNER1)); - - assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1)); - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2)); - - // unlock from the resource - mgr.unlock(RESOURCE_A, OWNER1); - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1)); - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2)); - assertFalse(mgr.isLockedBy(RESOURCE_B, OWNER1)); - } - - @Test - public void testIsLockedBy_ArgEx() { - assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(null, OWNER1)) - .withMessage(NULL_RESOURCE_ID); - - assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(RESOURCE_A, null)).withMessage(NULL_OWNER); - } - - @Test - public void testIsLockedBy_NotLocked() { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // different resource, thus no lock - assertFalse(mgr.isLockedBy(RESOURCE_B, OWNER1)); - } - - @Test - public void testIsLockedBy_LockedButNotOwner() { - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - - // different owner - assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2)); - } - - @Test - public void testCleanUpLocks() throws Exception { - // note: this assumes that MAX_AGE_MS is divisible by 4 - mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC); - assertTrue(mgr.isLocked(RESOURCE_A)); - - testTime.sleep(10); - mgr.lock(RESOURCE_B, OWNER1, MAX_AGE_SEC); - assertTrue(mgr.isLocked(RESOURCE_A)); - assertTrue(mgr.isLocked(RESOURCE_B)); - - testTime.sleep(MAX_AGE_MS / 4); - mgr.lock(RESOURCE_C, OWNER1, MAX_AGE_SEC); - assertTrue(mgr.isLocked(RESOURCE_A)); - assertTrue(mgr.isLocked(RESOURCE_B)); - assertTrue(mgr.isLocked(RESOURCE_C)); - - testTime.sleep(MAX_AGE_MS / 4); - mgr.lock(RESOURCE_D, OWNER1, MAX_AGE_SEC); - assertTrue(mgr.isLocked(RESOURCE_A)); - assertTrue(mgr.isLocked(RESOURCE_B)); - assertTrue(mgr.isLocked(RESOURCE_C)); - assertTrue(mgr.isLocked(RESOURCE_D)); - - // sleep remainder of max age - first two should expire - testTime.sleep(MAX_AGE_MS / 2); - assertFalse(mgr.isLocked(RESOURCE_A)); - assertFalse(mgr.isLocked(RESOURCE_B)); - assertTrue(mgr.isLocked(RESOURCE_C)); - assertTrue(mgr.isLocked(RESOURCE_D)); - - // another quarter - next one should expire - testTime.sleep(MAX_AGE_MS / 4); - assertFalse(mgr.isLocked(RESOURCE_C)); - assertTrue(mgr.isLocked(RESOURCE_D)); - - // another quarter - last one should expire - testTime.sleep(MAX_AGE_MS / 4); - assertFalse(mgr.isLocked(RESOURCE_D)); - } - - @Test - public void testMakeNullArgException() { - IllegalArgumentException ex = SimpleLockManager.makeNullArgException(EXPECTED_EXCEPTION); - assertEquals(EXPECTED_EXCEPTION, ex.getMessage()); - } - - @Test - public void testDataGetXxx() { - long ttime = System.currentTimeMillis() + 5; - Data data = new Data(OWNER1, RESOURCE_A, ttime); - - assertEquals(OWNER1, data.getOwner()); - assertEquals(RESOURCE_A, data.getResource()); - assertEquals(ttime, data.getExpirationMs()); - } - - @Test - public void testDataCompareTo() { - long ttime = System.currentTimeMillis() + 50; - Data data = new Data(OWNER1, RESOURCE_A, ttime); - Data dataSame = new Data(OWNER1, RESOURCE_A, ttime); - Data dataDiffExpire = new Data(OWNER1, RESOURCE_A, ttime + 1); - - assertEquals(0, data.compareTo(data)); - assertEquals(0, data.compareTo(dataSame)); - - assertTrue(data.compareTo(dataDiffExpire) < 0); - assertTrue(dataDiffExpire.compareTo(data) > 0); - - Data dataDiffOwner = new Data(OWNER2, RESOURCE_A, ttime); - Data dataDiffResource = new Data(OWNER1, RESOURCE_B, ttime); - - assertTrue(data.compareTo(dataDiffOwner) < 0); - assertTrue(dataDiffOwner.compareTo(data) > 0); - - assertTrue(data.compareTo(dataDiffResource) < 0); - assertTrue(dataDiffResource.compareTo(data) > 0); - } - - @Test - public void testDataHashCode() { - long ttime = System.currentTimeMillis() + 1; - Data data = new Data(OWNER1, RESOURCE_A, ttime); - Data dataSame = new Data(OWNER1, RESOURCE_A, ttime); - Data dataDiffExpire = new Data(OWNER1, RESOURCE_A, ttime + 1); - Data dataDiffOwner = new Data(OWNER2, RESOURCE_A, ttime); - - int hc1 = data.hashCode(); - assertEquals(hc1, dataSame.hashCode()); - - assertTrue(hc1 != dataDiffExpire.hashCode()); - assertTrue(hc1 != dataDiffOwner.hashCode()); - - Data dataDiffResource = new Data(OWNER1, RESOURCE_B, ttime); - Data dataNullOwner = new Data(null, RESOURCE_A, ttime); - Data dataNullResource = new Data(OWNER1, null, ttime); - - assertTrue(hc1 != dataDiffResource.hashCode()); - assertTrue(hc1 != dataNullOwner.hashCode()); - assertTrue(hc1 != dataNullResource.hashCode()); - } - - @Test - public void testDataEquals() { - long ttime = System.currentTimeMillis() + 50; - Data data = new Data(OWNER1, RESOURCE_A, ttime); - Data dataSame = new Data(OWNER1, RESOURCE_A, ttime); - Data dataDiffExpire = new Data(OWNER1, RESOURCE_A, ttime + 1); - - assertTrue(data.equals(data)); - assertTrue(data.equals(dataSame)); - - Data dataDiffOwner = new Data(OWNER2, RESOURCE_A, ttime); - Data dataDiffResource = new Data(OWNER1, RESOURCE_B, ttime); - - assertFalse(data.equals(dataDiffExpire)); - assertFalse(data.equals(dataDiffOwner)); - assertFalse(data.equals(dataDiffResource)); - - assertFalse(data.equals(null)); - assertFalse(data.equals("string")); - - Data dataNullOwner = new Data(null, RESOURCE_A, ttime); - Data dataNullResource = new Data(OWNER1, null, ttime); - - assertFalse(dataNullOwner.equals(data)); - assertFalse(dataNullResource.equals(data)); - - assertTrue(dataNullOwner.equals(new Data(null, RESOURCE_A, ttime))); - assertTrue(dataNullResource.equals(new Data(OWNER1, null, ttime))); - } - - @Test - public void testMultiThreaded() throws InterruptedException { - int nthreads = 10; - int nlocks = 100; - - LinkedList<Thread> threads = new LinkedList<>(); - - String[] resources = {RESOURCE_A, RESOURCE_B}; - - final AtomicInteger nfail = new AtomicInteger(0); - - CountDownLatch stopper = new CountDownLatch(1); - CountDownLatch completed = new CountDownLatch(nthreads); - - for (int x = 0; x < nthreads; ++x) { - String owner = "owner." + x; - - Thread thread = new Thread(() -> { - - for (int y = 0; y < nlocks; ++y) { - String res = resources[y % resources.length]; - - try { - // some locks will be acquired, some denied - mgr.lock(res, owner, MAX_AGE_SEC); - - // do some "work" - stopper.await(1L, TimeUnit.MILLISECONDS); - - mgr.unlock(res, owner); - - } catch (InterruptedException expected) { - Thread.currentThread().interrupt(); - break; - } - } - - completed.countDown(); - }); - - thread.setDaemon(true); - threads.add(thread); - } - - // start the threads - for (Thread t : threads) { - t.start(); - } - - // wait for them to complete - completed.await(5000L, TimeUnit.SECONDS); - - // stop the threads from sleeping - stopper.countDown(); - - completed.await(1L, TimeUnit.SECONDS); - - // interrupt those that are still alive - for (Thread t : threads) { - if (t.isAlive()) { - t.interrupt(); - } - } - - assertEquals(0, nfail.get()); - } - -} diff --git a/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java b/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java index fe31eb50..87001ad6 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java +++ b/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java @@ -23,6 +23,7 @@ package org.onap.policy.drools.features; import java.util.Properties; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.utils.services.OrderedService; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; import org.onap.policy.drools.system.PolicyEngine; @@ -271,4 +272,26 @@ public interface PolicyEngineFeatureApi extends OrderedService { default boolean afterOpen(PolicyEngine engine) { return false; } + + /** + * Called before the PolicyEngine creates a lock manager. + * + * @return a lock manager if this feature intercepts and takes ownership of the + * operation preventing the invocation of lower priority features. Null, + * otherwise + */ + default PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) { + return null; + } + + /** + * Called after the PolicyEngine creates a lock manager. + * + * @return True if this feature intercepts and takes ownership of the operation + * preventing the invocation of lower priority features. False, otherwise + */ + default boolean afterCreateLockManager(PolicyEngine engine, Properties properties, + PolicyResourceLockManager lockManager) { + return false; + } } diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java index 653ff72e..cb0749d9 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java @@ -22,6 +22,7 @@ package org.onap.policy.drools.system; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; import org.onap.policy.common.capabilities.Lockable; import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; @@ -29,6 +30,8 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.http.server.HttpServletServer; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; import org.onap.policy.drools.features.PolicyEngineFeatureApi; import org.onap.policy.drools.protocol.configuration.ControllerConfiguration; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; @@ -198,6 +201,11 @@ public interface PolicyEngine extends Startable, Lockable, TopicListener { List<HttpServletServer> getHttpServers(); /** + * Gets a thread pool that can be used to execute background tasks. + */ + ScheduledExecutorService getExecutorService(); + + /** * get properties configuration. * * @return properties objects @@ -280,6 +288,31 @@ public interface PolicyEngine extends Startable, Lockable, TopicListener { boolean deliver(CommInfrastructure busType, String topic, String event); /** + * Requests a lock on a resource. Typically, the lock is not immediately granted, + * though a "lock" object is always returned. Once the lock has been granted (or + * denied), the callback will be invoked to indicate the result. + * + * <p/> + * Notes: + * <dl> + * <li>The callback may be invoked <i>before</i> this method returns</li> + * <li>The implementation need not honor waitForLock={@code true}</li> + * </dl> + * + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held once + * it has been granted, after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or subsequently + * lost; must not be {@code null} + * @param waitForLock {@code true} to wait for the lock, if it is currently locked, + * {@code false} otherwise + * @return a new lock + */ + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock); + + /** * Invoked when the host goes into the active state. */ void activate(); diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java index 766848c6..c924fd69 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java @@ -24,7 +24,6 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME; import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT; -import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.gson.Gson; @@ -32,11 +31,16 @@ import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Stream; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NonNull; +import org.apache.commons.lang.StringUtils; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; @@ -54,6 +58,9 @@ import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; import org.onap.policy.drools.core.PolicyContainer; import org.onap.policy.drools.core.jmx.PdpJmxListener; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.features.PolicyControllerFeatureApi; import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants; import org.onap.policy.drools.features.PolicyEngineFeatureApi; @@ -67,6 +74,7 @@ import org.onap.policy.drools.protocol.configuration.ControllerConfiguration; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; import org.onap.policy.drools.server.restful.RestManager; import org.onap.policy.drools.server.restful.aaf.AafTelemetryAuthFilter; +import org.onap.policy.drools.system.internal.SimpleLockManager; import org.onap.policy.drools.utils.PropertyUtil; import org.onap.policy.drools.utils.logging.LoggerUtil; import org.onap.policy.drools.utils.logging.MdcTransaction; @@ -78,7 +86,6 @@ import org.slf4j.LoggerFactory; * Policy Engine Manager Implementation. */ class PolicyEngineManager implements PolicyEngine { - /** * String literals. */ @@ -88,6 +95,9 @@ class PolicyEngineManager implements PolicyEngine { private static final String ENGINE_STOPPED_MSG = "Policy Engine is stopped"; private static final String ENGINE_LOCKED_MSG = "Policy Engine is locked"; + public static final String EXECUTOR_THREAD_PROP = "executor.threads"; + protected static final int DEFAULT_EXECUTOR_THREADS = 5; + /** * logger. */ @@ -134,6 +144,17 @@ class PolicyEngineManager implements PolicyEngine { private List<HttpServletServer> httpServers = new ArrayList<>(); /** + * Thread pool used to execute background tasks. + */ + private ScheduledExecutorService executorService = null; + + /** + * Lock manager used to create locks. + */ + @Getter(AccessLevel.PROTECTED) + private PolicyResourceLockManager lockManager = null; + + /** * gson parser to decode configuration requests. */ private final Gson decoder = new GsonBuilder().disableHtmlEscaping().create(); @@ -214,6 +235,53 @@ class PolicyEngineManager implements PolicyEngine { } @Override + @JsonIgnore + @GsonJsonIgnore + public ScheduledExecutorService getExecutorService() { + return executorService; + } + + private ScheduledExecutorService makeExecutorService(Properties properties) { + int nthreads = DEFAULT_EXECUTOR_THREADS; + try { + nthreads = Integer.valueOf( + properties.getProperty(EXECUTOR_THREAD_PROP, String.valueOf(DEFAULT_EXECUTOR_THREADS))); + + } catch (NumberFormatException e) { + logger.error("invalid number for " + EXECUTOR_THREAD_PROP + " property", e); + } + + return makeScheduledExecutor(nthreads); + } + + private void createLockManager(Properties properties) { + for (PolicyEngineFeatureApi feature : getEngineProviders()) { + try { + this.lockManager = feature.beforeCreateLockManager(this, properties); + if (this.lockManager != null) { + return; + } + } catch (RuntimeException e) { + logger.error("{}: feature {} before-create-lock-manager failure because of {}", this, + feature.getClass().getName(), e.getMessage(), e); + } + } + + try { + this.lockManager = new SimpleLockManager(this, properties); + } catch (RuntimeException e) { + logger.error("{}: cannot create simple lock manager because of {}", this, e.getMessage(), e); + this.lockManager = new SimpleLockManager(this, new Properties()); + } + + /* policy-engine dispatch post operation hook */ + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterCreateLockManager(this, properties, this.lockManager), + (feature, ex) -> logger.error("{}: feature {} after-create-lock-manager failure because of {}", + this, feature.getClass().getName(), ex.getMessage(), ex)); + } + + @Override public synchronized void configure(Properties properties) { if (properties == null) { @@ -257,6 +325,10 @@ class PolicyEngineManager implements PolicyEngine { logger.error("{}: add-http-servers failed", this, e); } + executorService = makeExecutorService(properties); + + createLockManager(properties); + /* policy-engine dispatch post configure hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterConfigure(this), @@ -499,6 +571,13 @@ class PolicyEngineManager implements PolicyEngine { AtomicReference<Boolean> success = new AtomicReference<>(true); + try { + success.compareAndSet(true, this.lockManager.start()); + } catch (final RuntimeException e) { + logger.warn("{}: cannot start lock manager because of {}", this, e.getMessage(), e); + success.set(false); + } + /* Start Policy Engine exclusively-owned (unmanaged) http servers */ attempt(success, this.httpServers, @@ -639,9 +718,16 @@ class PolicyEngineManager implements PolicyEngine { (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item, ex.getMessage(), ex)); + try { + success.compareAndSet(true, this.lockManager.stop()); + } catch (final RuntimeException e) { + logger.warn("{}: cannot stop lock manager because of {}", this, e.getMessage(), e); + success.set(false); + } + // stop JMX? - /* policy-engine dispatch pre stop hook */ + /* policy-engine dispatch post stop hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterStop(this), (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this, @@ -688,6 +774,14 @@ class PolicyEngineManager implements PolicyEngine { getTopicEndpointManager().shutdown(); getServletFactory().destroy(); + try { + this.lockManager.shutdown(); + } catch (final RuntimeException e) { + logger.warn("{}: cannot shutdown lock manager because of {}", this, e.getMessage(), e); + } + + executorService.shutdownNow(); + // Stop the JMX listener stopPdpJmxListener(); @@ -806,6 +900,13 @@ class PolicyEngineManager implements PolicyEngine { success = getTopicEndpointManager().lock() && success; + try { + success = (this.lockManager == null || this.lockManager.lock()) && success; + } catch (final RuntimeException e) { + logger.warn("{}: cannot lock() lock manager because of {}", this, e.getMessage(), e); + success = false; + } + /* policy-engine dispatch post lock hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterLock(this), @@ -833,6 +934,14 @@ class PolicyEngineManager implements PolicyEngine { this.locked = false; boolean success = true; + + try { + success = (this.lockManager == null || this.lockManager.unlock()) && success; + } catch (final RuntimeException e) { + logger.warn("{}: cannot unlock() lock manager because of {}", this, e.getMessage(), e); + success = false; + } + final List<PolicyController> controllers = getControllerFactory().inventory(); for (final PolicyController controller : controllers) { try { @@ -1167,6 +1276,21 @@ class PolicyEngineManager implements PolicyEngine { feature.getClass().getName(), ex.getMessage(), ex)); } + @Override + public Lock createLock(@NonNull String resourceId, @NonNull String ownerKey, int holdSec, + @NonNull LockCallback callback, boolean waitForLock) { + + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + if (lockManager == null) { + throw new IllegalStateException("lock manager has not been initialized"); + } + + return lockManager.createLock(resourceId, ownerKey, holdSec, callback, waitForLock); + } + private boolean controllerConfig(PdpdConfiguration config) { /* only this one supported for now */ final List<ControllerConfiguration> configControllers = config.getControllers(); @@ -1234,4 +1358,13 @@ class PolicyEngineManager implements PolicyEngine { protected PolicyEngine getPolicyEngine() { return PolicyEngineConstants.getManager(); } + + protected ScheduledExecutorService makeScheduledExecutor(int nthreads) { + ScheduledThreadPoolExecutor exsvc = new ScheduledThreadPoolExecutor(nthreads); + exsvc.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + exsvc.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + exsvc.setRemoveOnCancelPolicy(true); + + return exsvc; + } } diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java new file mode 100644 index 00000000..f5163e9b --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java @@ -0,0 +1,426 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.properties.exception.PropertyException; +import org.onap.policy.common.utils.time.CurrentTime; +import org.onap.policy.drools.core.lock.AlwaysFailLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; +import org.onap.policy.drools.system.PolicyEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Simple implementation of the Lock Feature. Locks do not span across instances of this + * object (i.e., locks do not span across servers). + * + * <p/> + * Note: this implementation does <i>not</i> honor the waitForLocks={@code true} + * parameter. + * + * <p/> + * When a lock is deserialized, it will not initially appear in this feature's map; it + * will be added to the map once free() or extend() is invoked, provided there isn't + * already an entry. + */ +public class SimpleLockManager implements PolicyResourceLockManager { + + private static final Logger logger = LoggerFactory.getLogger(SimpleLockManager.class); + + private static final String NOT_LOCKED_MSG = "not locked"; + private static final String LOCK_LOST_MSG = "lock lost"; + + /** + * Provider of current time. May be overridden by junit tests. + */ + private static CurrentTime currentTime = new CurrentTime(); + + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private static SimpleLockManager latestInstance = null; + + + /** + * Engine with which this manager is associated. + */ + private final PolicyEngine engine; + + /** + * Feature properties. + */ + private final SimpleLockProperties featProps; + + /** + * Maps a resource to the lock that owns it. + */ + private final Map<String, SimpleLock> resource2lock = new ConcurrentHashMap<>(); + + /** + * Thread pool used to check for lock expiration and to notify owners when locks are + * lost. + */ + private ScheduledExecutorService exsvc = null; + + /** + * Used to cancel the expiration checker on shutdown. + */ + private ScheduledFuture<?> checker = null; + + + /** + * Constructs the object. + * + * @param engine engine with which this manager is associated + * @param properties properties used to configure the manager + */ + public SimpleLockManager(PolicyEngine engine, Properties properties) { + try { + this.engine = engine; + this.featProps = new SimpleLockProperties(properties); + + } catch (PropertyException e) { + throw new SimpleLockManagerException(e); + } + } + + @Override + public boolean isAlive() { + return (checker != null); + } + + @Override + public boolean start() { + if (checker != null) { + return false; + } + + exsvc = getThreadPool(); + + checker = exsvc.scheduleWithFixedDelay(this::checkExpired, featProps.getExpireCheckSec(), + featProps.getExpireCheckSec(), TimeUnit.SECONDS); + + setLatestInstance(this); + + return true; + } + + /** + * Stops the expiration checker. Does <i>not</i> invoke any lock call-backs. + */ + @Override + public synchronized boolean stop() { + exsvc = null; + + if (checker == null) { + return false; + } + + ScheduledFuture<?> checker2 = checker; + checker = null; + + checker2.cancel(true); + + return true; + } + + @Override + public void shutdown() { + stop(); + } + + @Override + public boolean isLocked() { + return false; + } + + @Override + public boolean lock() { + return true; + } + + @Override + public boolean unlock() { + return true; + } + + @Override + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + + if (latestInstance != this) { + AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); + lock.notifyUnavailable(); + return lock; + } + + SimpleLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); + + SimpleLock existingLock = resource2lock.putIfAbsent(resourceId, lock); + + if (existingLock == null) { + lock.grant(); + } else { + lock.deny("resource is busy"); + } + + return lock; + } + + /** + * Checks for expired locks. + */ + private void checkExpired() { + long currentMs = currentTime.getMillis(); + logger.info("checking for expired locks at {}", currentMs); + + /* + * Could do this via an iterator, but using compute() guarantees that the lock + * doesn't get extended while it's being removed from the map. + */ + for (Entry<String, SimpleLock> ent : resource2lock.entrySet()) { + if (!ent.getValue().expired(currentMs)) { + continue; + } + + AtomicReference<SimpleLock> lockref = new AtomicReference<>(null); + + resource2lock.computeIfPresent(ent.getKey(), (resourceId, lock) -> { + if (lock.expired(currentMs)) { + lockref.set(lock); + return null; + } + + return lock; + }); + + SimpleLock lock = lockref.get(); + if (lock != null) { + lock.deny("lock expired"); + } + } + } + + /** + * Simple Lock implementation. + */ + public static class SimpleLock extends LockImpl { + private static final long serialVersionUID = 1L; + + /** + * Time, in milliseconds, when the lock expires. + */ + @Getter + private long holdUntilMs; + + /** + * Feature containing this lock. May be {@code null} until the feature is + * identified. Note: this can only be null if the lock has been de-serialized. + */ + private transient SimpleLockManager feature; + + /** + * Constructs the object. + */ + public SimpleLock() { + this.holdUntilMs = 0; + this.feature = null; + } + + /** + * Constructs the object. + * + * @param state initial state of the lock + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held, + * after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or + * subsequently lost; must not be {@code null} + * @param feature feature containing this lock + */ + public SimpleLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback, + SimpleLockManager feature) { + super(state, resourceId, ownerKey, holdSec, callback); + this.feature = feature; + } + + /** + * Determines if the owner's lock has expired. + * + * @param currentMs current time, in milliseconds + * @return {@code true} if the owner's lock has expired, {@code false} otherwise + */ + public boolean expired(long currentMs) { + return (holdUntilMs <= currentMs); + } + + /** + * Grants this lock. The notification is <i>always</i> invoked via a background + * thread. + */ + protected synchronized void grant() { + if (isUnavailable()) { + return; + } + + setState(LockState.ACTIVE); + holdUntilMs = currentTime.getMillis() + TimeUnit.SECONDS.toMillis(getHoldSec()); + + logger.info("lock granted: {}", this); + + feature.exsvc.execute(this::notifyAvailable); + } + + /** + * Permanently denies this lock. The notification is invoked via a background + * thread, if a feature instance is attached, otherwise it uses the foreground + * thread. + * + * @param reason the reason the lock was denied + */ + protected void deny(String reason) { + synchronized (this) { + setState(LockState.UNAVAILABLE); + } + + logger.info("{}: {}", reason, this); + + if (feature == null) { + notifyUnavailable(); + + } else { + feature.exsvc.execute(this::notifyUnavailable); + } + } + + @Override + public boolean free() { + // do a quick check of the state + if (isUnavailable()) { + return false; + } + + logger.info("releasing lock: {}", this); + + if (!attachFeature()) { + setState(LockState.UNAVAILABLE); + return false; + } + + AtomicBoolean result = new AtomicBoolean(false); + + feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { + + if (curlock == this) { + // this lock was the owner - resource is now available + result.set(true); + setState(LockState.UNAVAILABLE); + return null; + + } else { + return curlock; + } + }); + + return result.get(); + } + + @Override + public void extend(int holdSec, LockCallback callback) { + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + setHoldSec(holdSec); + setCallback(callback); + + // do a quick check of the state + if (isUnavailable() || !attachFeature()) { + deny(LOCK_LOST_MSG); + return; + } + + if (feature.resource2lock.get(getResourceId()) == this) { + grant(); + } else { + deny(NOT_LOCKED_MSG); + } + } + + /** + * Attaches to the feature instance, if not already attached. + * + * @return {@code true} if the lock is now attached to a feature, {@code false} + * otherwise + */ + private synchronized boolean attachFeature() { + if (feature != null) { + // already attached + return true; + } + + feature = latestInstance; + if (feature == null) { + logger.warn("no feature yet for {}", this); + return false; + } + + // put this lock into the map + feature.resource2lock.putIfAbsent(getResourceId(), this); + + return true; + } + + @Override + public String toString() { + return "SimpleLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey=" + getOwnerKey() + + ", holdSec=" + getHoldSec() + ", holdUntilMs=" + holdUntilMs + "]"; + } + } + + // these may be overridden by junit tests + + protected ScheduledExecutorService getThreadPool() { + return engine.getExecutorService(); + } + + protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, this); + } +} diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java new file mode 100644 index 00000000..ff02f39e --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +public class SimpleLockManagerException extends RuntimeException { + private static final long serialVersionUID = 1L; + + /** + * Constructor. + * + * @param ex exception to be wrapped + */ + public SimpleLockManagerException(Exception ex) { + super(ex); + } +} diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java new file mode 100644 index 00000000..0d1ca89b --- /dev/null +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import java.util.Properties; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.properties.BeanConfigurator; +import org.onap.policy.common.utils.properties.Property; +import org.onap.policy.common.utils.properties.exception.PropertyException; + + +@Getter +@Setter +public class SimpleLockProperties { + public static final String PREFIX = "simple.locking."; + public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds"; + + /** + * Time, in seconds, to wait between checks for expired locks. + */ + @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900") + private int expireCheckSec; + + /** + * Constructs the object, populating fields from the properties. + * + * @param props properties from which to configure this + * @throws PropertyException if an error occurs + */ + public SimpleLockProperties(Properties props) throws PropertyException { + new BeanConfigurator().configureFromProperties(this, props); + } +} diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java index 5e0ead9d..fe1a2345 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java @@ -27,12 +27,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,6 +43,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import org.junit.Before; @@ -53,6 +56,10 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServer; import org.onap.policy.common.endpoints.http.server.HttpServletServerFactory; import org.onap.policy.common.utils.gson.GsonTestUtils; import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.features.PolicyControllerFeatureApi; import org.onap.policy.drools.features.PolicyEngineFeatureApi; import org.onap.policy.drools.persistence.SystemPersistence; @@ -61,9 +68,10 @@ import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.protocol.configuration.ControllerConfiguration; import org.onap.policy.drools.protocol.configuration.DroolsConfiguration; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; +import org.onap.policy.drools.system.internal.SimpleLockManager; +import org.onap.policy.drools.system.internal.SimpleLockProperties; public class PolicyEngineManagerTest { - private static final String EXPECTED = "expected exception"; private static final String NOOP_STR = CommInfrastructure.NOOP.name(); @@ -76,6 +84,8 @@ public class PolicyEngineManagerTest { private static final String FEATURE2 = "feature-b"; private static final String MY_TOPIC = "my-topic"; private static final String MESSAGE = "my-message"; + private static final String MY_OWNER = "my-owner"; + private static final String MY_RESOURCE = "my-resource"; private static final Object MY_EVENT = new Object(); @@ -125,6 +135,8 @@ public class PolicyEngineManagerTest { private PdpdConfiguration pdpConfig; private String pdpConfigJson; private PolicyEngineManager mgr; + private ScheduledExecutorService exsvc; + private PolicyResourceLockManager lockmgr; /** * Initializes the object to be tested. @@ -176,6 +188,15 @@ public class PolicyEngineManagerTest { config3 = new ControllerConfiguration(); config4 = new ControllerConfiguration(); pdpConfig = new PdpdConfiguration(); + exsvc = mock(ScheduledExecutorService.class); + lockmgr = mock(PolicyResourceLockManager.class); + + when(lockmgr.start()).thenReturn(true); + when(lockmgr.stop()).thenReturn(true); + when(lockmgr.lock()).thenReturn(true); + when(lockmgr.unlock()).thenReturn(true); + + when(prov2.beforeCreateLockManager(any(), any())).thenReturn(lockmgr); when(prov1.getName()).thenReturn(FEATURE1); when(prov2.getName()).thenReturn(FEATURE2); @@ -387,6 +408,86 @@ public class PolicyEngineManagerTest { assertFalse(config.isEmpty()); } + /** + * Tests that makeExecutorService() uses the value from the thread + * property. + */ + @Test + public void testMakeExecutorServicePropertyProvided() { + PolicyEngineManager mgrspy = spy(mgr); + + properties.setProperty(PolicyEngineManager.EXECUTOR_THREAD_PROP, "3"); + mgrspy.configure(properties); + assertSame(exsvc, mgrspy.getExecutorService()); + verify(mgrspy).makeScheduledExecutor(3); + } + + /** + * Tests that makeExecutorService() uses the default thread count when no thread + * property is provided. + */ + @Test + public void testMakeExecutorServiceNoProperty() { + PolicyEngineManager mgrspy = spy(mgr); + + mgrspy.configure(properties); + assertSame(exsvc, mgrspy.getExecutorService()); + verify(mgrspy).makeScheduledExecutor(PolicyEngineManager.DEFAULT_EXECUTOR_THREADS); + } + + /** + * Tests that makeExecutorService() uses the default thread count when the thread + * property is invalid. + */ + @Test + public void testMakeExecutorServiceInvalidProperty() { + PolicyEngineManager mgrspy = spy(mgr); + + properties.setProperty(PolicyEngineManager.EXECUTOR_THREAD_PROP, "abc"); + mgrspy.configure(properties); + assertSame(exsvc, mgrspy.getExecutorService()); + verify(mgrspy).makeScheduledExecutor(PolicyEngineManager.DEFAULT_EXECUTOR_THREADS); + } + + /** + * Tests createLockManager() when beforeCreateLock throws an exception and returns a + * manager. + */ + @Test + public void testCreateLockManagerHaveProvider() { + // first provider throws an exception + when(prov1.beforeCreateLockManager(any(), any())).thenThrow(new RuntimeException(EXPECTED)); + + mgr.configure(properties); + assertSame(lockmgr, mgr.getLockManager()); + } + + /** + * Tests createLockManager() when SimpleLockManager throws an exception. + */ + @Test + public void testCreateLockManagerSimpleEx() { + when(prov2.beforeCreateLockManager(any(), any())).thenReturn(null); + + // invalid property for SimpleLockManager + properties.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc"); + mgr.configure(properties); + + // should create a manager using default properties + assertTrue(mgr.getLockManager() instanceof SimpleLockManager); + } + + /** + * Tests createLockManager() when SimpleLockManager is returned. + */ + @Test + public void testCreateLockManagerSimple() { + when(prov2.beforeCreateLockManager(any(), any())).thenReturn(null); + + mgr.configure(properties); + assertTrue(mgr.getLockManager() instanceof SimpleLockManager); + } + @Test public void testConfigureProperties() throws Exception { // arrange for first provider to throw exceptions @@ -667,6 +768,12 @@ public class PolicyEngineManagerTest { when(sink1.start()).thenThrow(new RuntimeException(EXPECTED)); }); + // lock manager fails to start - still does everything + testStart(false, () -> when(lockmgr.start()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testStart(false, () -> when(lockmgr.start()).thenThrow(new RuntimeException(EXPECTED))); + // servlet wait fails - still does everything testStart(false, () -> when(server1.waitedStart(anyLong())).thenReturn(false)); @@ -796,6 +903,12 @@ public class PolicyEngineManagerTest { // servlet fails to stop - still does everything testStop(false, () -> when(server1.stop()).thenReturn(false)); + // lock manager fails to stop - still does everything + testStop(false, () -> when(lockmgr.stop()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testStop(false, () -> when(lockmgr.stop()).thenThrow(new RuntimeException(EXPECTED))); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeStop(mgr)).thenReturn(flag), @@ -861,6 +974,10 @@ public class PolicyEngineManagerTest { assertTrue(threadStarted); assertTrue(threadInterrupted); + + // lock manager throws an exception - still does everything + testShutdown(() -> doThrow(new RuntimeException(EXPECTED)).when(lockmgr).shutdown()); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeShutdown(mgr)).thenReturn(flag), @@ -906,6 +1023,8 @@ public class PolicyEngineManagerTest { verify(prov1).afterShutdown(mgr); verify(prov2).afterShutdown(mgr); + + verify(exsvc).shutdownNow(); } @Test @@ -985,6 +1104,12 @@ public class PolicyEngineManagerTest { // endpoint manager fails to lock - still does everything testLock(false, () -> when(endpoint.lock()).thenReturn(false)); + // lock manager fails to lock - still does everything + testLock(false, () -> when(lockmgr.lock()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testLock(false, () -> when(lockmgr.lock()).thenThrow(new RuntimeException(EXPECTED))); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeLock(mgr)).thenReturn(flag), @@ -1055,6 +1180,12 @@ public class PolicyEngineManagerTest { // endpoint manager fails to unlock - still does everything testUnlock(false, () -> when(endpoint.unlock()).thenReturn(false)); + // lock manager fails to lock - still does everything + testUnlock(false, () -> when(lockmgr.unlock()).thenReturn(false)); + + // lock manager throws an exception - still does everything + testUnlock(false, () -> when(lockmgr.unlock()).thenThrow(new RuntimeException(EXPECTED))); + // other tests checkBeforeAfter( (prov, flag) -> when(prov.beforeUnlock(mgr)).thenReturn(flag), @@ -1484,6 +1615,32 @@ public class PolicyEngineManagerTest { } @Test + public void testCreateLock() { + Lock lock = mock(Lock.class); + LockCallback callback = mock(LockCallback.class); + when(lockmgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)).thenReturn(lock); + + // not configured yet, thus no lock manager + assertThatIllegalStateException() + .isThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)); + + // now configure it and try again + mgr.configure(properties); + assertSame(lock, mgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)); + + // test illegal args + assertThatThrownBy(() -> mgr.createLock(null, MY_OWNER, 10, callback, false)) + .hasMessageContaining("resourceId"); + assertThatThrownBy(() -> mgr.createLock(MY_RESOURCE, null, 10, callback, false)) + .hasMessageContaining("ownerKey"); + assertThatIllegalArgumentException() + .isThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, -1, callback, false)) + .withMessageContaining("holdSec"); + assertThatThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, 10, null, false)) + .hasMessageContaining("callback"); + } + + @Test public void testOpen() throws Throwable { when(prov1.beforeOpen(mgr)).thenThrow(new RuntimeException(EXPECTED)); when(prov1.afterOpen(mgr)).thenThrow(new RuntimeException(EXPECTED)); @@ -1789,6 +1946,11 @@ public class PolicyEngineManagerTest { return engine; } + @Override + protected ScheduledExecutorService makeScheduledExecutor(int nthreads) { + return exsvc; + } + /** * Shutdown thread with overrides. */ diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java new file mode 100644 index 00000000..7ffc72ff --- /dev/null +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.onap.policy.common.utils.test.ExceptionsTester; +import org.onap.policy.drools.system.internal.SimpleLockManagerException; + +public class SimpleLockManagerExceptionTest extends ExceptionsTester { + + @Test + public void test() { + assertEquals(1, test(SimpleLockManagerException.class)); + } +} diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java new file mode 100644 index 00000000..66406898 --- /dev/null +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java @@ -0,0 +1,781 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.system.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.utils.time.CurrentTime; +import org.onap.policy.common.utils.time.TestTime; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.onap.policy.drools.system.internal.SimpleLockManager.SimpleLock; +import org.powermock.reflect.Whitebox; + +public class SimpleLockManagerTest { + private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService"; + private static final String TIME_FIELD = "currentTime"; + private static final String OWNER_KEY = "my key"; + private static final String RESOURCE = "my resource"; + private static final String RESOURCE2 = "my resource #2"; + private static final String RESOURCE3 = "my resource #3"; + private static final int HOLD_SEC = 100; + private static final int HOLD_SEC2 = 120; + private static final int HOLD_MS = HOLD_SEC * 1000; + private static final int HOLD_MS2 = HOLD_SEC2 * 1000; + private static final int MAX_THREADS = 10; + private static final int MAX_LOOPS = 50; + + private static CurrentTime saveTime; + private static ScheduledExecutorService saveExec; + private static ScheduledExecutorService realExec; + + private TestTime testTime; + private AtomicInteger nactive; + private AtomicInteger nsuccesses; + private SimpleLockManager feature; + + @Mock + private ScheduledExecutorService exsvc; + + @Mock + private PolicyEngine engine; + + @Mock + private ScheduledFuture<?> future; + + @Mock + private LockCallback callback; + + + /** + * Saves static fields and configures the location of the property files. + */ + @BeforeClass + public static void setUpBeforeClass() { + saveTime = Whitebox.getInternalState(SimpleLockManager.class, TIME_FIELD); + saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD); + + realExec = Executors.newScheduledThreadPool(3); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); + } + + /** + * Restores static fields. + */ + @AfterClass + public static void tearDownAfterClass() { + Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, saveTime); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec); + + realExec.shutdown(); + } + + /** + * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute + * tasks. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + testTime = new TestTime(); + nactive = new AtomicInteger(0); + nsuccesses = new AtomicInteger(0); + + Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime); + + when(engine.getExecutorService()).thenReturn(exsvc); + + feature = new MyLockingFeature(); + feature.start(); + } + + /** + * Tests constructor() when properties are invalid. + */ + @Test + public void testSimpleLockManagerInvalidProperties() { + // use properties containing an invalid value + Properties props = new Properties(); + props.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc"); + + assertThatThrownBy(() -> new MyLockingFeature(engine, props)).isInstanceOf(SimpleLockManagerException.class); + } + + @Test + public void testIsAlive() { + assertTrue(feature.isAlive()); + + feature.stop(); + assertFalse(feature.isAlive()); + } + + @Test + public void testStart() { + assertFalse(feature.start()); + + feature.stop(); + assertTrue(feature.start()); + } + + @Test + public void testStop() { + assertTrue(feature.stop()); + verify(future).cancel(true); + + assertFalse(feature.stop()); + + // no more invocations + verify(future).cancel(anyBoolean()); + } + + @Test + public void testShutdown() { + feature.shutdown(); + + verify(future).cancel(true); + } + + @Test + public void testLockApi() { + assertFalse(feature.isLocked()); + assertTrue(feature.lock()); + assertTrue(feature.unlock()); + } + + @Test + public void testCreateLock() { + // this lock should be granted immediately + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isActive()); + assertEquals(testTime.getMillis() + HOLD_MS, lock.getHoldUntilMs()); + + invokeCallback(1); + + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + + // this time it should be busy + Lock lock2 = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertFalse(lock2.isActive()); + assertTrue(lock2.isUnavailable()); + + invokeCallback(2); + + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // should have been no change to the original lock + assertTrue(lock.isActive()); + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // should work with "true" value also + Lock lock3 = feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, true); + assertTrue(lock3.isActive()); + invokeCallback(3); + verify(callback).lockAvailable(lock3); + verify(callback, never()).lockUnavailable(lock3); + } + + /** + * Tests lock() when the feature is not the latest instance. + */ + @Test + public void testCreateLockNotLatestInstance() { + SimpleLockManager.setLatestInstance(null); + + Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback).lockUnavailable(lock); + } + + @Test + public void testCheckExpired() throws InterruptedException { + final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + final SimpleLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false); + final SimpleLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC2, callback, false); + + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); + + Runnable checker = captor.getValue(); + + // time unchanged - checker should have no impact + checker.run(); + assertTrue(lock.isActive()); + assertTrue(lock2.isActive()); + assertTrue(lock3.isActive()); + + // expire the first two locks + testTime.sleep(HOLD_MS); + checker.run(); + assertFalse(lock.isActive()); + assertFalse(lock2.isActive()); + assertTrue(lock3.isActive()); + + // run the callbacks + captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(5)).execute(captor.capture()); + captor.getAllValues().forEach(Runnable::run); + verify(callback).lockUnavailable(lock); + verify(callback).lockUnavailable(lock2); + verify(callback, never()).lockUnavailable(lock3); + + // should be able to get a lock on the first two resources + assertTrue(feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive()); + assertTrue(feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive()); + + // lock is still busy on the last resource + assertFalse(feature.createLock(RESOURCE3, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive()); + + // expire the last lock + testTime.sleep(HOLD_MS2); + checker.run(); + assertFalse(lock3.isActive()); + + // run the callback + captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(9)).execute(captor.capture()); + captor.getValue().run(); + verify(callback).lockUnavailable(lock3); + } + + /** + * Tests checkExpired(), where the lock is removed from the map between invoking + * expired() and compute(). Should cause "null" to be returned by compute(). + * + * @throws InterruptedException if the test is interrupted + */ + @Test + public void testCheckExpiredLockDeleted() throws InterruptedException { + feature = new MyLockingFeature() { + @Override + protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + + @Override + public boolean expired(long currentMs) { + // remove the lock from the map + free(); + return true; + } + }; + } + }; + + feature.start(); + + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(1); + + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); + + Runnable checker = captor.getValue(); + + checker.run(); + + // lock should now be gone and we should be able to get another + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(2); + + // should have succeeded twice + verify(callback, times(2)).lockAvailable(any()); + + // lock should not be available now + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(3); + verify(callback).lockUnavailable(any()); + } + + /** + * Tests checkExpired(), where the lock is removed from the map and replaced with a + * new lock, between invoking expired() and compute(). Should cause the new lock to be + * returned. + * + * @throws InterruptedException if the test is interrupted + */ + @Test + public void testCheckExpiredLockReplaced() throws InterruptedException { + feature = new MyLockingFeature() { + private boolean madeLock = false; + + @Override + protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + if (madeLock) { + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature); + } + + madeLock = true; + + return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + + @Override + public boolean expired(long currentMs) { + // remove the lock from the map and add a new lock + free(); + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + return true; + } + }; + } + }; + + feature.start(); + + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(1); + + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any()); + + Runnable checker = captor.getValue(); + + checker.run(); + + // lock should not be available now + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(3); + verify(callback).lockUnavailable(any()); + } + + @Test + public void testGetThreadPool() { + // use a real feature + feature = new SimpleLockManager(engine, new Properties()); + + // load properties + feature.start(); + + // should create thread pool + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // should shut down thread pool + feature.stop(); + } + + @Test + public void testSimpleLockNoArgs() { + SimpleLock lock = new SimpleLock(); + assertNull(lock.getResourceId()); + assertNull(lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(0, lock.getHoldSec()); + + assertEquals(0, lock.getHoldUntilMs()); + } + + @Test + public void testSimpleLockSimpleLock() { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertSame(callback, lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + + assertThatIllegalArgumentException() + .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false)) + .withMessageContaining("holdSec"); + } + + @Test + public void testSimpleLockSerializable() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock = roundTrip(lock); + + assertTrue(lock.isActive()); + + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + } + + @Test + public void testSimpleLockExpired() { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.grant(); + + assertFalse(lock.expired(testTime.getMillis())); + assertFalse(lock.expired(testTime.getMillis() + HOLD_MS - 1)); + assertTrue(lock.expired(testTime.getMillis() + HOLD_MS)); + } + + /** + * Tests grant() when the lock is already unavailable. + */ + @Test + public void testSimpleLockGrantUnavailable() { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.setState(LockState.UNAVAILABLE); + lock.grant(); + + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + @Test + public void testSimpleLockFree() { + final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // lock2 should be denied + SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(2); + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // lock2 was denied, so nothing new should happen when freed + assertFalse(lock2.free()); + invokeCallback(2); + + // force lock2 to be active - still nothing should happen + Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); + assertFalse(lock2.free()); + invokeCallback(2); + + // now free the first lock + assertTrue(lock.free()); + assertEquals(LockState.UNAVAILABLE, lock.getState()); + + // should be able to get the lock now + SimpleLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock3.isActive()); + } + + /** + * Tests that free() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockFreeSerialized() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + feature = new MyLockingFeature(); + feature.start(); + + lock = roundTrip(lock); + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests free() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockFreeNoFeature() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + SimpleLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + @Test + public void testSimpleLockExtend() { + final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // lock2 should be denied + SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + invokeCallback(2); + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // lock2 will still be denied + lock2.extend(HOLD_SEC, callback); + invokeCallback(3); + verify(callback, times(2)).lockUnavailable(lock2); + + // force lock2 to be active - should still be denied + Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); + lock2.extend(HOLD_SEC, callback); + invokeCallback(4); + verify(callback, times(3)).lockUnavailable(lock2); + + assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback)) + .withMessageContaining("holdSec"); + + // now extend the first lock + lock.extend(HOLD_SEC2, callback); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + assertEquals(testTime.getMillis() + HOLD_MS2, lock.getHoldUntilMs()); + invokeCallback(5); + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + } + + /** + * Tests that extend() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockExtendSerialized() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + feature = new MyLockingFeature(); + feature.start(); + + lock = roundTrip(lock); + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isActive()); + + invokeCallback(1); + verify(scallback).lockAvailable(lock); + verify(scallback, never()).lockUnavailable(lock); + } + + /** + * Tests extend() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testSimpleLockExtendNoFeature() throws Exception { + SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + SimpleLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isUnavailable()); + + invokeCallback(1); + verify(scallback, never()).lockAvailable(lock); + verify(scallback).lockUnavailable(lock); + } + + @Test + public void testSimpleLockToString() { + String text = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString(); + assertNotNull(text); + assertThat(text).contains("holdUntil").doesNotContain("ownerInfo").doesNotContain("callback"); + } + + /** + * Performs a multi-threaded test of the locking facility. + * + * @throws InterruptedException if the current thread is interrupted while waiting for + * the background threads to complete + */ + @Test + public void testMultiThreaded() throws InterruptedException { + Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime); + feature = new SimpleLockManager(PolicyEngineConstants.getManager(), new Properties()); + feature.start(); + + List<MyThread> threads = new ArrayList<>(MAX_THREADS); + for (int x = 0; x < MAX_THREADS; ++x) { + threads.add(new MyThread()); + } + + threads.forEach(Thread::start); + + for (MyThread thread : threads) { + thread.join(6000); + assertFalse(thread.isAlive()); + } + + for (MyThread thread : threads) { + if (thread.err != null) { + throw thread.err; + } + } + + assertTrue(nsuccesses.get() > 0); + } + + private SimpleLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + return (SimpleLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock); + } + + private SimpleLock roundTrip(SimpleLock lock) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(lock); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + return (SimpleLock) ois.readObject(); + } + } + + /** + * Invokes the last call-back in the work queue. + * + * @param nexpected number of call-backs expected in the work queue + */ + private void invokeCallback(int nexpected) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(nexpected)).execute(captor.capture()); + + if (nexpected > 0) { + captor.getAllValues().get(nexpected - 1).run(); + } + } + + /** + * Feature that uses <i>exsvc</i> to execute requests. + */ + private class MyLockingFeature extends SimpleLockManager { + + public MyLockingFeature() { + this(engine, new Properties()); + } + + public MyLockingFeature(PolicyEngine engine, Properties props) { + super(engine, props); + + exsvc = mock(ScheduledExecutorService.class); + when(engine.getExecutorService()).thenReturn(exsvc); + + when(exsvc.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(answer -> { + return future; + }); + } + } + + /** + * Thread used with the multi-threaded test. It repeatedly attempts to get a lock, + * extend it, and then unlock it. + */ + private class MyThread extends Thread { + AssertionError err = null; + + public MyThread() { + setDaemon(true); + } + + @Override + public void run() { + try { + for (int x = 0; x < MAX_LOOPS; ++x) { + makeAttempt(); + } + + } catch (AssertionError e) { + err = e; + } + } + + private void makeAttempt() { + try { + Semaphore sem = new Semaphore(0); + + LockCallback cb = new LockCallback() { + @Override + public void lockAvailable(Lock lock) { + sem.release(); + } + + @Override + public void lockUnavailable(Lock lock) { + sem.release(); + } + }; + + Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false); + + // wait for callback, whether available or unavailable + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + if (!lock.isActive()) { + return; + } + + nsuccesses.incrementAndGet(); + + assertEquals(1, nactive.incrementAndGet()); + + lock.extend(HOLD_SEC2, cb); + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + assertTrue(lock.isActive()); + + // decrement BEFORE free() + nactive.decrementAndGet(); + + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("interrupted", e); + } + } + } +} |