diff options
Diffstat (limited to 'feature-distributed-locking')
15 files changed, 2998 insertions, 1060 deletions
diff --git a/feature-distributed-locking/pom.xml b/feature-distributed-locking/pom.xml index af777efa..1b6063b0 100644 --- a/feature-distributed-locking/pom.xml +++ b/feature-distributed-locking/pom.xml @@ -2,7 +2,7 @@ ============LICENSE_START======================================================= ONAP Policy Engine - Drools PDP ================================================================================ - Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -117,6 +117,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.powermock</groupId> <artifactId>powermock-api-mockito</artifactId> <scope>test</scope> diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java new file mode 100644 index 00000000..523c0d93 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java @@ -0,0 +1,936 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.dbcp2.BasicDataSourceFactory; +import org.onap.policy.common.utils.network.NetworkUtil; +import org.onap.policy.drools.core.lock.AlwaysFailLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; +import org.onap.policy.drools.features.PolicyEngineFeatureApi; +import org.onap.policy.drools.persistence.SystemPersistenceConstants; +import org.onap.policy.drools.system.PolicyEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed implementation of the Lock Feature. Maintains locks across servers using a + * shared DB. + * + * <p/> + * Note: this implementation does <i>not</i> honor the waitForLocks={@code true} + * parameter. + * + * <p/> + * Additional Notes: + * <dl> + * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is + * instead populated with the {@link #uuidString}.</li> + * <li>A periodic check of the DB is made to determine if any of the locks have + * expired.</li> + * <li>When a lock is deserialized, it will not initially appear in this feature's map; it + * will be added to the map once free() or extend() is invoked, provided there isn't + * already an entry. In addition, it initially has the host and UUID of the feature + * instance that created it. However, as soon as doExtend() completes successfully, the + * host and UUID of the lock will be updated to reflect the values within this feature + * instance.</li> + * </dl> + */ +public class DistributedLockManager + implements PolicyResourceLockManager, PolicyEngineFeatureApi { + + private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class); + + private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; + private static final String LOCK_LOST_MSG = "lock lost"; + private static final String NOT_LOCKED_MSG = "not locked"; + + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private static DistributedLockManager latestInstance = null; + + + /** + * Name of the host on which this JVM is running. + */ + @Getter + private final String hostName; + + /** + * UUID of this object. + */ + @Getter + private final String uuidString = UUID.randomUUID().toString(); + + /** + * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a + * lock is added to the map, it remains in the map until the lock is lost or until the + * unlock request completes. + */ + private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>(); + + /** + * Engine with which this manager is associated. + */ + private PolicyEngine engine; + + /** + * Feature properties. + */ + private DistributedLockProperties featProps; + + /** + * Thread pool used to check for lock expiration and to notify owners when locks are + * granted or lost. + */ + private ScheduledExecutorService exsvc = null; + + /** + * Data source used to connect to the DB. + */ + private BasicDataSource dataSource = null; + + + /** + * Constructs the object. + */ + public DistributedLockManager() { + this.hostName = NetworkUtil.getHostname(); + } + + @Override + public int getSequenceNumber() { + return 1000; + } + + @Override + public boolean isAlive() { + return (exsvc != null); + } + + @Override + public boolean start() { + // handled via engine API + return true; + } + + @Override + public boolean stop() { + // handled via engine API + return true; + } + + @Override + public void shutdown() { + // handled via engine API + } + + @Override + public boolean isLocked() { + return false; + } + + @Override + public boolean lock() { + return true; + } + + @Override + public boolean unlock() { + return true; + } + + @Override + public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) { + + try { + this.engine = engine; + this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME)); + this.exsvc = getThreadPool(); + this.dataSource = makeDataSource(); + + return this; + + } catch (Exception e) { + throw new DistributedLockManagerException(e); + } + } + + @Override + public boolean afterStart(PolicyEngine engine) { + + try { + exsvc.execute(this::deleteExpiredDbLocks); + exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + + setLatestInstance(this); + + } catch (Exception e) { + throw new DistributedLockManagerException(e); + } + + return false; + } + + /** + * Make data source. + * + * @return a new, pooled data source + * @throws Exception exception + */ + protected BasicDataSource makeDataSource() throws Exception { + Properties props = new Properties(); + props.put("driverClassName", featProps.getDbDriver()); + props.put("url", featProps.getDbUrl()); + props.put("username", featProps.getDbUser()); + props.put("password", featProps.getDbPwd()); + props.put("testOnBorrow", "true"); + props.put("poolPreparedStatements", "true"); + + // additional properties are listed in the GenericObjectPool API + + return BasicDataSourceFactory.createDataSource(props); + } + + /** + * Deletes expired locks from the DB. + */ + private void deleteExpiredDbLocks() { + logger.info("deleting all expired locks from the DB"); + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn + .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) { + + int ndel = stmt.executeUpdate(); + logger.info("deleted {} expired locks from the DB", ndel); + + } catch (SQLException e) { + logger.warn("failed to delete expired locks from the DB", e); + } + } + + /** + * Closes the data source. Does <i>not</i> invoke any lock call-backs. + */ + @Override + public boolean afterStop(PolicyEngine engine) { + exsvc = null; + closeDataSource(); + return false; + } + + /** + * Closes {@link #dataSource} and sets it to {@code null}. + */ + private void closeDataSource() { + try { + if (dataSource != null) { + dataSource.close(); + } + + } catch (SQLException e) { + logger.error("cannot close the distributed locking DB", e); + } + + dataSource = null; + } + + @Override + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + + if (latestInstance != this) { + AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); + lock.notifyUnavailable(); + return lock; + } + + DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); + + DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock); + + // do these outside of compute() to avoid blocking other map operations + if (existingLock == null) { + logger.debug("added lock to map {}", lock); + lock.scheduleRequest(lock::doLock); + } else { + lock.deny("resource is busy", true); + } + + return lock; + } + + /** + * Checks for expired locks. + */ + private void checkExpired() { + + try { + logger.info("checking for expired locks"); + Set<String> expiredIds = new HashSet<>(resource2lock.keySet()); + identifyDbLocks(expiredIds); + expireLocks(expiredIds); + + exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + + } catch (RejectedExecutionException e) { + logger.warn("thread pool is no longer accepting requests", e); + + } catch (SQLException | RuntimeException e) { + logger.error("error checking expired locks", e); + exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS); + } + + logger.info("done checking for expired locks"); + } + + /** + * Identifies this feature instance's locks that the DB indicates are still active. + * + * @param expiredIds IDs of resources that have expired locks. If a resource is still + * locked, it's ID is removed from this set + * @throws SQLException if a DB error occurs + */ + private void identifyDbLocks(Set<String> expiredIds) throws SQLException { + /* + * We could query for host and UUIDs that actually appear within the locks, but + * those might change while the query is running so no real value in doing that. + * On the other hand, there's only a brief instance between the time a + * deserialized lock is added to this feature instance and its doExtend() method + * updates its host and UUID to match this feature instance. If this happens to + * run during that brief instance, then the lock will be lost and the callback + * invoked. It isn't worth complicating this code further to handle those highly + * unlikely cases. + */ + + // @formatter:off + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement( + "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) { + // @formatter:on + + stmt.setString(1, hostName); + stmt.setString(2, uuidString); + + try (ResultSet resultSet = stmt.executeQuery()) { + while (resultSet.next()) { + String resourceId = resultSet.getString(1); + + // we have now seen this resource id + expiredIds.remove(resourceId); + } + } + } + } + + /** + * Expires locks for the resources that no longer appear within the DB. + * + * @param expiredIds IDs of resources that have expired locks + */ + private void expireLocks(Set<String> expiredIds) { + for (String resourceId : expiredIds) { + AtomicReference<DistributedLock> lockref = new AtomicReference<>(null); + + resource2lock.computeIfPresent(resourceId, (key, lock) -> { + if (lock.isActive()) { + // it thinks it's active, but it isn't - remove from the map + lockref.set(lock); + return null; + } + + return lock; + }); + + DistributedLock lock = lockref.get(); + if (lock != null) { + logger.debug("removed lock from map {}", lock); + lock.deny(LOCK_LOST_MSG, false); + } + } + } + + /** + * Distributed Lock implementation. + */ + public static class DistributedLock extends LockImpl { + private static final long serialVersionUID = 1L; + + /** + * Feature containing this lock. May be {@code null} until the feature is + * identified. Note: this can only be null if the lock has been de-serialized. + */ + private transient DistributedLockManager feature; + + /** + * Host name from the feature instance that created this object. Replaced with the + * host name from the current feature instance whenever the lock is successfully + * extended. + */ + private String hostName; + + /** + * UUID string from the feature instance that created this object. Replaced with + * the UUID string from the current feature instance whenever the lock is + * successfully extended. + */ + private String uuidString; + + /** + * {@code True} if the lock is busy making a request, {@code false} otherwise. + */ + private transient boolean busy = false; + + /** + * Request to be performed. + */ + private transient RunnableWithEx request = null; + + /** + * Number of times we've retried a request. + */ + private transient int nretries = 0; + + /** + * Constructs the object. + */ + public DistributedLock() { + this.hostName = ""; + this.uuidString = ""; + } + + /** + * Constructs the object. + * + * @param state initial state of the lock + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held, + * after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or + * subsequently lost; must not be {@code null} + * @param feature feature containing this lock + */ + public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback, + DistributedLockManager feature) { + super(state, resourceId, ownerKey, holdSec, callback); + + this.feature = feature; + this.hostName = feature.hostName; + this.uuidString = feature.uuidString; + } + + /** + * Grants this lock. The notification is <i>always</i> invoked via the + * <i>foreground</i> thread. + */ + protected void grant() { + synchronized (this) { + if (isUnavailable()) { + return; + } + + setState(LockState.ACTIVE); + } + + logger.info("lock granted: {}", this); + + notifyAvailable(); + } + + /** + * Permanently denies this lock. + * + * @param reason the reason the lock was denied + * @param foreground {@code true} if the callback can be invoked in the current + * (i.e., foreground) thread, {@code false} if it should be invoked via the + * executor + */ + protected void deny(String reason, boolean foreground) { + synchronized (this) { + setState(LockState.UNAVAILABLE); + } + + logger.info("{}: {}", reason, this); + + if (feature == null || foreground) { + notifyUnavailable(); + + } else { + feature.exsvc.execute(this::notifyUnavailable); + } + } + + @Override + public boolean free() { + // do a quick check of the state + if (isUnavailable()) { + return false; + } + + logger.info("releasing lock: {}", this); + + if (!attachFeature()) { + setState(LockState.UNAVAILABLE); + return false; + } + + AtomicBoolean result = new AtomicBoolean(false); + + feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { + if (curlock == this && !isUnavailable()) { + // this lock was the owner + result.set(true); + setState(LockState.UNAVAILABLE); + + /* + * NOTE: do NOT return null; curlock must remain until doUnlock + * completes. + */ + } + + return curlock; + }); + + if (result.get()) { + scheduleRequest(this::doUnlock); + return true; + } + + return false; + } + + @Override + public void extend(int holdSec, LockCallback callback) { + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + setHoldSec(holdSec); + setCallback(callback); + + // do a quick check of the state + if (isUnavailable() || !attachFeature()) { + deny(LOCK_LOST_MSG, true); + return; + } + + AtomicBoolean success = new AtomicBoolean(false); + + feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { + if (curlock == this && !isUnavailable()) { + success.set(true); + setState(LockState.WAITING); + } + + // note: leave it in the map until doUnlock() removes it + + return curlock; + }); + + if (success.get()) { + scheduleRequest(this::doExtend); + + } else { + deny(NOT_LOCKED_MSG, true); + } + } + + /** + * Attaches to the feature instance, if not already attached. + * + * @return {@code true} if the lock is now attached to a feature, {@code false} + * otherwise + */ + private synchronized boolean attachFeature() { + if (feature != null) { + // already attached + return true; + } + + feature = latestInstance; + if (feature == null) { + logger.warn("no feature yet for {}", this); + return false; + } + + // put this lock into the map + feature.resource2lock.putIfAbsent(getResourceId(), this); + + return true; + } + + /** + * Schedules a request for execution. + * + * @param schedreq the request that should be scheduled + */ + private synchronized void scheduleRequest(RunnableWithEx schedreq) { + logger.debug("schedule lock action {}", this); + nretries = 0; + request = schedreq; + feature.exsvc.execute(this::doRequest); + } + + /** + * Reschedules a request for execution, if there is not already a request in the + * queue, and if the retry count has not been exhausted. + * + * @param req request to be rescheduled + */ + private void rescheduleRequest(RunnableWithEx req) { + synchronized (this) { + if (request != null) { + // a new request has already been scheduled - it supersedes "req" + logger.debug("not rescheduling lock action {}", this); + return; + } + + if (nretries++ < feature.featProps.getMaxRetries()) { + logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this); + request = req; + feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS); + return; + } + } + + logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this); + removeFromMap(); + } + + /** + * Gets, and removes, the next request from the queue. Clears {@link #busy} if + * there are no more requests in the queue. + * + * @param prevReq the previous request that was just run + * + * @return the next request, or {@code null} if the queue is empty + */ + private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) { + if (request == null || request == prevReq) { + logger.debug("no more requests for {}", this); + busy = false; + return null; + } + + RunnableWithEx req = request; + request = null; + + return req; + } + + /** + * Executes the current request, if none are currently executing. + */ + private void doRequest() { + synchronized (this) { + if (busy) { + // another thread is already processing the request(s) + return; + } + busy = true; + } + + /* + * There is a race condition wherein this thread could invoke run() while the + * next scheduled thread checks the busy flag and finds that work is being + * done and returns, leaving the next work item in "request". In that case, + * the next work item may never be executed, thus we use a loop here, instead + * of just executing a single request. + */ + RunnableWithEx req = null; + while ((req = getNextRequest(req)) != null) { + if (feature.resource2lock.get(getResourceId()) != this) { + /* + * no longer in the map - don't apply the action, as it may interfere + * with any newly added Lock object + */ + logger.debug("discard lock action {}", this); + synchronized (this) { + busy = false; + } + return; + } + + try { + /* + * Run the request. If it throws an exception, then it will be + * rescheduled for execution a little later. + */ + req.run(); + + } catch (SQLException e) { + logger.warn("request failed for lock: {}", this, e); + + if (feature.featProps.isTransient(e.getErrorCode())) { + // retry the request a little later + rescheduleRequest(req); + } else { + removeFromMap(); + } + + } catch (RuntimeException e) { + logger.warn("request failed for lock: {}", this, e); + removeFromMap(); + } + } + } + + /** + * Attempts to add a lock to the DB. Generates a callback, indicating success or + * failure. + * + * @throws SQLException if a DB error occurs + */ + private void doLock() throws SQLException { + if (!isWaiting()) { + logger.debug("discard doLock {}", this); + return; + } + + /* + * There is a small window in which a client could invoke free() before the DB + * is updated. In that case, doUnlock will be added to the queue to run after + * this, which will delete the record, as desired. In addition, grant() will + * not do anything, because the lock state will have been set to UNAVAILABLE + * by free(). + */ + + logger.debug("doLock {}", this); + try (Connection conn = feature.dataSource.getConnection()) { + boolean success = false; + try { + success = doDbInsert(conn); + + } catch (SQLException e) { + logger.info("failed to insert lock record - attempting update: {}", this, e); + success = doDbUpdate(conn); + } + + if (success) { + grant(); + return; + } + } + + removeFromMap(); + } + + /** + * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if + * it fails, as this should only be executed in response to a call to + * {@link #free()}. + * + * @throws SQLException if a DB error occurs + */ + private void doUnlock() throws SQLException { + logger.debug("unlock {}", this); + try (Connection conn = feature.dataSource.getConnection()) { + doDbDelete(conn); + } + + removeFromMap(); + } + + /** + * Attempts to extend a lock in the DB. Generates a callback, indicating success + * or failure. + * + * @throws SQLException if a DB error occurs + */ + private void doExtend() throws SQLException { + if (!isWaiting()) { + logger.debug("discard doExtend {}", this); + return; + } + + /* + * There is a small window in which a client could invoke free() before the DB + * is updated. In that case, doUnlock will be added to the queue to run after + * this, which will delete the record, as desired. In addition, grant() will + * not do anything, because the lock state will have been set to UNAVAILABLE + * by free(). + */ + + logger.debug("doExtend {}", this); + try (Connection conn = feature.dataSource.getConnection()) { + /* + * invoker may have called extend() before free() had a chance to insert + * the record, thus we have to try to insert, if the update fails + */ + if (doDbUpdate(conn) || doDbInsert(conn)) { + grant(); + return; + } + } + + removeFromMap(); + } + + /** + * Inserts the lock into the DB. + * + * @param conn DB connection + * @return {@code true} if a record was successfully inserted, {@code false} + * otherwise + * @throws SQLException if a DB error occurs + */ + protected boolean doDbInsert(Connection conn) throws SQLException { + logger.debug("insert lock record {}", this); + try (PreparedStatement stmt = + conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " + + "values (?, ?, ?, timestampadd(second, ?, now()))")) { + + stmt.setString(1, getResourceId()); + stmt.setString(2, feature.hostName); + stmt.setString(3, feature.uuidString); + stmt.setInt(4, getHoldSec()); + + stmt.executeUpdate(); + + this.hostName = feature.hostName; + this.uuidString = feature.uuidString; + + return true; + } + } + + /** + * Updates the lock in the DB. + * + * @param conn DB connection + * @return {@code true} if a record was successfully updated, {@code false} + * otherwise + * @throws SQLException if a DB error occurs + */ + protected boolean doDbUpdate(Connection conn) throws SQLException { + logger.debug("update lock record {}", this); + try (PreparedStatement stmt = + conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?," + + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?" + + " AND ((host=? AND owner=?) OR expirationTime < now())")) { + + stmt.setString(1, getResourceId()); + stmt.setString(2, feature.hostName); + stmt.setString(3, feature.uuidString); + stmt.setInt(4, getHoldSec()); + + stmt.setString(5, getResourceId()); + stmt.setString(6, this.hostName); + stmt.setString(7, this.uuidString); + + if (stmt.executeUpdate() != 1) { + return false; + } + + this.hostName = feature.hostName; + this.uuidString = feature.uuidString; + + return true; + } + } + + /** + * Deletes the lock from the DB. + * + * @param conn DB connection + * @throws SQLException if a DB error occurs + */ + protected void doDbDelete(Connection conn) throws SQLException { + logger.debug("delete lock record {}", this); + try (PreparedStatement stmt = + conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) { + + stmt.setString(1, getResourceId()); + stmt.setString(2, this.hostName); + stmt.setString(3, this.uuidString); + + stmt.executeUpdate(); + } + } + + /** + * Removes the lock from the map, and sends a notification using the current + * thread. + */ + private void removeFromMap() { + logger.debug("remove lock from map {}", this); + feature.resource2lock.remove(getResourceId(), this); + + synchronized (this) { + if (!isUnavailable()) { + deny(LOCK_LOST_MSG, true); + } + } + } + + @Override + public String toString() { + return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey=" + + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString=" + + uuidString + "]"; + } + } + + @FunctionalInterface + private static interface RunnableWithEx { + void run() throws SQLException; + } + + // these may be overridden by junit tests + + protected Properties getProperties(String fileName) { + return SystemPersistenceConstants.getManager().getProperties(fileName); + } + + protected ScheduledExecutorService getThreadPool() { + return engine.getExecutorService(); + } + + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this); + } +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java index 55fc4fab..e720f9a1 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * feature-distributed-locking * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,15 +20,15 @@ package org.onap.policy.distributed.locking; -public class DistributedLockingFeatureException extends RuntimeException { +public class DistributedLockManagerException extends RuntimeException { private static final long serialVersionUID = 1L; /** * Constructor. - * + * * @param ex exception to be wrapped */ - public DistributedLockingFeatureException(Exception ex) { + public DistributedLockManagerException(Exception ex) { super(ex); } } diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java new file mode 100644 index 00000000..f470c8e2 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java @@ -0,0 +1,136 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.properties.BeanConfigurator; +import org.onap.policy.common.utils.properties.Property; +import org.onap.policy.common.utils.properties.exception.PropertyException; + + +@Getter +@Setter +public class DistributedLockProperties { + public static final String PREFIX = "distributed.locking."; + + public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; + public static final String DB_URL = "javax.persistence.jdbc.url"; + public static final String DB_USER = "javax.persistence.jdbc.user"; + public static final String DB_PASS = "javax.persistence.jdbc.password"; + public static final String TRANSIENT_ERROR_CODES = PREFIX + "transient.error.codes"; + public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds"; + public static final String RETRY_SEC = PREFIX + "retry.seconds"; + public static final String MAX_RETRIES = PREFIX + "max.retries"; + + /** + * Database driver. + */ + @Property(name = DB_DRIVER) + private String dbDriver; + + /** + * Database url. + */ + @Property(name = DB_URL) + private String dbUrl; + + /** + * Database user. + */ + @Property(name = DB_USER) + private String dbUser; + + /** + * Database password. + */ + @Property(name = DB_PASS) + private String dbPwd; + + /** + * Vendor-specific error codes that are "transient", meaning they may go away if the + * command is repeated (e.g., connection issue), as opposed to something like a syntax + * error or a duplicate key. + */ + @Property(name = TRANSIENT_ERROR_CODES) + private String errorCodeStrings; + + private final Set<Integer> transientErrorCodes; + + /** + * Time, in seconds, to wait between checks for expired locks. + */ + @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900") + private int expireCheckSec; + + /** + * Number of seconds to wait before retrying, after a DB error. + */ + @Property(name = RETRY_SEC, defaultValue = "60") + private int retrySec; + + /** + * Maximum number of times to retry a DB operation. + */ + @Property(name = MAX_RETRIES, defaultValue = "2") + private int maxRetries; + + /** + * Constructs the object, populating fields from the properties. + * + * @param props properties from which to configure this + * @throws PropertyException if an error occurs + */ + public DistributedLockProperties(Properties props) throws PropertyException { + new BeanConfigurator().configureFromProperties(this, props); + + Set<Integer> set = new HashSet<>(); + for (String text : errorCodeStrings.split(",")) { + text = text.trim(); + if (text.isEmpty()) { + continue; + } + + try { + set.add(Integer.valueOf(text)); + + } catch (NumberFormatException e) { + throw new PropertyException(TRANSIENT_ERROR_CODES, "errorCodeStrings", e); + } + } + + transientErrorCodes = Collections.unmodifiableSet(set); + } + + /** + * Determines if an error is transient. + * + * @param errorCode error code to check + * @return {@code true} if the error is transient, {@code false} otherwise + */ + public boolean isTransient(int errorCode) { + return transientErrorCodes.contains(errorCode); + } +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java deleted file mode 100644 index d5e07a30..00000000 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Properties; -import java.util.UUID; -import org.apache.commons.dbcp2.BasicDataSource; -import org.apache.commons.dbcp2.BasicDataSourceFactory; -import org.onap.policy.common.utils.properties.exception.PropertyException; -import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi; -import org.onap.policy.drools.features.PolicyEngineFeatureApi; -import org.onap.policy.drools.persistence.SystemPersistenceConstants; -import org.onap.policy.drools.system.PolicyEngine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DistributedLockingFeature implements PolicyEngineFeatureApi, PolicyResourceLockFeatureApi { - - /** - * Logger instance. - */ - private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class); - - /** - * Properties Configuration Name. - */ - public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; - - /** - * Properties for locking feature. - */ - private DistributedLockingProperties lockProps; - - /** - * Data source used to connect to the DB containing locks. - */ - private BasicDataSource dataSource; - - /** - * UUID. - */ - private static final UUID uuid = UUID.randomUUID(); - - @Override - public int getSequenceNumber() { - return 1000; - } - - @Override - public OperResult beforeLock(String resourceId, String owner, int holdSec) { - - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.lock(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeRefresh(String resourceId, String owner, int holdSec) { - - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.refresh(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeUnlock(String resourceId, String owner) { - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeIsLockedBy(String resourceId, String owner) { - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeIsLocked(String resourceId) { - TargetLock lock = new TargetLock(resourceId, uuid, "dummyOwner", dataSource); - - return (lock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public boolean afterStart(PolicyEngine engine) { - - try { - this.lockProps = new DistributedLockingProperties(SystemPersistenceConstants.getManager() - .getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME)); - this.dataSource = makeDataSource(); - } catch (PropertyException e) { - logger.error("DistributedLockingFeature feature properies have not been loaded", e); - throw new DistributedLockingFeatureException(e); - } catch (InterruptedException e) { - logger.error("DistributedLockingFeature failed to create data source", e); - Thread.currentThread().interrupt(); - throw new DistributedLockingFeatureException(e); - } catch (Exception e) { - logger.error("DistributedLockingFeature failed to create data source", e); - throw new DistributedLockingFeatureException(e); - } - - cleanLockTable(); - - return false; - } - - /** - * Make data source. - * - * @return a new, pooled data source - * @throws Exception exception - */ - protected BasicDataSource makeDataSource() throws Exception { - Properties props = new Properties(); - props.put("driverClassName", lockProps.getDbDriver()); - props.put("url", lockProps.getDbUrl()); - props.put("username", lockProps.getDbUser()); - props.put("password", lockProps.getDbPwd()); - props.put("testOnBorrow", "true"); - props.put("poolPreparedStatements", "true"); - - // additional properties are listed in the GenericObjectPool API - - return BasicDataSourceFactory.createDataSource(props); - } - - /** - * This method kills the heartbeat thread and calls refreshLockTable which removes - * any records from the db where the current host is the owner. - */ - @Override - public boolean beforeShutdown(PolicyEngine engine) { - cleanLockTable(); - return false; - } - - /** - * This method removes all records owned by the current host from the db. - */ - private void cleanLockTable() { - - try (Connection conn = dataSource.getConnection(); - PreparedStatement statement = conn.prepareStatement( - "DELETE FROM pooling.locks WHERE host = ? OR expirationTime < now()") - ) { - - statement.setString(1, uuid.toString()); - statement.executeUpdate(); - - } catch (SQLException e) { - logger.error("error in refreshLockTable()", e); - } - - } -} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java deleted file mode 100644 index 0ed5930d..00000000 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import java.util.Properties; -import org.onap.policy.common.utils.properties.BeanConfigurator; -import org.onap.policy.common.utils.properties.Property; -import org.onap.policy.common.utils.properties.exception.PropertyException; - - -public class DistributedLockingProperties { - - /** - * Feature properties all begin with this prefix. - */ - public static final String PREFIX = "distributed.locking."; - - public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; - public static final String DB_URL = "javax.persistence.jdbc.url"; - public static final String DB_USER = "javax.persistence.jdbc.user"; - public static final String DB_PWD = "javax.persistence.jdbc.password"; - - /** - * Properties from which this was constructed. - */ - private final Properties source; - - /** - * Database driver. - */ - @Property(name = DB_DRIVER) - private String dbDriver; - - /** - * Database url. - */ - @Property(name = DB_URL) - private String dbUrl; - - /** - * Database user. - */ - @Property(name = DB_USER) - private String dbUser; - - /** - * Database password. - */ - @Property(name = DB_PWD) - private String dbPwd; - - /** - * Constructs the object, populating fields from the properties. - * - * @param props properties from which to configure this - * @throws PropertyException if an error occurs - */ - public DistributedLockingProperties(Properties props) throws PropertyException { - source = props; - - new BeanConfigurator().configureFromProperties(this, props); - } - - - public Properties getSource() { - return source; - } - - - public String getDbDriver() { - return dbDriver; - } - - - public String getDbUrl() { - return dbUrl; - } - - - public String getDbUser() { - return dbUser; - } - - - public String getDbPwd() { - return dbPwd; - } - - - public void setDbDriver(String dbDriver) { - this.dbDriver = dbDriver; - } - - - public void setDbUrl(String dbUrl) { - this.dbUrl = dbUrl; - } - - - public void setDbUser(String dbUser) { - this.dbUser = dbUser; - } - - - public void setDbPwd(String dbPwd) { - this.dbPwd = dbPwd; - } - -} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java deleted file mode 100644 index 42e1f92f..00000000 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java +++ /dev/null @@ -1,282 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.UUID; -import org.apache.commons.dbcp2.BasicDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TargetLock { - - private static final Logger logger = LoggerFactory.getLogger(TargetLock.class); - - /** - * The Target resource we want to lock. - */ - private String resourceId; - - /** - * Data source used to connect to the DB containing locks. - */ - private BasicDataSource dataSource; - - /** - * UUID . - */ - private UUID uuid; - - /** - * Owner. - */ - private String owner; - - /** - * Constructs a TargetLock object. - * - * @param resourceId ID of the entity we want to lock - * @param dataSource used to connect to the DB containing locks - */ - public TargetLock(String resourceId, UUID uuid, String owner, BasicDataSource dataSource) { - this.resourceId = resourceId; - this.uuid = uuid; - this.owner = owner; - this.dataSource = dataSource; - } - - /** - * Obtain a lock. - * @param holdSec the amount of time, in seconds, that the lock should be held - */ - public boolean lock(int holdSec) { - - return grabLock(holdSec); - } - - /** - * Refresh a lock. - * - * @param holdSec the amount of time, in seconds, that the lock should be held - * @return {@code true} if the lock was refreshed, {@code false} if the resource is - * not currently locked by the given owner - */ - public boolean refresh(int holdSec) { - return updateLock(holdSec); - } - - /** - * Unlock a resource by deleting it's associated record in the db. - */ - public boolean unlock() { - return deleteLock(); - } - - /** - * "Grabs" lock by attempting to insert a new record in the db. - * If the insert fails due to duplicate key error resource is already locked - * so we call secondGrab. - * @param holdSec the amount of time, in seconds, that the lock should be held - */ - private boolean grabLock(int holdSec) { - - // try to insert a record into the table(thereby grabbing the lock) - try (Connection conn = dataSource.getConnection(); - - PreparedStatement statement = conn.prepareStatement( - "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " - + "values (?, ?, ?, timestampadd(second, ?, now()))")) { - - int index = 1; - statement.setString(index++, this.resourceId); - statement.setString(index++, this.uuid.toString()); - statement.setString(index++, this.owner); - statement.setInt(index++, holdSec); - statement.executeUpdate(); - } - - catch (SQLException e) { - logger.error("error in TargetLock.grabLock()", e); - return secondGrab(holdSec); - } - - return true; - } - - /** - * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired. - * If that fails, it attempts to insert a new record again - * @param holdSec the amount of time, in seconds, that the lock should be held - */ - private boolean secondGrab(int holdSec) { - - try (Connection conn = dataSource.getConnection(); - - PreparedStatement updateStatement = conn.prepareStatement( - "UPDATE pooling.locks SET host = ?, owner = ?, " - + "expirationTime = timestampadd(second, ?, now()) " - + "WHERE resourceId = ? AND expirationTime < now()"); - - PreparedStatement insertStatement = conn.prepareStatement( - "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " - + "values (?, ?, ?, timestampadd(second, ?, now()))");) { - - int index = 1; - updateStatement.setString(index++, this.uuid.toString()); - updateStatement.setString(index++, this.owner); - updateStatement.setInt(index++, holdSec); - updateStatement.setString(index++, this.resourceId); - - // The lock was expired and we grabbed it. - // return true - if (updateStatement.executeUpdate() == 1) { - return true; - } - - // If our update does not return 1 row, the lock either has not expired - // or it was removed. Try one last grab - else { - index = 1; - insertStatement.setString(index++, this.resourceId); - insertStatement.setString(index++, this.uuid.toString()); - insertStatement.setString(index++, this.owner); - insertStatement.setInt(index++, holdSec); - - // If our insert returns 1 we successfully grabbed the lock - return (insertStatement.executeUpdate() == 1); - } - - } catch (SQLException e) { - logger.error("error in TargetLock.secondGrab()", e); - return false; - } - - } - - /** - * Updates the DB record associated with the lock. - * - * @param holdSec the amount of time, in seconds, that the lock should be held - * @return {@code true} if the record was updated, {@code false} otherwise - */ - private boolean updateLock(int holdSec) { - - try (Connection conn = dataSource.getConnection(); - - PreparedStatement updateStatement = conn.prepareStatement( - "UPDATE pooling.locks SET host = ?, owner = ?, " - + "expirationTime = timestampadd(second, ?, now()) " - + "WHERE resourceId = ? AND owner = ? AND expirationTime >= now()")) { - - int index = 1; - updateStatement.setString(index++, this.uuid.toString()); - updateStatement.setString(index++, this.owner); - updateStatement.setInt(index++, holdSec); - updateStatement.setString(index++, this.resourceId); - updateStatement.setString(index++, this.owner); - - // refresh succeeded iff a record was updated - return (updateStatement.executeUpdate() == 1); - - } catch (SQLException e) { - logger.error("error in TargetLock.refreshLock()", e); - return false; - } - - } - - /** - *To remove a lock we simply delete the record from the db . - */ - private boolean deleteLock() { - - try (Connection conn = dataSource.getConnection(); - - PreparedStatement deleteStatement = conn.prepareStatement( - "DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?")) { - - deleteStatement.setString(1, this.resourceId); - deleteStatement.setString(2, this.owner); - deleteStatement.setString(3, this.uuid.toString()); - - return (deleteStatement.executeUpdate() == 1); - - } catch (SQLException e) { - logger.error("error in TargetLock.deleteLock()", e); - return false; - } - - } - - /** - * Is the lock active. - */ - public boolean isActive() { - try (Connection conn = dataSource.getConnection(); - - PreparedStatement selectStatement = conn.prepareStatement( - "SELECT * FROM pooling.locks " - + "WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= now()")) { - - selectStatement.setString(1, this.resourceId); - selectStatement.setString(2, this.uuid.toString()); - selectStatement.setString(3, this.owner); - try (ResultSet result = selectStatement.executeQuery()) { - - // This will return true if the - // query returned at least one row - return result.first(); - } - - } - - catch (SQLException e) { - logger.error("error in TargetLock.isActive()", e); - return false; - } - - } - - /** - * Is the resource locked. - */ - public boolean isLocked() { - - try (Connection conn = dataSource.getConnection(); - PreparedStatement selectStatement = conn - .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= now()")) { - - selectStatement.setString(1, this.resourceId); - try (ResultSet result = selectStatement.executeQuery()) { - // This will return true if the - // query returned at least one row - return result.first(); - } - } catch (SQLException e) { - logger.error("error in TargetLock.isActive()", e); - return false; - } - } - -} diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi deleted file mode 100644 index 19bdf505..00000000 --- a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi +++ /dev/null @@ -1 +0,0 @@ -org.onap.policy.distributed.locking.DistributedLockingFeature
\ No newline at end of file diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi index 19bdf505..2a7c0547 100644 --- a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi +++ b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi @@ -1 +1 @@ -org.onap.policy.distributed.locking.DistributedLockingFeature
\ No newline at end of file +org.onap.policy.distributed.locking.DistributedLockManager
\ No newline at end of file diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java index 7ba2384a..cfd6a151 100644 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * feature-distributed-locking * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,13 +24,13 @@ import static org.junit.Assert.assertEquals; import org.junit.Test; import org.onap.policy.common.utils.test.ExceptionsTester; -import org.onap.policy.distributed.locking.DistributedLockingFeatureException; +import org.onap.policy.distributed.locking.DistributedLockManagerException; -public class DistributedLockingFeatureExceptionTest extends ExceptionsTester { +public class DistributedLockManagerExceptionTest extends ExceptionsTester { @Test public void test() { - assertEquals(1, test(DistributedLockingFeatureException.class)); + assertEquals(1, test(DistributedLockManagerException.class)); } } diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java new file mode 100644 index 00000000..59b56224 --- /dev/null +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java @@ -0,0 +1,1818 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.dbcp2.BasicDataSource; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.utils.services.OrderedServiceImpl; +import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.features.PolicyEngineFeatureApi; +import org.onap.policy.drools.persistence.SystemPersistenceConstants; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.powermock.reflect.Whitebox; + +public class DistributedLockManagerTest { + private static final long EXPIRE_SEC = 900L; + private static final long RETRY_SEC = 60L; + private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService"; + private static final String OTHER_HOST = "other-host"; + private static final String OTHER_OWNER = "other-owner"; + private static final String EXPECTED_EXCEPTION = "expected exception"; + private static final String DB_CONNECTION = + "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling"; + private static final String DB_USER = "user"; + private static final String DB_PASSWORD = "password"; + private static final String OWNER_KEY = "my key"; + private static final String RESOURCE = "my resource"; + private static final String RESOURCE2 = "my resource #2"; + private static final String RESOURCE3 = "my resource #3"; + private static final String RESOURCE4 = "my resource #4"; + private static final String RESOURCE5 = "my resource #5"; + private static final int HOLD_SEC = 100; + private static final int HOLD_SEC2 = 120; + private static final int MAX_THREADS = 5; + private static final int MAX_LOOPS = 100; + private static final int TRANSIENT = 500; + private static final int PERMANENT = 600; + + // number of execute() calls before the first lock attempt + private static final int PRE_LOCK_EXECS = 1; + + // number of execute() calls before the first schedule attempt + private static final int PRE_SCHED_EXECS = 1; + + private static Connection conn = null; + private static ScheduledExecutorService saveExec; + private static ScheduledExecutorService realExec; + + @Mock + private ScheduledExecutorService exsvc; + + @Mock + private LockCallback callback; + + @Mock + private BasicDataSource datasrc; + + @Mock + private PolicyEngine engine; + + private DistributedLock lock; + + private AtomicInteger nactive; + private AtomicInteger nsuccesses; + private DistributedLockManager feature; + + + /** + * Configures the location of the property files and creates the DB. + * + * @throws SQLException if the DB cannot be created + */ + @BeforeClass + public static void setUpBeforeClass() throws SQLException { + SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources"); + + conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); + + try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks " + + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), " + + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) { + createStmt.executeUpdate(); + } + + saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD); + + realExec = Executors.newScheduledThreadPool(3); + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec); + } + + /** + * Restores static fields. + */ + @AfterClass + public static void tearDownAfterClass() throws SQLException { + Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec); + realExec.shutdown(); + conn.close(); + } + + /** + * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute + * tasks. + * + * @throws SQLException if the lock records cannot be deleted from the DB + */ + @Before + public void setUp() throws SQLException { + MockitoAnnotations.initMocks(this); + + nactive = new AtomicInteger(0); + nsuccesses = new AtomicInteger(0); + + cleanDb(); + + when(engine.getExecutorService()).thenReturn(exsvc); + + feature = new MyLockingFeature(true); + } + + @After + public void tearDown() throws SQLException { + shutdownFeature(); + cleanDb(); + } + + private void cleanDb() throws SQLException { + try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) { + stmt.executeUpdate(); + } + } + + private void shutdownFeature() { + if (feature != null) { + feature.afterStop(engine); + feature = null; + } + } + + /** + * Tests that the feature is found in the expected service sets. + */ + @Test + public void testServiceApis() { + assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream() + .anyMatch(obj -> obj instanceof DistributedLockManager)); + } + + /** + * Tests constructor() when properties are invalid. + */ + @Test + public void testDistributedLockManagerInvalidProperties() { + // use properties containing an invalid value + Properties props = new Properties(); + props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc"); + + feature = new MyLockingFeature(false) { + @Override + protected Properties getProperties(String fileName) { + return props; + } + }; + + assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class); + } + + @Test + public void testGetSequenceNumber() { + assertEquals(1000, feature.getSequenceNumber()); + } + + @Test + public void testStartableApi() { + assertTrue(feature.isAlive()); + assertTrue(feature.start()); + assertTrue(feature.stop()); + feature.shutdown(); + + // above should have had no effect + assertTrue(feature.isAlive()); + + feature.afterStop(engine); + assertFalse(feature.isAlive()); + } + + @Test + public void testLockApi() { + assertFalse(feature.isLocked()); + assertTrue(feature.lock()); + assertTrue(feature.unlock()); + } + + @Test + public void testBeforeCreateLockManager() { + assertSame(feature, feature.beforeCreateLockManager(engine, new Properties())); + } + + /** + * Tests beforeCreate(), when getProperties() throws a runtime exception. + */ + @Test + public void testBeforeCreateLockManagerEx() { + shutdownFeature(); + + feature = new MyLockingFeature(false) { + @Override + protected Properties getProperties(String fileName) { + throw new IllegalArgumentException(EXPECTED_EXCEPTION); + } + }; + + assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties())) + .isInstanceOf(DistributedLockManagerException.class); + } + + @Test + public void testAfterStart() { + // verify that cleanup & expire check are both added to the queue + verify(exsvc).execute(any()); + verify(exsvc).schedule(any(Runnable.class), anyLong(), any()); + } + + /** + * Tests afterStart(), when thread pool throws a runtime exception. + */ + @Test + public void testAfterStartExInThreadPool() { + shutdownFeature(); + + feature = new MyLockingFeature(false); + + when(exsvc.schedule(any(Runnable.class), anyLong(), any())) + .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)); + + assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class); + } + + @Test + public void testDeleteExpiredDbLocks() throws SQLException { + // add records: two expired, one not + insertRecord(RESOURCE, feature.getUuidString(), -1); + insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2); + insertRecord(RESOURCE3, OTHER_OWNER, 0); + insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC); + + // get the clean-up function and execute it + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).execute(captor.capture()); + + long tbegin = System.currentTimeMillis(); + Runnable action = captor.getValue(); + action.run(); + + assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin)); + assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin)); + assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin)); + assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin)); + + assertEquals(2, getRecordCount()); + } + + /** + * Tests deleteExpiredDbLocks(), when getConnection() throws an exception. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDeleteExpiredDbLocksEx() throws SQLException { + feature = new InvalidDbLockingFeature(TRANSIENT); + + // get the clean-up function and execute it + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc).execute(captor.capture()); + + Runnable action = captor.getValue(); + + // should not throw an exception + action.run(); + } + + @Test + public void testAfterStop() { + shutdownFeature(); + + feature = new DistributedLockManager(); + + // shutdown without calling afterStart() + + shutdownFeature(); + } + + /** + * Tests afterStop(), when the data source throws an exception when close() is called. + * + * @throws SQLException if an error occurs + */ + @Test + public void testAfterStopEx() throws SQLException { + shutdownFeature(); + + // use a data source that throws an exception when closed + feature = new InvalidDbLockingFeature(TRANSIENT); + + shutdownFeature(); + } + + @Test + public void testCreateLock() throws SQLException { + verify(exsvc).execute(any()); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isWaiting()); + + verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any()); + + // this lock should fail + LockCallback callback2 = mock(LockCallback.class); + DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false); + assertTrue(lock2.isUnavailable()); + verify(callback2, never()).lockAvailable(lock2); + verify(callback2).lockUnavailable(lock2); + + // this should fail, too + LockCallback callback3 = mock(LockCallback.class); + DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false); + assertTrue(lock3.isUnavailable()); + verify(callback3, never()).lockAvailable(lock3); + verify(callback3).lockUnavailable(lock3); + + // no change to first + assertTrue(lock.isWaiting()); + + // no callbacks to the first lock + verify(callback, never()).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + assertTrue(lock.isWaiting()); + assertEquals(0, getRecordCount()); + + runLock(0, 0); + assertTrue(lock.isActive()); + assertEquals(1, getRecordCount()); + + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // this should succeed + DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock4.isWaiting()); + + // after running checker, original records should still remain + runChecker(0, 0, EXPIRE_SEC); + assertEquals(1, getRecordCount()); + verify(callback, never()).lockUnavailable(lock); + } + + /** + * Tests lock() when the feature is not the latest instance. + */ + @Test + public void testCreateLockNotLatestInstance() { + DistributedLockManager.setLatestInstance(null); + + Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback).lockUnavailable(lock); + } + + @Test + public void testCheckExpired() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false); + runLock(1, 0); + + LockCallback callback3 = mock(LockCallback.class); + final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false); + runLock(2, 0); + + LockCallback callback4 = mock(LockCallback.class); + final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false); + runLock(3, 0); + + LockCallback callback5 = mock(LockCallback.class); + final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false); + runLock(4, 0); + + assertEquals(5, getRecordCount()); + + // expire one record + updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1); + + // change host of another record + updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC); + + // change uuid of another record + updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC); + + + // run the checker + runChecker(0, 0, EXPIRE_SEC); + + + // check lock states + assertTrue(lock.isUnavailable()); + assertTrue(lock2.isActive()); + assertTrue(lock3.isUnavailable()); + assertTrue(lock4.isActive()); + assertTrue(lock5.isUnavailable()); + + // allow callbacks + runLock(5, 2); + runLock(6, 1); + runLock(7, 0); + verify(callback).lockUnavailable(lock); + verify(callback3).lockUnavailable(lock3); + verify(callback5).lockUnavailable(lock5); + + verify(callback2, never()).lockUnavailable(lock2); + verify(callback4, never()).lockUnavailable(lock4); + + + // another check should have been scheduled, with the normal interval + runChecker(1, 0, EXPIRE_SEC); + } + + /** + * Tests checkExpired(), when schedule() throws an exception. + */ + @Test + public void testCheckExpiredExecRejected() { + // arrange for execution to be rejected + when(exsvc.schedule(any(Runnable.class), anyLong(), any())) + .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION)); + + runChecker(0, 0, EXPIRE_SEC); + } + + /** + * Tests checkExpired(), when getConnection() throws an exception. + */ + @Test + public void testCheckExpiredSqlEx() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(TRANSIENT); + + runChecker(0, 0, EXPIRE_SEC); + + // it should have scheduled another check, sooner + runChecker(0, 0, RETRY_SEC); + } + + @Test + public void testExpireLocks() throws SQLException { + AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null); + + feature = new MyLockingFeature(true) { + @Override + protected BasicDataSource makeDataSource() throws Exception { + // get the real data source + BasicDataSource src2 = super.makeDataSource(); + + when(datasrc.getConnection()).thenAnswer(answer -> { + DistributedLock lck = freeLock.getAndSet(null); + if (lck != null) { + // free it + lck.free(); + + // run its doUnlock + runLock(4, 0); + } + + return src2.getConnection(); + }); + + return datasrc; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false); + runLock(1, 0); + + LockCallback callback3 = mock(LockCallback.class); + final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false); + // don't run doLock for lock3 - leave it in the waiting state + + LockCallback callback4 = mock(LockCallback.class); + final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false); + runLock(3, 0); + + assertEquals(3, getRecordCount()); + + // expire one record + updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1); + + // arrange to free lock4 while the checker is running + freeLock.set(lock4); + + // run the checker + runChecker(0, 0, EXPIRE_SEC); + + + // check lock states + assertTrue(lock.isUnavailable()); + assertTrue(lock2.isActive()); + assertTrue(lock3.isWaiting()); + assertTrue(lock4.isUnavailable()); + + runLock(5, 0); + verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any()); + + verify(callback).lockUnavailable(lock); + verify(callback2, never()).lockUnavailable(lock2); + verify(callback3, never()).lockUnavailable(lock3); + verify(callback4, never()).lockUnavailable(lock4); + } + + @Test + public void testDistributedLockNoArgs() { + DistributedLock lock = new DistributedLock(); + assertNull(lock.getResourceId()); + assertNull(lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(0, lock.getHoldSec()); + } + + @Test + public void testDistributedLock() { + assertThatIllegalArgumentException() + .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false)) + .withMessageContaining("holdSec"); + + // should generate no exception + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + } + + @Test + public void testDistributedLockSerializable() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock = roundTrip(lock); + + assertTrue(lock.isWaiting()); + + assertEquals(RESOURCE, lock.getResourceId()); + assertEquals(OWNER_KEY, lock.getOwnerKey()); + assertNull(lock.getCallback()); + assertEquals(HOLD_SEC, lock.getHoldSec()); + } + + @Test + public void testGrant() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertFalse(lock.isActive()); + + // execute the doLock() call + runLock(0, 0); + + assertTrue(lock.isActive()); + + // the callback for the lock should have been run in the foreground thread + verify(callback).lockAvailable(lock); + } + + /** + * Tests grant() when the lock is already unavailable. + */ + @Test + public void testDistributedLockGrantUnavailable() { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.setState(LockState.UNAVAILABLE); + lock.grant(); + + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockAvailable(any()); + verify(callback, never()).lockUnavailable(any()); + } + + @Test + public void testDistributedLockDeny() { + // get a lock + feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // get another lock - should fail + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertTrue(lock.isUnavailable()); + + // the callback for the second lock should have been run in the foreground thread + verify(callback).lockUnavailable(lock); + + // should only have a request for the first lock + verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any()); + } + + @Test + public void testDistributedLockFree() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + + // run both requests associated with the lock + runLock(0, 1); + runLock(1, 0); + + // should not have changed state + assertTrue(lock.isUnavailable()); + + // attempt to free it again + assertFalse(lock.free()); + + // should not have queued anything else + verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any()); + + // new lock should succeed + DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + assertTrue(lock2 != lock); + assertTrue(lock2.isWaiting()); + } + + /** + * Tests that free() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockFreeSerialized() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + feature = new MyLockingFeature(true); + + lock = roundTrip(lock); + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests free() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockFreeNoFeature() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + DistributedLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests the case where the lock is freed and doUnlock called between the call to + * isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockFreeUnlocked() { + feature = new FreeWithFreeLockingFeature(true); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + /** + * Tests the case where the lock is freed, but doUnlock is not completed, between the + * call to isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockFreeLockFreed() { + feature = new FreeWithFreeLockingFeature(false); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertFalse(lock.free()); + assertTrue(lock.isUnavailable()); + } + + @Test + public void testDistributedLockExtend() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // lock2 should be denied - called back by this thread + DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + verify(callback, never()).lockAvailable(lock2); + verify(callback).lockUnavailable(lock2); + + // lock2 will still be denied - called back by this thread + lock2.extend(HOLD_SEC, callback); + verify(callback, times(2)).lockUnavailable(lock2); + + // force lock2 to be active - should still be denied + Whitebox.setInternalState(lock2, "state", LockState.ACTIVE); + lock2.extend(HOLD_SEC, callback); + verify(callback, times(3)).lockUnavailable(lock2); + + assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback)) + .withMessageContaining("holdSec"); + + // execute doLock() + runLock(0, 0); + assertTrue(lock.isActive()); + + // now extend the first lock + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + assertTrue(lock.isWaiting()); + + // execute doExtend() + runLock(1, 0); + lock.extend(HOLD_SEC2, callback2); + assertEquals(HOLD_SEC2, lock.getHoldSec()); + verify(callback2).lockAvailable(lock); + verify(callback2, never()).lockUnavailable(lock); + } + + /** + * Tests that extend() works on a serialized lock with a new feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockExtendSerialized() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // run doLock + runLock(0, 0); + assertTrue(lock.isActive()); + + feature = new MyLockingFeature(true); + + lock = roundTrip(lock); + assertTrue(lock.isActive()); + + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isWaiting()); + + // run doExtend (in new feature) + runLock(0, 0); + assertTrue(lock.isActive()); + + verify(scallback).lockAvailable(lock); + verify(scallback, never()).lockUnavailable(lock); + } + + /** + * Tests extend() on a serialized lock without a feature. + * + * @throws Exception if an error occurs + */ + @Test + public void testDistributedLockExtendNoFeature() throws Exception { + DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // run doLock + runLock(0, 0); + assertTrue(lock.isActive()); + + DistributedLockManager.setLatestInstance(null); + + lock = roundTrip(lock); + assertTrue(lock.isActive()); + + LockCallback scallback = mock(LockCallback.class); + + lock.extend(HOLD_SEC, scallback); + assertTrue(lock.isUnavailable()); + + verify(scallback, never()).lockAvailable(lock); + verify(scallback).lockUnavailable(lock); + } + + /** + * Tests the case where the lock is freed and doUnlock called between the call to + * isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockExtendUnlocked() { + feature = new FreeWithFreeLockingFeature(true); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + lock.extend(HOLD_SEC2, callback); + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + } + + /** + * Tests the case where the lock is freed, but doUnlock is not completed, between the + * call to isUnavailable() and the call to compute(). + */ + @Test + public void testDistributedLockExtendLockFreed() { + feature = new FreeWithFreeLockingFeature(false); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + lock.extend(HOLD_SEC2, callback); + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + } + + @Test + public void testDistributedLockScheduleRequest() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + verify(callback).lockAvailable(lock); + } + + @Test + public void testDistributedLockRescheduleRequest() throws SQLException { + // use a data source that throws an exception when getConnection() is called + InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT); + feature = invfeat; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should fail and reschedule + runLock(0, 0); + + // should still be waiting + assertTrue(lock.isWaiting()); + verify(callback, never()).lockUnavailable(lock); + + // free the lock while doLock is executing + invfeat.freeLock = true; + + // try scheduled request - should just invoke doUnlock + runSchedule(0, 0); + + // should still be waiting + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockUnavailable(lock); + + // should have scheduled a retry of doUnlock + verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + public void testDistributedLockGetNextRequest() { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + /* + * run doLock. This should cause getNextRequest() to be called twice, once with a + * request in the queue, and the second time with request=null. + */ + runLock(0, 0); + } + + /** + * Tests getNextRequest(), where the same request is still in the queue the second + * time it's called. + */ + @Test + public void testDistributedLockGetNextRequestSameRequest() { + // force reschedule to be invoked + feature = new InvalidDbLockingFeature(TRANSIENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + /* + * run doLock. This should cause getNextRequest() to be called twice, once with a + * request in the queue, and the second time with the same request again. + */ + runLock(0, 0); + + verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + public void testDistributedLockDoRequest() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + assertTrue(lock.isWaiting()); + + // run doLock via doRequest + runLock(0, 0); + + assertTrue(lock.isActive()); + } + + /** + * Tests doRequest(), when doRequest() is already running within another thread. + */ + @Test + public void testDistributedLockDoRequestBusy() { + /* + * this feature will invoke a request in a background thread while it's being run + * in a foreground thread. + */ + AtomicBoolean running = new AtomicBoolean(false); + AtomicBoolean returned = new AtomicBoolean(false); + + feature = new MyLockingFeature(true) { + @Override + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + + @Override + protected boolean doDbInsert(Connection conn) throws SQLException { + if (running.get()) { + // already inside the thread - don't recurse any further + return super.doDbInsert(conn); + } + + running.set(true); + + Thread thread = new Thread(() -> { + // run doLock from within the new thread + runLock(0, 0); + }); + thread.setDaemon(true); + thread.start(); + + // wait for the background thread to complete before continuing + try { + thread.join(5000); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + + returned.set(!thread.isAlive()); + + return super.doDbInsert(conn); + } + }; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // run doLock + runLock(0, 0); + + assertTrue(returned.get()); + } + + /** + * Tests doRequest() when an exception occurs while the lock is in the WAITING state. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoRequestRunExWaiting() throws SQLException { + // throw run-time exception + when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION)); + + // use a data source that throws an exception when getConnection() is called + feature = new MyLockingFeature(true) { + @Override + protected BasicDataSource makeDataSource() throws Exception { + return datasrc; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should NOT reschedule + runLock(0, 0); + + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + + verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any()); + } + + /** + * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE + * state. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoRequestRunExUnavailable() throws SQLException { + // throw run-time exception + when(datasrc.getConnection()).thenAnswer(answer -> { + lock.free(); + throw new IllegalStateException(EXPECTED_EXCEPTION); + }); + + // use a data source that throws an exception when getConnection() is called + feature = new MyLockingFeature(true) { + @Override + protected BasicDataSource makeDataSource() throws Exception { + return datasrc; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should NOT reschedule + runLock(0, 0); + + assertTrue(lock.isUnavailable()); + verify(callback, never()).lockUnavailable(lock); + + verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any()); + } + + /** + * Tests doRequest() when the retry count gets exhausted. + */ + @Test + public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(TRANSIENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should fail and reschedule + runLock(0, 0); + + // should still be waiting + assertTrue(lock.isWaiting()); + verify(callback, never()).lockUnavailable(lock); + + // try again, via SCHEDULER - first retry fails + runSchedule(0, 0); + + // should still be waiting + assertTrue(lock.isWaiting()); + verify(callback, never()).lockUnavailable(lock); + + // try again, via SCHEDULER - final retry fails + runSchedule(1, 0); + assertTrue(lock.isUnavailable()); + + // now callback should have been called + verify(callback).lockUnavailable(lock); + } + + /** + * Tests doRequest() when a non-transient DB exception is thrown. + */ + @Test + public void testDistributedLockDoRequestNotTransient() { + /* + * use a data source that throws a PERMANENT exception when getConnection() is + * called + */ + feature = new InvalidDbLockingFeature(PERMANENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should fail + runLock(0, 0); + + assertTrue(lock.isUnavailable()); + verify(callback).lockUnavailable(lock); + + // should not have scheduled anything new + verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any()); + verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + public void testDistributedLockDoLock() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should simply do an insert + long tbegin = System.currentTimeMillis(); + runLock(0, 0); + + assertEquals(1, getRecordCount()); + assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin)); + verify(callback).lockAvailable(lock); + } + + /** + * Tests doLock() when the lock is freed before doLock runs. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoLockFreed() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + lock.setState(LockState.UNAVAILABLE); + + // invoke doLock - should do nothing + runLock(0, 0); + + assertEquals(0, getRecordCount()); + + verify(callback, never()).lockAvailable(lock); + } + + /** + * Tests doLock() when a DB exception is thrown. + */ + @Test + public void testDistributedLockDoLockEx() { + // use a data source that throws an exception when getConnection() is called + feature = new InvalidDbLockingFeature(PERMANENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should simply do an insert + runLock(0, 0); + + // lock should have failed due to exception + verify(callback).lockUnavailable(lock); + } + + /** + * Tests doLock() when an (expired) record already exists, thus requiring doUpdate() + * to be called. + */ + @Test + public void testDistributedLockDoLockNeedingUpdate() throws SQLException { + // insert an expired record + insertRecord(RESOURCE, feature.getUuidString(), 0); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock - should simply do an update + runLock(0, 0); + verify(callback).lockAvailable(lock); + } + + /** + * Tests doLock() when a locked record already exists. + */ + @Test + public void testDistributedLockDoLockAlreadyLocked() throws SQLException { + // insert an expired record + insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock + runLock(0, 0); + + // lock should have failed because it's already locked + verify(callback).lockUnavailable(lock); + } + + @Test + public void testDistributedLockDoUnlock() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // invoke doLock() + runLock(0, 0); + + lock.free(); + + // invoke doUnlock() + long tbegin = System.currentTimeMillis(); + runLock(1, 0); + + assertEquals(0, getRecordCount()); + assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin)); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback, times(1)).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + } + + /** + * Tests doUnlock() when a DB exception is thrown. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoUnlockEx() throws SQLException { + feature = new InvalidDbLockingFeature(PERMANENT); + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + + // do NOT invoke doLock() - it will fail without a DB connection + + lock.free(); + + // invoke doUnlock() + runLock(1, 0); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback, never()).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + } + + @Test + public void testDistributedLockDoExtend() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // call doExtend() + long tbegin = System.currentTimeMillis(); + runLock(1, 0); + + assertEquals(1, getRecordCount()); + assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin)); + + assertTrue(lock.isActive()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have succeeded + verify(callback2).lockAvailable(lock); + verify(callback2, never()).lockUnavailable(lock); + } + + /** + * Tests doExtend() when the lock is freed before doExtend runs. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendFreed() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + lock.extend(HOLD_SEC2, callback); + + lock.setState(LockState.UNAVAILABLE); + + // invoke doExtend - should do nothing + runLock(1, 0); + + assertEquals(0, getRecordCount()); + + verify(callback, never()).lockAvailable(lock); + } + + /** + * Tests doExtend() when the lock record is missing from the DB, thus requiring an + * insert. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendInsertNeeded() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // delete the record so it's forced to re-insert it + cleanDb(); + + // call doExtend() + long tbegin = System.currentTimeMillis(); + runLock(1, 0); + + assertEquals(1, getRecordCount()); + assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin)); + + assertTrue(lock.isActive()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have succeeded + verify(callback2).lockAvailable(lock); + verify(callback2, never()).lockUnavailable(lock); + } + + /** + * Tests doExtend() when both update and insert fail. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException { + /* + * this feature will create a lock that returns false when doDbUpdate() is + * invoked, or when doDbInsert() is invoked a second time + */ + feature = new MyLockingFeature(true) { + @Override + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + private int ntimes = 0; + + @Override + protected boolean doDbInsert(Connection conn) throws SQLException { + if (ntimes++ > 0) { + return false; + } + + return super.doDbInsert(conn); + } + + @Override + protected boolean doDbUpdate(Connection conn) { + return false; + } + }; + } + }; + + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // call doExtend() + runLock(1, 0); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have failed + verify(callback2, never()).lockAvailable(lock); + verify(callback2).lockUnavailable(lock); + } + + /** + * Tests doExtend() when an exception occurs. + * + * @throws SQLException if an error occurs + */ + @Test + public void testDistributedLockDoExtendEx() throws SQLException { + lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false); + runLock(0, 0); + + /* + * delete the record and insert one with a different owner, which will cause + * doDbInsert() to throw an exception + */ + cleanDb(); + insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC); + + LockCallback callback2 = mock(LockCallback.class); + lock.extend(HOLD_SEC2, callback2); + + // call doExtend() + runLock(1, 0); + + assertTrue(lock.isUnavailable()); + + // no more callbacks should have occurred + verify(callback).lockAvailable(lock); + verify(callback, never()).lockUnavailable(lock); + + // extension should have failed + verify(callback2, never()).lockAvailable(lock); + verify(callback2).lockUnavailable(lock); + } + + @Test + public void testDistributedLockToString() { + String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString(); + assertNotNull(text); + assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback"); + } + + @Test + public void testMakeThreadPool() { + // use a REAL feature to test this + feature = new DistributedLockManager(); + + // this should create a thread pool + feature.beforeCreateLockManager(engine, new Properties()); + feature.afterStart(engine); + + shutdownFeature(); + } + + /** + * Performs a multi-threaded test of the locking facility. + * + * @throws InterruptedException if the current thread is interrupted while waiting for + * the background threads to complete + */ + @Test + public void testMultiThreaded() throws InterruptedException { + feature = new DistributedLockManager(); + feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties()); + feature.afterStart(PolicyEngineConstants.getManager()); + + List<MyThread> threads = new ArrayList<>(MAX_THREADS); + for (int x = 0; x < MAX_THREADS; ++x) { + threads.add(new MyThread()); + } + + threads.forEach(Thread::start); + + for (MyThread thread : threads) { + thread.join(6000); + assertFalse(thread.isAlive()); + } + + for (MyThread thread : threads) { + if (thread.err != null) { + throw thread.err; + } + } + + assertTrue(nsuccesses.get() > 0); + } + + private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock); + } + + private DistributedLock roundTrip(DistributedLock lock) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(lock); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + return (DistributedLock) ois.readObject(); + } + } + + /** + * Runs the checkExpired() action. + * + * @param nskip number of actions in the work queue to skip + * @param nadditional number of additional actions that appear in the work queue + * <i>after</i> the checkExpired action + * @param schedSec number of seconds for which the checker should have been scheduled + */ + private void runChecker(int nskip, int nadditional, long schedSec) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS)); + Runnable action = captor.getAllValues().get(nskip); + action.run(); + } + + /** + * Runs a lock action (e.g., doLock, doUnlock). + * + * @param nskip number of actions in the work queue to skip + * @param nadditional number of additional actions that appear in the work queue + * <i>after</i> the desired action + */ + void runLock(int nskip, int nadditional) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture()); + + Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip); + action.run(); + } + + /** + * Runs a scheduled action (e.g., "retry" action). + * + * @param nskip number of actions in the work queue to skip + * @param nadditional number of additional actions that appear in the work queue + * <i>after</i> the desired action + */ + void runSchedule(int nskip, int nadditional) { + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any()); + + Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip); + action.run(); + } + + /** + * Gets a count of the number of lock records in the DB. + * + * @return the number of lock records in the DB + * @throws SQLException if an error occurs accessing the DB + */ + private int getRecordCount() throws SQLException { + try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks"); + ResultSet result = stmt.executeQuery()) { + + if (result.next()) { + return result.getInt(1); + + } else { + return 0; + } + } + } + + /** + * Determines if there is a record for the given resource whose expiration time is in + * the expected range. + * + * @param resourceId ID of the resource of interest + * @param uuidString UUID string of the owner + * @param holdSec seconds for which the lock was to be held + * @param tbegin earliest time, in milliseconds, at which the record could have been + * inserted into the DB + * @return {@code true} if a record is found, {@code false} otherwise + * @throws SQLException if an error occurs accessing the DB + */ + private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException { + try (PreparedStatement stmt = + conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks" + + " WHERE resourceId=? AND host=? AND owner=?")) { + + stmt.setString(1, resourceId); + stmt.setString(2, feature.getHostName()); + stmt.setString(3, uuidString); + + try (ResultSet result = stmt.executeQuery()) { + if (result.next()) { + int remaining = result.getInt(1); + long maxDiff = System.currentTimeMillis() - tbegin; + return (remaining >= 0 && holdSec - remaining <= maxDiff); + + } else { + return false; + } + } + } + } + + /** + * Inserts a record into the DB. + * + * @param resourceId ID of the resource of interest + * @param uuidString UUID string of the owner + * @param expireOffset offset, in seconds, from "now", at which the lock should expire + * @throws SQLException if an error occurs accessing the DB + */ + private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException { + this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset); + } + + private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset) + throws SQLException { + try (PreparedStatement stmt = + conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " + + "values (?, ?, ?, timestampadd(second, ?, now()))")) { + + stmt.setString(1, resourceId); + stmt.setString(2, hostName); + stmt.setString(3, uuidString); + stmt.setInt(4, expireOffset); + + assertEquals(1, stmt.executeUpdate()); + } + } + + /** + * Updates a record in the DB. + * + * @param resourceId ID of the resource of interest + * @param newUuid UUID string of the <i>new</i> owner + * @param expireOffset offset, in seconds, from "now", at which the lock should expire + * @throws SQLException if an error occurs accessing the DB + */ + private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException { + try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?," + + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) { + + stmt.setString(1, newHost); + stmt.setString(2, newUuid); + stmt.setInt(3, expireOffset); + stmt.setString(4, resourceId); + + assertEquals(1, stmt.executeUpdate()); + } + } + + /** + * Feature that uses <i>exsvc</i> to execute requests. + */ + private class MyLockingFeature extends DistributedLockManager { + + public MyLockingFeature(boolean init) { + shutdownFeature(); + + exsvc = mock(ScheduledExecutorService.class); + when(engine.getExecutorService()).thenReturn(exsvc); + + if (init) { + beforeCreateLockManager(engine, new Properties()); + afterStart(engine); + } + } + } + + /** + * Feature whose data source all throws exceptions. + */ + private class InvalidDbLockingFeature extends MyLockingFeature { + private int errcode; + private boolean freeLock = false; + + public InvalidDbLockingFeature(int errcode) { + // pass "false" because we have to set the error code BEFORE calling + // afterStart() + super(false); + + this.errcode = errcode; + + this.beforeCreateLockManager(engine, new Properties()); + this.afterStart(engine); + } + + @Override + protected BasicDataSource makeDataSource() throws Exception { + when(datasrc.getConnection()).thenAnswer(answer -> { + if (freeLock) { + freeLock = false; + lock.free(); + } + + throw new SQLException(EXPECTED_EXCEPTION, "", errcode); + }); + + doThrow(new SQLException(EXPECTED_EXCEPTION, "", errcode)).when(datasrc).close(); + + return datasrc; + } + } + + /** + * Feature whose locks free themselves while free() is already running. + */ + private class FreeWithFreeLockingFeature extends MyLockingFeature { + private boolean relock; + + public FreeWithFreeLockingFeature(boolean relock) { + super(true); + this.relock = relock; + } + + @Override + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) { + private static final long serialVersionUID = 1L; + private boolean checked = false; + + @Override + public boolean isUnavailable() { + if (checked) { + return super.isUnavailable(); + } + + checked = true; + + // release and relock + free(); + + if (relock) { + // run doUnlock + runLock(1, 0); + + // relock it + createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false); + } + + return false; + } + }; + } + } + + /** + * Thread used with the multi-threaded test. It repeatedly attempts to get a lock, + * extend it, and then unlock it. + */ + private class MyThread extends Thread { + AssertionError err = null; + + public MyThread() { + setDaemon(true); + } + + @Override + public void run() { + try { + for (int x = 0; x < MAX_LOOPS; ++x) { + makeAttempt(); + } + + } catch (AssertionError e) { + err = e; + } + } + + private void makeAttempt() { + try { + Semaphore sem = new Semaphore(0); + + LockCallback cb = new LockCallback() { + @Override + public void lockAvailable(Lock lock) { + sem.release(); + } + + @Override + public void lockUnavailable(Lock lock) { + sem.release(); + } + }; + + Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false); + + // wait for callback, whether available or unavailable + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + if (!lock.isActive()) { + return; + } + + nsuccesses.incrementAndGet(); + + assertEquals(1, nactive.incrementAndGet()); + + lock.extend(HOLD_SEC2, cb); + assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS)); + assertTrue(lock.isActive()); + + // decrement BEFORE free() + nactive.decrementAndGet(); + + assertTrue(lock.free()); + assertTrue(lock.isUnavailable()); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("interrupted", e); + } + } + } +} diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java new file mode 100644 index 00000000..5f76d657 --- /dev/null +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Properties; +import java.util.TreeSet; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.utils.properties.exception.PropertyException; + +public class DistributedLockPropertiesTest { + + private Properties props; + + /** + * Populates {@link #props}. + */ + @Before + public void setUp() { + props = new Properties(); + + props.setProperty(DistributedLockProperties.DB_DRIVER, "my driver"); + props.setProperty(DistributedLockProperties.DB_URL, "my url"); + props.setProperty(DistributedLockProperties.DB_USER, "my user"); + props.setProperty(DistributedLockProperties.DB_PASS, "my pass"); + props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,-20,,,30"); + props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "100"); + props.setProperty(DistributedLockProperties.RETRY_SEC, "200"); + props.setProperty(DistributedLockProperties.MAX_RETRIES, "300"); + } + + @Test + public void test() throws PropertyException { + DistributedLockProperties dlp = new DistributedLockProperties(props); + + assertEquals("my driver", dlp.getDbDriver()); + assertEquals("my url", dlp.getDbUrl()); + assertEquals("my user", dlp.getDbUser()); + assertEquals("my pass", dlp.getDbPwd()); + assertEquals("10,-20,,,30", dlp.getErrorCodeStrings()); + assertEquals("[-20, 10, 30]", new TreeSet<>(dlp.getTransientErrorCodes()).toString()); + assertEquals(100, dlp.getExpireCheckSec()); + assertEquals(200, dlp.getRetrySec()); + assertEquals(300, dlp.getMaxRetries()); + + assertTrue(dlp.isTransient(10)); + assertTrue(dlp.isTransient(-20)); + assertTrue(dlp.isTransient(30)); + + assertFalse(dlp.isTransient(-10)); + + // invalid value + props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,abc,30"); + + assertThatThrownBy(() -> new DistributedLockProperties(props)).isInstanceOf(PropertyException.class) + .hasMessageContaining(DistributedLockProperties.TRANSIENT_ERROR_CODES); + } +} diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java deleted file mode 100644 index 68a5a31b..00000000 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.sql.SQLException; -import org.apache.commons.dbcp2.BasicDataSource; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.onap.policy.common.utils.properties.exception.PropertyException; -import org.onap.policy.drools.persistence.SystemPersistenceConstants; - -/** - * Partially tests DistributedLockingFeature; most of the methods are tested via - * {@link TargetLockTest}. - */ -public class DistributedLockingFeatureTest { - private static final String EXPECTED = "expected exception"; - - private BasicDataSource dataSrc; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources"); - } - - @Before - public void setUp() throws Exception { - dataSrc = mock(BasicDataSource.class); - } - - @Test - public void testGetSequenceNumber() { - assertEquals(1000, new DistributedLockingFeature().getSequenceNumber()); - } - - @Test(expected = DistributedLockingFeatureException.class) - public void testAfterStart_PropEx() { - new DistributedLockingFeatureImpl(new PropertyException("prop", "val")).afterStart(null); - } - - @Test(expected = DistributedLockingFeatureException.class) - public void testAfterStart_InterruptEx() { - new DistributedLockingFeatureImpl(new InterruptedException(EXPECTED)).afterStart(null); - } - - @Test(expected = DistributedLockingFeatureException.class) - public void testAfterStart_OtherEx() { - new DistributedLockingFeatureImpl(new RuntimeException(EXPECTED)).afterStart(null); - } - - @Test - public void testCleanLockTable() throws Exception { - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - - new DistributedLockingFeatureImpl().afterStart(null); - } - - /** - * Feature that overrides {@link #makeDataSource()}. - */ - private class DistributedLockingFeatureImpl extends DistributedLockingFeature { - /** - * Exception to throw when {@link #makeDataSource()} is invoked. - */ - private final Exception makeEx; - - public DistributedLockingFeatureImpl() { - makeEx = null; - } - - public DistributedLockingFeatureImpl(Exception ex) { - this.makeEx = ex; - } - - @Override - protected BasicDataSource makeDataSource() throws Exception { - if (makeEx != null) { - throw makeEx; - } - - return dataSrc; - } - } -} diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java deleted file mode 100644 index 84eba6b6..00000000 --- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import org.apache.commons.dbcp2.BasicDataSource; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult; -import org.onap.policy.drools.persistence.SystemPersistenceConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TargetLockTest { - private static final Logger logger = LoggerFactory.getLogger(TargetLockTest.class); - private static final int MAX_AGE_SEC = 4 * 60; - private static final String DB_CONNECTION = - "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling"; - private static final String DB_USER = "user"; - private static final String DB_PASSWORD = "password"; - private static final String EXPECTED = "expected exception"; - private static final String MY_RESOURCE = "my-resource-id"; - private static final String MY_OWNER = "my-owner"; - private static final UUID MY_UUID = UUID.randomUUID(); - private static Connection conn = null; - private static DistributedLockingFeature distLockFeat; - - /** - * Setup the database. - */ - @BeforeClass - public static void setup() { - getDbConnection(); - createTable(); - SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources"); - distLockFeat = new DistributedLockingFeature(); - distLockFeat.afterStart(null); - } - - /** - * Cleanup the lock. - */ - @AfterClass - public static void cleanUp() { - distLockFeat.beforeShutdown(null); - try { - conn.close(); - } catch (SQLException e) { - logger.error("Error in TargetLockTest.cleanUp()", e); - } - } - - /** - * Wipe the database. - */ - @Before - public void wipeDb() { - try (PreparedStatement lockDelete = conn.prepareStatement("DELETE FROM pooling.locks"); ) { - lockDelete.executeUpdate(); - } catch (SQLException e) { - logger.error("Error in TargetLockTest.wipeDb()", e); - throw new RuntimeException(e); - } - } - - @Test - public void testGrabLockSuccess() throws InterruptedException, ExecutionException { - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - // attempt to grab expiredLock - try (PreparedStatement updateStatement = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) { - updateStatement.setString(1, "resource1"); - updateStatement.executeUpdate(); - - } catch (SQLException e) { - logger.error("Error in TargetLockTest.testGrabLockSuccess()", e); - throw new RuntimeException(e); - } - - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - // cannot re-lock - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - } - - @Test - public void testExpiredLocks() throws Exception { - - // grab lock - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - - // force lock to expire - try (PreparedStatement lockExpire = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -?, now())"); ) { - lockExpire.setInt(1, MAX_AGE_SEC + 1); - lockExpire.executeUpdate(); - } - - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC)); - } - - @Test - public void testGrabLockFail() throws InterruptedException, ExecutionException { - - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC)); - } - - @Test - public void testSecondGrab_UpdateOk() throws Exception { - PreparedStatement grabLockInsert = mock(PreparedStatement.class); - when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED)); - - PreparedStatement secondGrabUpdate = mock(PreparedStatement.class); - when(secondGrabUpdate.executeUpdate()).thenReturn(1); - - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate); - - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - - assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC)); - } - - @Test - public void testSecondGrab_UpdateFail_InsertOk() throws Exception { - PreparedStatement grabLockInsert = mock(PreparedStatement.class); - when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED)); - - PreparedStatement secondGrabUpdate = mock(PreparedStatement.class); - when(secondGrabUpdate.executeUpdate()).thenReturn(0); - - PreparedStatement secondGrabInsert = mock(PreparedStatement.class); - when(secondGrabInsert.executeUpdate()).thenReturn(1); - - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert); - - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - - assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC)); - } - - @Test - public void testSecondGrab_UpdateFail_InsertFail() throws Exception { - PreparedStatement grabLockInsert = mock(PreparedStatement.class); - when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED)); - - PreparedStatement secondGrabUpdate = mock(PreparedStatement.class); - when(secondGrabUpdate.executeUpdate()).thenReturn(0); - - PreparedStatement secondGrabInsert = mock(PreparedStatement.class); - when(secondGrabInsert.executeUpdate()).thenReturn(0); - - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert); - - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC)); - } - - @Test - public void testUpdateLock() throws Exception { - // not locked yet - refresh should fail - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC)); - - // now lock it - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC)); - - // refresh should work now - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC)); - - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - // expire the lock - try (PreparedStatement updateStatement = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) { - updateStatement.setString(1, "resource1"); - updateStatement.executeUpdate(); - } - - // refresh should fail now - assertEquals( - OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC)); - - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - // test exception case - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).refresh(MAX_AGE_SEC)); - } - - @Test - public void testUnlock() throws Exception { - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1")); - assertEquals( - OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC)); - - // test exception case - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).unlock()); - } - - @Test - public void testIsActive() throws Exception { - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner2")); - - // isActive on expiredLock - try (PreparedStatement updateStatement = - conn.prepareStatement( - "UPDATE pooling.locks SET expirationTime = timestampadd(second, -5, now()) WHERE resourceId = ?"); ) { - updateStatement.setString(1, "resource1"); - updateStatement.executeUpdate(); - } - - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - // Unlock record, next isActive attempt should fail - distLockFeat.beforeUnlock("resource1", "owner1"); - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1")); - - // test exception case for outer "try" - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive()); - - // test exception case for inner "try" - PreparedStatement stmt = mock(PreparedStatement.class); - when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED)); - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(stmt); - dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive()); - } - - @Test - public void unlockBeforeLock() { - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1")); - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1")); - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1")); - } - - @Test - public void testIsLocked() throws Exception { - assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLocked("resource1")); - distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC); - assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLocked("resource1")); - - // test exception case for outer "try" - BasicDataSource dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED)); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked()); - - // test exception case for inner "try" - PreparedStatement stmt = mock(PreparedStatement.class); - when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED)); - Connection connMock = mock(Connection.class); - when(connMock.prepareStatement(anyString())).thenReturn(stmt); - dataSrc = mock(BasicDataSource.class); - when(dataSrc.getConnection()).thenReturn(connMock); - assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked()); - } - - private static void getDbConnection() { - try { - conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); - } catch (SQLException e) { - logger.error("Error in TargetLockTest.getDBConnection()", e); - } - } - - private static void createTable() { - String createString = - "create table if not exists pooling.locks " - + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), " - + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))"; - try (PreparedStatement createStmt = conn.prepareStatement(createString); ) { - createStmt.executeUpdate(); - - } catch (SQLException e) { - logger.error("Error in TargetLockTest.createTable()", e); - } - } -} diff --git a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties index d1a07e82..061cc608 100644 --- a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties +++ b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties @@ -2,14 +2,14 @@ # ============LICENSE_START======================================================= # feature-distributed-locking # ================================================================================ -# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,5 +22,8 @@ javax.persistence.jdbc.driver=org.h2.Driver javax.persistence.jdbc.url=jdbc:h2:mem:pooling javax.persistence.jdbc.user=user javax.persistence.jdbc.password=password -distributed.locking.lock.aging=150 -distributed.locking.heartbeat.interval=500
\ No newline at end of file + +distributed.locking.transient.error.codes=500 +distributed.locking.expire.check.seconds=900 +distributed.locking.retry.seconds=60 +distributed.locking.max.retries=2 |