aboutsummaryrefslogtreecommitdiffstats
path: root/feature-distributed-locking/src
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2019-09-24 10:51:21 -0400
committerJim Hahn <jrh3@att.com>2019-10-17 15:40:32 -0400
commit6e0b450abe7e62fa47ffe14e95a67d035174dbdb (patch)
treee91c7bfb7365f9a06ad5674bc83e041b7237e378 /feature-distributed-locking/src
parent1528214803af722cd660b7c4a3129f3de5b4ea7f (diff)
Reimplement Lock API using Lock objects
Modified PolicyResourceLockManager to just return a feature, deferring the lock() call/method to the feature, itself. The manager was also modified so that, if it can't find an enabled provider, it will return a default provider, whose lock() methods always fail. Once a feature has been identified, the manager will cache it for use thereafter. Modified the feature API to return lock objects and simplified the interface to remove the beforeXxx and afterXxx methods. Moved the unlock and refresh methods from the feature API into the lock class, renaming them to free and extend, respectively. Added a separate, feature-simple-locking project, which implements a simple version of the locking feature, over a single JVM. Extensively revised the distributed locking feature to fit in with the new API. Added support for persistence so that the various LockImpl classes can be serialized and still function correctly when they are deserialized back into new feature instances Added default implementations of free & extend to LockImpl. Modified API to take the ownerKey string, instead of the owner object. Removed Extractor as unneeded - may add via another review, if still useful. Updates per review comments: - Updated licenses in feature-simple-locking - Added beforeCreateLock & afterCreateLock to feature API - Moved SimpleLockingFeature into policy-management so that it's always available - Moved the executor service, "exsvc", into PolicyEngine - Moved Extrator into policy-utils - Changed Extractor logging level for exceptions - Fixed feature sequence numbers - Fixed mixing of seconds and milliseconds - Renamed exsvc - Modified to use property method with default value - Configured scheduled executor - Added suffix to Extractor.register() - Eliminated Feature Api and tied lock manager into engine - Added non-null checks to LockImpl parameters - Added non-null checks to createLock() parameters - Checked that lockManager is initialized Change-Id: Iddba38157ddc5f7277656979c0e679e5489eb7b1 Issue-ID: POLICY-2113 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'feature-distributed-locking/src')
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java936
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java (renamed from feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java)12
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java136
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java179
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java127
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java282
-rw-r--r--feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi1
-rw-r--r--feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi2
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java (renamed from feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java)12
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java1818
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java81
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java107
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java345
-rw-r--r--feature-distributed-locking/src/test/resources/feature-distributed-locking.properties13
14 files changed, 2992 insertions, 1059 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
new file mode 100644
index 00000000..523c0d93
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
@@ -0,0 +1,936 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSourceFactory;
+import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.drools.core.lock.AlwaysFailLock;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.persistence.SystemPersistenceConstants;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed implementation of the Lock Feature. Maintains locks across servers using a
+ * shared DB.
+ *
+ * <p/>
+ * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
+ * parameter.
+ *
+ * <p/>
+ * Additional Notes:
+ * <dl>
+ * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
+ * instead populated with the {@link #uuidString}.</li>
+ * <li>A periodic check of the DB is made to determine if any of the locks have
+ * expired.</li>
+ * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
+ * will be added to the map once free() or extend() is invoked, provided there isn't
+ * already an entry. In addition, it initially has the host and UUID of the feature
+ * instance that created it. However, as soon as doExtend() completes successfully, the
+ * host and UUID of the lock will be updated to reflect the values within this feature
+ * instance.</li>
+ * </dl>
+ */
+public class DistributedLockManager
+ implements PolicyResourceLockManager, PolicyEngineFeatureApi {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
+
+ private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
+ private static final String LOCK_LOST_MSG = "lock lost";
+ private static final String NOT_LOCKED_MSG = "not locked";
+
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private static DistributedLockManager latestInstance = null;
+
+
+ /**
+ * Name of the host on which this JVM is running.
+ */
+ @Getter
+ private final String hostName;
+
+ /**
+ * UUID of this object.
+ */
+ @Getter
+ private final String uuidString = UUID.randomUUID().toString();
+
+ /**
+ * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
+ * lock is added to the map, it remains in the map until the lock is lost or until the
+ * unlock request completes.
+ */
+ private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
+
+ /**
+ * Engine with which this manager is associated.
+ */
+ private PolicyEngine engine;
+
+ /**
+ * Feature properties.
+ */
+ private DistributedLockProperties featProps;
+
+ /**
+ * Thread pool used to check for lock expiration and to notify owners when locks are
+ * granted or lost.
+ */
+ private ScheduledExecutorService exsvc = null;
+
+ /**
+ * Data source used to connect to the DB.
+ */
+ private BasicDataSource dataSource = null;
+
+
+ /**
+ * Constructs the object.
+ */
+ public DistributedLockManager() {
+ this.hostName = NetworkUtil.getHostname();
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return 1000;
+ }
+
+ @Override
+ public boolean isAlive() {
+ return (exsvc != null);
+ }
+
+ @Override
+ public boolean start() {
+ // handled via engine API
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ // handled via engine API
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ // handled via engine API
+ }
+
+ @Override
+ public boolean isLocked() {
+ return false;
+ }
+
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public boolean unlock() {
+ return true;
+ }
+
+ @Override
+ public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
+
+ try {
+ this.engine = engine;
+ this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
+ this.exsvc = getThreadPool();
+ this.dataSource = makeDataSource();
+
+ return this;
+
+ } catch (Exception e) {
+ throw new DistributedLockManagerException(e);
+ }
+ }
+
+ @Override
+ public boolean afterStart(PolicyEngine engine) {
+
+ try {
+ exsvc.execute(this::deleteExpiredDbLocks);
+ exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ setLatestInstance(this);
+
+ } catch (Exception e) {
+ throw new DistributedLockManagerException(e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Make data source.
+ *
+ * @return a new, pooled data source
+ * @throws Exception exception
+ */
+ protected BasicDataSource makeDataSource() throws Exception {
+ Properties props = new Properties();
+ props.put("driverClassName", featProps.getDbDriver());
+ props.put("url", featProps.getDbUrl());
+ props.put("username", featProps.getDbUser());
+ props.put("password", featProps.getDbPwd());
+ props.put("testOnBorrow", "true");
+ props.put("poolPreparedStatements", "true");
+
+ // additional properties are listed in the GenericObjectPool API
+
+ return BasicDataSourceFactory.createDataSource(props);
+ }
+
+ /**
+ * Deletes expired locks from the DB.
+ */
+ private void deleteExpiredDbLocks() {
+ logger.info("deleting all expired locks from the DB");
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn
+ .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
+
+ int ndel = stmt.executeUpdate();
+ logger.info("deleted {} expired locks from the DB", ndel);
+
+ } catch (SQLException e) {
+ logger.warn("failed to delete expired locks from the DB", e);
+ }
+ }
+
+ /**
+ * Closes the data source. Does <i>not</i> invoke any lock call-backs.
+ */
+ @Override
+ public boolean afterStop(PolicyEngine engine) {
+ exsvc = null;
+ closeDataSource();
+ return false;
+ }
+
+ /**
+ * Closes {@link #dataSource} and sets it to {@code null}.
+ */
+ private void closeDataSource() {
+ try {
+ if (dataSource != null) {
+ dataSource.close();
+ }
+
+ } catch (SQLException e) {
+ logger.error("cannot close the distributed locking DB", e);
+ }
+
+ dataSource = null;
+ }
+
+ @Override
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+
+ if (latestInstance != this) {
+ AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
+ lock.notifyUnavailable();
+ return lock;
+ }
+
+ DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
+
+ DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
+
+ // do these outside of compute() to avoid blocking other map operations
+ if (existingLock == null) {
+ logger.debug("added lock to map {}", lock);
+ lock.scheduleRequest(lock::doLock);
+ } else {
+ lock.deny("resource is busy", true);
+ }
+
+ return lock;
+ }
+
+ /**
+ * Checks for expired locks.
+ */
+ private void checkExpired() {
+
+ try {
+ logger.info("checking for expired locks");
+ Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
+ identifyDbLocks(expiredIds);
+ expireLocks(expiredIds);
+
+ exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is no longer accepting requests", e);
+
+ } catch (SQLException | RuntimeException e) {
+ logger.error("error checking expired locks", e);
+ exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
+ }
+
+ logger.info("done checking for expired locks");
+ }
+
+ /**
+ * Identifies this feature instance's locks that the DB indicates are still active.
+ *
+ * @param expiredIds IDs of resources that have expired locks. If a resource is still
+ * locked, it's ID is removed from this set
+ * @throws SQLException if a DB error occurs
+ */
+ private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
+ /*
+ * We could query for host and UUIDs that actually appear within the locks, but
+ * those might change while the query is running so no real value in doing that.
+ * On the other hand, there's only a brief instance between the time a
+ * deserialized lock is added to this feature instance and its doExtend() method
+ * updates its host and UUID to match this feature instance. If this happens to
+ * run during that brief instance, then the lock will be lost and the callback
+ * invoked. It isn't worth complicating this code further to handle those highly
+ * unlikely cases.
+ */
+
+ // @formatter:off
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(
+ "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
+ // @formatter:on
+
+ stmt.setString(1, hostName);
+ stmt.setString(2, uuidString);
+
+ try (ResultSet resultSet = stmt.executeQuery()) {
+ while (resultSet.next()) {
+ String resourceId = resultSet.getString(1);
+
+ // we have now seen this resource id
+ expiredIds.remove(resourceId);
+ }
+ }
+ }
+ }
+
+ /**
+ * Expires locks for the resources that no longer appear within the DB.
+ *
+ * @param expiredIds IDs of resources that have expired locks
+ */
+ private void expireLocks(Set<String> expiredIds) {
+ for (String resourceId : expiredIds) {
+ AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
+
+ resource2lock.computeIfPresent(resourceId, (key, lock) -> {
+ if (lock.isActive()) {
+ // it thinks it's active, but it isn't - remove from the map
+ lockref.set(lock);
+ return null;
+ }
+
+ return lock;
+ });
+
+ DistributedLock lock = lockref.get();
+ if (lock != null) {
+ logger.debug("removed lock from map {}", lock);
+ lock.deny(LOCK_LOST_MSG, false);
+ }
+ }
+ }
+
+ /**
+ * Distributed Lock implementation.
+ */
+ public static class DistributedLock extends LockImpl {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Feature containing this lock. May be {@code null} until the feature is
+ * identified. Note: this can only be null if the lock has been de-serialized.
+ */
+ private transient DistributedLockManager feature;
+
+ /**
+ * Host name from the feature instance that created this object. Replaced with the
+ * host name from the current feature instance whenever the lock is successfully
+ * extended.
+ */
+ private String hostName;
+
+ /**
+ * UUID string from the feature instance that created this object. Replaced with
+ * the UUID string from the current feature instance whenever the lock is
+ * successfully extended.
+ */
+ private String uuidString;
+
+ /**
+ * {@code True} if the lock is busy making a request, {@code false} otherwise.
+ */
+ private transient boolean busy = false;
+
+ /**
+ * Request to be performed.
+ */
+ private transient RunnableWithEx request = null;
+
+ /**
+ * Number of times we've retried a request.
+ */
+ private transient int nretries = 0;
+
+ /**
+ * Constructs the object.
+ */
+ public DistributedLock() {
+ this.hostName = "";
+ this.uuidString = "";
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param state initial state of the lock
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held,
+ * after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or
+ * subsequently lost; must not be {@code null}
+ * @param feature feature containing this lock
+ */
+ public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ DistributedLockManager feature) {
+ super(state, resourceId, ownerKey, holdSec, callback);
+
+ this.feature = feature;
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+ }
+
+ /**
+ * Grants this lock. The notification is <i>always</i> invoked via the
+ * <i>foreground</i> thread.
+ */
+ protected void grant() {
+ synchronized (this) {
+ if (isUnavailable()) {
+ return;
+ }
+
+ setState(LockState.ACTIVE);
+ }
+
+ logger.info("lock granted: {}", this);
+
+ notifyAvailable();
+ }
+
+ /**
+ * Permanently denies this lock.
+ *
+ * @param reason the reason the lock was denied
+ * @param foreground {@code true} if the callback can be invoked in the current
+ * (i.e., foreground) thread, {@code false} if it should be invoked via the
+ * executor
+ */
+ protected void deny(String reason, boolean foreground) {
+ synchronized (this) {
+ setState(LockState.UNAVAILABLE);
+ }
+
+ logger.info("{}: {}", reason, this);
+
+ if (feature == null || foreground) {
+ notifyUnavailable();
+
+ } else {
+ feature.exsvc.execute(this::notifyUnavailable);
+ }
+ }
+
+ @Override
+ public boolean free() {
+ // do a quick check of the state
+ if (isUnavailable()) {
+ return false;
+ }
+
+ logger.info("releasing lock: {}", this);
+
+ if (!attachFeature()) {
+ setState(LockState.UNAVAILABLE);
+ return false;
+ }
+
+ AtomicBoolean result = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+ if (curlock == this && !isUnavailable()) {
+ // this lock was the owner
+ result.set(true);
+ setState(LockState.UNAVAILABLE);
+
+ /*
+ * NOTE: do NOT return null; curlock must remain until doUnlock
+ * completes.
+ */
+ }
+
+ return curlock;
+ });
+
+ if (result.get()) {
+ scheduleRequest(this::doUnlock);
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ setHoldSec(holdSec);
+ setCallback(callback);
+
+ // do a quick check of the state
+ if (isUnavailable() || !attachFeature()) {
+ deny(LOCK_LOST_MSG, true);
+ return;
+ }
+
+ AtomicBoolean success = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+ if (curlock == this && !isUnavailable()) {
+ success.set(true);
+ setState(LockState.WAITING);
+ }
+
+ // note: leave it in the map until doUnlock() removes it
+
+ return curlock;
+ });
+
+ if (success.get()) {
+ scheduleRequest(this::doExtend);
+
+ } else {
+ deny(NOT_LOCKED_MSG, true);
+ }
+ }
+
+ /**
+ * Attaches to the feature instance, if not already attached.
+ *
+ * @return {@code true} if the lock is now attached to a feature, {@code false}
+ * otherwise
+ */
+ private synchronized boolean attachFeature() {
+ if (feature != null) {
+ // already attached
+ return true;
+ }
+
+ feature = latestInstance;
+ if (feature == null) {
+ logger.warn("no feature yet for {}", this);
+ return false;
+ }
+
+ // put this lock into the map
+ feature.resource2lock.putIfAbsent(getResourceId(), this);
+
+ return true;
+ }
+
+ /**
+ * Schedules a request for execution.
+ *
+ * @param schedreq the request that should be scheduled
+ */
+ private synchronized void scheduleRequest(RunnableWithEx schedreq) {
+ logger.debug("schedule lock action {}", this);
+ nretries = 0;
+ request = schedreq;
+ feature.exsvc.execute(this::doRequest);
+ }
+
+ /**
+ * Reschedules a request for execution, if there is not already a request in the
+ * queue, and if the retry count has not been exhausted.
+ *
+ * @param req request to be rescheduled
+ */
+ private void rescheduleRequest(RunnableWithEx req) {
+ synchronized (this) {
+ if (request != null) {
+ // a new request has already been scheduled - it supersedes "req"
+ logger.debug("not rescheduling lock action {}", this);
+ return;
+ }
+
+ if (nretries++ < feature.featProps.getMaxRetries()) {
+ logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
+ request = req;
+ feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
+ return;
+ }
+ }
+
+ logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
+ removeFromMap();
+ }
+
+ /**
+ * Gets, and removes, the next request from the queue. Clears {@link #busy} if
+ * there are no more requests in the queue.
+ *
+ * @param prevReq the previous request that was just run
+ *
+ * @return the next request, or {@code null} if the queue is empty
+ */
+ private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
+ if (request == null || request == prevReq) {
+ logger.debug("no more requests for {}", this);
+ busy = false;
+ return null;
+ }
+
+ RunnableWithEx req = request;
+ request = null;
+
+ return req;
+ }
+
+ /**
+ * Executes the current request, if none are currently executing.
+ */
+ private void doRequest() {
+ synchronized (this) {
+ if (busy) {
+ // another thread is already processing the request(s)
+ return;
+ }
+ busy = true;
+ }
+
+ /*
+ * There is a race condition wherein this thread could invoke run() while the
+ * next scheduled thread checks the busy flag and finds that work is being
+ * done and returns, leaving the next work item in "request". In that case,
+ * the next work item may never be executed, thus we use a loop here, instead
+ * of just executing a single request.
+ */
+ RunnableWithEx req = null;
+ while ((req = getNextRequest(req)) != null) {
+ if (feature.resource2lock.get(getResourceId()) != this) {
+ /*
+ * no longer in the map - don't apply the action, as it may interfere
+ * with any newly added Lock object
+ */
+ logger.debug("discard lock action {}", this);
+ synchronized (this) {
+ busy = false;
+ }
+ return;
+ }
+
+ try {
+ /*
+ * Run the request. If it throws an exception, then it will be
+ * rescheduled for execution a little later.
+ */
+ req.run();
+
+ } catch (SQLException e) {
+ logger.warn("request failed for lock: {}", this, e);
+
+ if (feature.featProps.isTransient(e.getErrorCode())) {
+ // retry the request a little later
+ rescheduleRequest(req);
+ } else {
+ removeFromMap();
+ }
+
+ } catch (RuntimeException e) {
+ logger.warn("request failed for lock: {}", this, e);
+ removeFromMap();
+ }
+ }
+ }
+
+ /**
+ * Attempts to add a lock to the DB. Generates a callback, indicating success or
+ * failure.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doLock() throws SQLException {
+ if (!isWaiting()) {
+ logger.debug("discard doLock {}", this);
+ return;
+ }
+
+ /*
+ * There is a small window in which a client could invoke free() before the DB
+ * is updated. In that case, doUnlock will be added to the queue to run after
+ * this, which will delete the record, as desired. In addition, grant() will
+ * not do anything, because the lock state will have been set to UNAVAILABLE
+ * by free().
+ */
+
+ logger.debug("doLock {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ boolean success = false;
+ try {
+ success = doDbInsert(conn);
+
+ } catch (SQLException e) {
+ logger.info("failed to insert lock record - attempting update: {}", this, e);
+ success = doDbUpdate(conn);
+ }
+
+ if (success) {
+ grant();
+ return;
+ }
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
+ * it fails, as this should only be executed in response to a call to
+ * {@link #free()}.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doUnlock() throws SQLException {
+ logger.debug("unlock {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ doDbDelete(conn);
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Attempts to extend a lock in the DB. Generates a callback, indicating success
+ * or failure.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doExtend() throws SQLException {
+ if (!isWaiting()) {
+ logger.debug("discard doExtend {}", this);
+ return;
+ }
+
+ /*
+ * There is a small window in which a client could invoke free() before the DB
+ * is updated. In that case, doUnlock will be added to the queue to run after
+ * this, which will delete the record, as desired. In addition, grant() will
+ * not do anything, because the lock state will have been set to UNAVAILABLE
+ * by free().
+ */
+
+ logger.debug("doExtend {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ /*
+ * invoker may have called extend() before free() had a chance to insert
+ * the record, thus we have to try to insert, if the update fails
+ */
+ if (doDbUpdate(conn) || doDbInsert(conn)) {
+ grant();
+ return;
+ }
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Inserts the lock into the DB.
+ *
+ * @param conn DB connection
+ * @return {@code true} if a record was successfully inserted, {@code false}
+ * otherwise
+ * @throws SQLException if a DB error occurs
+ */
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ logger.debug("insert lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
+ + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, feature.hostName);
+ stmt.setString(3, feature.uuidString);
+ stmt.setInt(4, getHoldSec());
+
+ stmt.executeUpdate();
+
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+
+ return true;
+ }
+ }
+
+ /**
+ * Updates the lock in the DB.
+ *
+ * @param conn DB connection
+ * @return {@code true} if a record was successfully updated, {@code false}
+ * otherwise
+ * @throws SQLException if a DB error occurs
+ */
+ protected boolean doDbUpdate(Connection conn) throws SQLException {
+ logger.debug("update lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
+ + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
+ + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, feature.hostName);
+ stmt.setString(3, feature.uuidString);
+ stmt.setInt(4, getHoldSec());
+
+ stmt.setString(5, getResourceId());
+ stmt.setString(6, this.hostName);
+ stmt.setString(7, this.uuidString);
+
+ if (stmt.executeUpdate() != 1) {
+ return false;
+ }
+
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+
+ return true;
+ }
+ }
+
+ /**
+ * Deletes the lock from the DB.
+ *
+ * @param conn DB connection
+ * @throws SQLException if a DB error occurs
+ */
+ protected void doDbDelete(Connection conn) throws SQLException {
+ logger.debug("delete lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, this.hostName);
+ stmt.setString(3, this.uuidString);
+
+ stmt.executeUpdate();
+ }
+ }
+
+ /**
+ * Removes the lock from the map, and sends a notification using the current
+ * thread.
+ */
+ private void removeFromMap() {
+ logger.debug("remove lock from map {}", this);
+ feature.resource2lock.remove(getResourceId(), this);
+
+ synchronized (this) {
+ if (!isUnavailable()) {
+ deny(LOCK_LOST_MSG, true);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
+ + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
+ + uuidString + "]";
+ }
+ }
+
+ @FunctionalInterface
+ private static interface RunnableWithEx {
+ void run() throws SQLException;
+ }
+
+ // these may be overridden by junit tests
+
+ protected Properties getProperties(String fileName) {
+ return SystemPersistenceConstants.getManager().getProperties(fileName);
+ }
+
+ protected ScheduledExecutorService getThreadPool() {
+ return engine.getExecutorService();
+ }
+
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java
index 55fc4fab..e720f9a1 100644
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* feature-distributed-locking
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,15 +20,15 @@
package org.onap.policy.distributed.locking;
-public class DistributedLockingFeatureException extends RuntimeException {
+public class DistributedLockManagerException extends RuntimeException {
private static final long serialVersionUID = 1L;
/**
* Constructor.
- *
+ *
* @param ex exception to be wrapped
*/
- public DistributedLockingFeatureException(Exception ex) {
+ public DistributedLockManagerException(Exception ex) {
super(ex);
}
}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java
new file mode 100644
index 00000000..f470c8e2
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java
@@ -0,0 +1,136 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.properties.BeanConfigurator;
+import org.onap.policy.common.utils.properties.Property;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+
+@Getter
+@Setter
+public class DistributedLockProperties {
+ public static final String PREFIX = "distributed.locking.";
+
+ public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
+ public static final String DB_URL = "javax.persistence.jdbc.url";
+ public static final String DB_USER = "javax.persistence.jdbc.user";
+ public static final String DB_PASS = "javax.persistence.jdbc.password";
+ public static final String TRANSIENT_ERROR_CODES = PREFIX + "transient.error.codes";
+ public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds";
+ public static final String RETRY_SEC = PREFIX + "retry.seconds";
+ public static final String MAX_RETRIES = PREFIX + "max.retries";
+
+ /**
+ * Database driver.
+ */
+ @Property(name = DB_DRIVER)
+ private String dbDriver;
+
+ /**
+ * Database url.
+ */
+ @Property(name = DB_URL)
+ private String dbUrl;
+
+ /**
+ * Database user.
+ */
+ @Property(name = DB_USER)
+ private String dbUser;
+
+ /**
+ * Database password.
+ */
+ @Property(name = DB_PASS)
+ private String dbPwd;
+
+ /**
+ * Vendor-specific error codes that are "transient", meaning they may go away if the
+ * command is repeated (e.g., connection issue), as opposed to something like a syntax
+ * error or a duplicate key.
+ */
+ @Property(name = TRANSIENT_ERROR_CODES)
+ private String errorCodeStrings;
+
+ private final Set<Integer> transientErrorCodes;
+
+ /**
+ * Time, in seconds, to wait between checks for expired locks.
+ */
+ @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900")
+ private int expireCheckSec;
+
+ /**
+ * Number of seconds to wait before retrying, after a DB error.
+ */
+ @Property(name = RETRY_SEC, defaultValue = "60")
+ private int retrySec;
+
+ /**
+ * Maximum number of times to retry a DB operation.
+ */
+ @Property(name = MAX_RETRIES, defaultValue = "2")
+ private int maxRetries;
+
+ /**
+ * Constructs the object, populating fields from the properties.
+ *
+ * @param props properties from which to configure this
+ * @throws PropertyException if an error occurs
+ */
+ public DistributedLockProperties(Properties props) throws PropertyException {
+ new BeanConfigurator().configureFromProperties(this, props);
+
+ Set<Integer> set = new HashSet<>();
+ for (String text : errorCodeStrings.split(",")) {
+ text = text.trim();
+ if (text.isEmpty()) {
+ continue;
+ }
+
+ try {
+ set.add(Integer.valueOf(text));
+
+ } catch (NumberFormatException e) {
+ throw new PropertyException(TRANSIENT_ERROR_CODES, "errorCodeStrings", e);
+ }
+ }
+
+ transientErrorCodes = Collections.unmodifiableSet(set);
+ }
+
+ /**
+ * Determines if an error is transient.
+ *
+ * @param errorCode error code to check
+ * @return {@code true} if the error is transient, {@code false} otherwise
+ */
+ public boolean isTransient(int errorCode) {
+ return transientErrorCodes.contains(errorCode);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
deleted file mode 100644
index d5e07a30..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.apache.commons.dbcp2.BasicDataSourceFactory;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi;
-import org.onap.policy.drools.features.PolicyEngineFeatureApi;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-import org.onap.policy.drools.system.PolicyEngine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DistributedLockingFeature implements PolicyEngineFeatureApi, PolicyResourceLockFeatureApi {
-
- /**
- * Logger instance.
- */
- private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class);
-
- /**
- * Properties Configuration Name.
- */
- public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
-
- /**
- * Properties for locking feature.
- */
- private DistributedLockingProperties lockProps;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID.
- */
- private static final UUID uuid = UUID.randomUUID();
-
- @Override
- public int getSequenceNumber() {
- return 1000;
- }
-
- @Override
- public OperResult beforeLock(String resourceId, String owner, int holdSec) {
-
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.lock(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeRefresh(String resourceId, String owner, int holdSec) {
-
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.refresh(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeUnlock(String resourceId, String owner) {
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeIsLockedBy(String resourceId, String owner) {
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeIsLocked(String resourceId) {
- TargetLock lock = new TargetLock(resourceId, uuid, "dummyOwner", dataSource);
-
- return (lock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public boolean afterStart(PolicyEngine engine) {
-
- try {
- this.lockProps = new DistributedLockingProperties(SystemPersistenceConstants.getManager()
- .getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME));
- this.dataSource = makeDataSource();
- } catch (PropertyException e) {
- logger.error("DistributedLockingFeature feature properies have not been loaded", e);
- throw new DistributedLockingFeatureException(e);
- } catch (InterruptedException e) {
- logger.error("DistributedLockingFeature failed to create data source", e);
- Thread.currentThread().interrupt();
- throw new DistributedLockingFeatureException(e);
- } catch (Exception e) {
- logger.error("DistributedLockingFeature failed to create data source", e);
- throw new DistributedLockingFeatureException(e);
- }
-
- cleanLockTable();
-
- return false;
- }
-
- /**
- * Make data source.
- *
- * @return a new, pooled data source
- * @throws Exception exception
- */
- protected BasicDataSource makeDataSource() throws Exception {
- Properties props = new Properties();
- props.put("driverClassName", lockProps.getDbDriver());
- props.put("url", lockProps.getDbUrl());
- props.put("username", lockProps.getDbUser());
- props.put("password", lockProps.getDbPwd());
- props.put("testOnBorrow", "true");
- props.put("poolPreparedStatements", "true");
-
- // additional properties are listed in the GenericObjectPool API
-
- return BasicDataSourceFactory.createDataSource(props);
- }
-
- /**
- * This method kills the heartbeat thread and calls refreshLockTable which removes
- * any records from the db where the current host is the owner.
- */
- @Override
- public boolean beforeShutdown(PolicyEngine engine) {
- cleanLockTable();
- return false;
- }
-
- /**
- * This method removes all records owned by the current host from the db.
- */
- private void cleanLockTable() {
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement statement = conn.prepareStatement(
- "DELETE FROM pooling.locks WHERE host = ? OR expirationTime < now()")
- ) {
-
- statement.setString(1, uuid.toString());
- statement.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("error in refreshLockTable()", e);
- }
-
- }
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
deleted file mode 100644
index 0ed5930d..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.util.Properties;
-import org.onap.policy.common.utils.properties.BeanConfigurator;
-import org.onap.policy.common.utils.properties.Property;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-
-
-public class DistributedLockingProperties {
-
- /**
- * Feature properties all begin with this prefix.
- */
- public static final String PREFIX = "distributed.locking.";
-
- public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
- public static final String DB_URL = "javax.persistence.jdbc.url";
- public static final String DB_USER = "javax.persistence.jdbc.user";
- public static final String DB_PWD = "javax.persistence.jdbc.password";
-
- /**
- * Properties from which this was constructed.
- */
- private final Properties source;
-
- /**
- * Database driver.
- */
- @Property(name = DB_DRIVER)
- private String dbDriver;
-
- /**
- * Database url.
- */
- @Property(name = DB_URL)
- private String dbUrl;
-
- /**
- * Database user.
- */
- @Property(name = DB_USER)
- private String dbUser;
-
- /**
- * Database password.
- */
- @Property(name = DB_PWD)
- private String dbPwd;
-
- /**
- * Constructs the object, populating fields from the properties.
- *
- * @param props properties from which to configure this
- * @throws PropertyException if an error occurs
- */
- public DistributedLockingProperties(Properties props) throws PropertyException {
- source = props;
-
- new BeanConfigurator().configureFromProperties(this, props);
- }
-
-
- public Properties getSource() {
- return source;
- }
-
-
- public String getDbDriver() {
- return dbDriver;
- }
-
-
- public String getDbUrl() {
- return dbUrl;
- }
-
-
- public String getDbUser() {
- return dbUser;
- }
-
-
- public String getDbPwd() {
- return dbPwd;
- }
-
-
- public void setDbDriver(String dbDriver) {
- this.dbDriver = dbDriver;
- }
-
-
- public void setDbUrl(String dbUrl) {
- this.dbUrl = dbUrl;
- }
-
-
- public void setDbUser(String dbUser) {
- this.dbUser = dbUser;
- }
-
-
- public void setDbPwd(String dbPwd) {
- this.dbPwd = dbPwd;
- }
-
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
deleted file mode 100644
index 42e1f92f..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.UUID;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TargetLock {
-
- private static final Logger logger = LoggerFactory.getLogger(TargetLock.class);
-
- /**
- * The Target resource we want to lock.
- */
- private String resourceId;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID .
- */
- private UUID uuid;
-
- /**
- * Owner.
- */
- private String owner;
-
- /**
- * Constructs a TargetLock object.
- *
- * @param resourceId ID of the entity we want to lock
- * @param dataSource used to connect to the DB containing locks
- */
- public TargetLock(String resourceId, UUID uuid, String owner, BasicDataSource dataSource) {
- this.resourceId = resourceId;
- this.uuid = uuid;
- this.owner = owner;
- this.dataSource = dataSource;
- }
-
- /**
- * Obtain a lock.
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- public boolean lock(int holdSec) {
-
- return grabLock(holdSec);
- }
-
- /**
- * Refresh a lock.
- *
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if the lock was refreshed, {@code false} if the resource is
- * not currently locked by the given owner
- */
- public boolean refresh(int holdSec) {
- return updateLock(holdSec);
- }
-
- /**
- * Unlock a resource by deleting it's associated record in the db.
- */
- public boolean unlock() {
- return deleteLock();
- }
-
- /**
- * "Grabs" lock by attempting to insert a new record in the db.
- * If the insert fails due to duplicate key error resource is already locked
- * so we call secondGrab.
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- private boolean grabLock(int holdSec) {
-
- // try to insert a record into the table(thereby grabbing the lock)
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement statement = conn.prepareStatement(
- "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
- + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
-
- int index = 1;
- statement.setString(index++, this.resourceId);
- statement.setString(index++, this.uuid.toString());
- statement.setString(index++, this.owner);
- statement.setInt(index++, holdSec);
- statement.executeUpdate();
- }
-
- catch (SQLException e) {
- logger.error("error in TargetLock.grabLock()", e);
- return secondGrab(holdSec);
- }
-
- return true;
- }
-
- /**
- * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired.
- * If that fails, it attempts to insert a new record again
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- private boolean secondGrab(int holdSec) {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, "
- + "expirationTime = timestampadd(second, ?, now()) "
- + "WHERE resourceId = ? AND expirationTime < now()");
-
- PreparedStatement insertStatement = conn.prepareStatement(
- "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
- + "values (?, ?, ?, timestampadd(second, ?, now()))");) {
-
- int index = 1;
- updateStatement.setString(index++, this.uuid.toString());
- updateStatement.setString(index++, this.owner);
- updateStatement.setInt(index++, holdSec);
- updateStatement.setString(index++, this.resourceId);
-
- // The lock was expired and we grabbed it.
- // return true
- if (updateStatement.executeUpdate() == 1) {
- return true;
- }
-
- // If our update does not return 1 row, the lock either has not expired
- // or it was removed. Try one last grab
- else {
- index = 1;
- insertStatement.setString(index++, this.resourceId);
- insertStatement.setString(index++, this.uuid.toString());
- insertStatement.setString(index++, this.owner);
- insertStatement.setInt(index++, holdSec);
-
- // If our insert returns 1 we successfully grabbed the lock
- return (insertStatement.executeUpdate() == 1);
- }
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.secondGrab()", e);
- return false;
- }
-
- }
-
- /**
- * Updates the DB record associated with the lock.
- *
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if the record was updated, {@code false} otherwise
- */
- private boolean updateLock(int holdSec) {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, "
- + "expirationTime = timestampadd(second, ?, now()) "
- + "WHERE resourceId = ? AND owner = ? AND expirationTime >= now()")) {
-
- int index = 1;
- updateStatement.setString(index++, this.uuid.toString());
- updateStatement.setString(index++, this.owner);
- updateStatement.setInt(index++, holdSec);
- updateStatement.setString(index++, this.resourceId);
- updateStatement.setString(index++, this.owner);
-
- // refresh succeeded iff a record was updated
- return (updateStatement.executeUpdate() == 1);
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.refreshLock()", e);
- return false;
- }
-
- }
-
- /**
- *To remove a lock we simply delete the record from the db .
- */
- private boolean deleteLock() {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement deleteStatement = conn.prepareStatement(
- "DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?")) {
-
- deleteStatement.setString(1, this.resourceId);
- deleteStatement.setString(2, this.owner);
- deleteStatement.setString(3, this.uuid.toString());
-
- return (deleteStatement.executeUpdate() == 1);
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.deleteLock()", e);
- return false;
- }
-
- }
-
- /**
- * Is the lock active.
- */
- public boolean isActive() {
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement selectStatement = conn.prepareStatement(
- "SELECT * FROM pooling.locks "
- + "WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= now()")) {
-
- selectStatement.setString(1, this.resourceId);
- selectStatement.setString(2, this.uuid.toString());
- selectStatement.setString(3, this.owner);
- try (ResultSet result = selectStatement.executeQuery()) {
-
- // This will return true if the
- // query returned at least one row
- return result.first();
- }
-
- }
-
- catch (SQLException e) {
- logger.error("error in TargetLock.isActive()", e);
- return false;
- }
-
- }
-
- /**
- * Is the resource locked.
- */
- public boolean isLocked() {
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement selectStatement = conn
- .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= now()")) {
-
- selectStatement.setString(1, this.resourceId);
- try (ResultSet result = selectStatement.executeQuery()) {
- // This will return true if the
- // query returned at least one row
- return result.first();
- }
- } catch (SQLException e) {
- logger.error("error in TargetLock.isActive()", e);
- return false;
- }
- }
-
-}
diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi
deleted file mode 100644
index 19bdf505..00000000
--- a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi
+++ /dev/null
@@ -1 +0,0 @@
-org.onap.policy.distributed.locking.DistributedLockingFeature \ No newline at end of file
diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
index 19bdf505..2a7c0547 100644
--- a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
+++ b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
@@ -1 +1 @@
-org.onap.policy.distributed.locking.DistributedLockingFeature \ No newline at end of file
+org.onap.policy.distributed.locking.DistributedLockManager \ No newline at end of file
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java
index 7ba2384a..cfd6a151 100644
--- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* feature-distributed-locking
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,13 +24,13 @@ import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.onap.policy.common.utils.test.ExceptionsTester;
-import org.onap.policy.distributed.locking.DistributedLockingFeatureException;
+import org.onap.policy.distributed.locking.DistributedLockManagerException;
-public class DistributedLockingFeatureExceptionTest extends ExceptionsTester {
+public class DistributedLockManagerExceptionTest extends ExceptionsTester {
@Test
public void test() {
- assertEquals(1, test(DistributedLockingFeatureException.class));
+ assertEquals(1, test(DistributedLockManagerException.class));
}
}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java
new file mode 100644
index 00000000..59b56224
--- /dev/null
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java
@@ -0,0 +1,1818 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.services.OrderedServiceImpl;
+import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.persistence.SystemPersistenceConstants;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.powermock.reflect.Whitebox;
+
+public class DistributedLockManagerTest {
+ private static final long EXPIRE_SEC = 900L;
+ private static final long RETRY_SEC = 60L;
+ private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
+ private static final String OTHER_HOST = "other-host";
+ private static final String OTHER_OWNER = "other-owner";
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final String DB_CONNECTION =
+ "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
+ private static final String DB_USER = "user";
+ private static final String DB_PASSWORD = "password";
+ private static final String OWNER_KEY = "my key";
+ private static final String RESOURCE = "my resource";
+ private static final String RESOURCE2 = "my resource #2";
+ private static final String RESOURCE3 = "my resource #3";
+ private static final String RESOURCE4 = "my resource #4";
+ private static final String RESOURCE5 = "my resource #5";
+ private static final int HOLD_SEC = 100;
+ private static final int HOLD_SEC2 = 120;
+ private static final int MAX_THREADS = 5;
+ private static final int MAX_LOOPS = 100;
+ private static final int TRANSIENT = 500;
+ private static final int PERMANENT = 600;
+
+ // number of execute() calls before the first lock attempt
+ private static final int PRE_LOCK_EXECS = 1;
+
+ // number of execute() calls before the first schedule attempt
+ private static final int PRE_SCHED_EXECS = 1;
+
+ private static Connection conn = null;
+ private static ScheduledExecutorService saveExec;
+ private static ScheduledExecutorService realExec;
+
+ @Mock
+ private ScheduledExecutorService exsvc;
+
+ @Mock
+ private LockCallback callback;
+
+ @Mock
+ private BasicDataSource datasrc;
+
+ @Mock
+ private PolicyEngine engine;
+
+ private DistributedLock lock;
+
+ private AtomicInteger nactive;
+ private AtomicInteger nsuccesses;
+ private DistributedLockManager feature;
+
+
+ /**
+ * Configures the location of the property files and creates the DB.
+ *
+ * @throws SQLException if the DB cannot be created
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws SQLException {
+ SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
+
+ conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+
+ try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
+ + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
+ + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
+ createStmt.executeUpdate();
+ }
+
+ saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
+
+ realExec = Executors.newScheduledThreadPool(3);
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
+ }
+
+ /**
+ * Restores static fields.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws SQLException {
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
+ realExec.shutdown();
+ conn.close();
+ }
+
+ /**
+ * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
+ * tasks.
+ *
+ * @throws SQLException if the lock records cannot be deleted from the DB
+ */
+ @Before
+ public void setUp() throws SQLException {
+ MockitoAnnotations.initMocks(this);
+
+ nactive = new AtomicInteger(0);
+ nsuccesses = new AtomicInteger(0);
+
+ cleanDb();
+
+ when(engine.getExecutorService()).thenReturn(exsvc);
+
+ feature = new MyLockingFeature(true);
+ }
+
+ @After
+ public void tearDown() throws SQLException {
+ shutdownFeature();
+ cleanDb();
+ }
+
+ private void cleanDb() throws SQLException {
+ try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
+ stmt.executeUpdate();
+ }
+ }
+
+ private void shutdownFeature() {
+ if (feature != null) {
+ feature.afterStop(engine);
+ feature = null;
+ }
+ }
+
+ /**
+ * Tests that the feature is found in the expected service sets.
+ */
+ @Test
+ public void testServiceApis() {
+ assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
+ .anyMatch(obj -> obj instanceof DistributedLockManager));
+ }
+
+ /**
+ * Tests constructor() when properties are invalid.
+ */
+ @Test
+ public void testDistributedLockManagerInvalidProperties() {
+ // use properties containing an invalid value
+ Properties props = new Properties();
+ props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc");
+
+ feature = new MyLockingFeature(false) {
+ @Override
+ protected Properties getProperties(String fileName) {
+ return props;
+ }
+ };
+
+ assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
+ }
+
+ @Test
+ public void testGetSequenceNumber() {
+ assertEquals(1000, feature.getSequenceNumber());
+ }
+
+ @Test
+ public void testStartableApi() {
+ assertTrue(feature.isAlive());
+ assertTrue(feature.start());
+ assertTrue(feature.stop());
+ feature.shutdown();
+
+ // above should have had no effect
+ assertTrue(feature.isAlive());
+
+ feature.afterStop(engine);
+ assertFalse(feature.isAlive());
+ }
+
+ @Test
+ public void testLockApi() {
+ assertFalse(feature.isLocked());
+ assertTrue(feature.lock());
+ assertTrue(feature.unlock());
+ }
+
+ @Test
+ public void testBeforeCreateLockManager() {
+ assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
+ }
+
+ /**
+ * Tests beforeCreate(), when getProperties() throws a runtime exception.
+ */
+ @Test
+ public void testBeforeCreateLockManagerEx() {
+ shutdownFeature();
+
+ feature = new MyLockingFeature(false) {
+ @Override
+ protected Properties getProperties(String fileName) {
+ throw new IllegalArgumentException(EXPECTED_EXCEPTION);
+ }
+ };
+
+ assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
+ .isInstanceOf(DistributedLockManagerException.class);
+ }
+
+ @Test
+ public void testAfterStart() {
+ // verify that cleanup & expire check are both added to the queue
+ verify(exsvc).execute(any());
+ verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ /**
+ * Tests afterStart(), when thread pool throws a runtime exception.
+ */
+ @Test
+ public void testAfterStartExInThreadPool() {
+ shutdownFeature();
+
+ feature = new MyLockingFeature(false);
+
+ when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
+ .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION));
+
+ assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
+ }
+
+ @Test
+ public void testDeleteExpiredDbLocks() throws SQLException {
+ // add records: two expired, one not
+ insertRecord(RESOURCE, feature.getUuidString(), -1);
+ insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
+ insertRecord(RESOURCE3, OTHER_OWNER, 0);
+ insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
+
+ // get the clean-up function and execute it
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).execute(captor.capture());
+
+ long tbegin = System.currentTimeMillis();
+ Runnable action = captor.getValue();
+ action.run();
+
+ assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
+ assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
+ assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
+ assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
+
+ assertEquals(2, getRecordCount());
+ }
+
+ /**
+ * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDeleteExpiredDbLocksEx() throws SQLException {
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ // get the clean-up function and execute it
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).execute(captor.capture());
+
+ Runnable action = captor.getValue();
+
+ // should not throw an exception
+ action.run();
+ }
+
+ @Test
+ public void testAfterStop() {
+ shutdownFeature();
+
+ feature = new DistributedLockManager();
+
+ // shutdown without calling afterStart()
+
+ shutdownFeature();
+ }
+
+ /**
+ * Tests afterStop(), when the data source throws an exception when close() is called.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testAfterStopEx() throws SQLException {
+ shutdownFeature();
+
+ // use a data source that throws an exception when closed
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ shutdownFeature();
+ }
+
+ @Test
+ public void testCreateLock() throws SQLException {
+ verify(exsvc).execute(any());
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock.isWaiting());
+
+ verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
+
+ // this lock should fail
+ LockCallback callback2 = mock(LockCallback.class);
+ DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
+ assertTrue(lock2.isUnavailable());
+ verify(callback2, never()).lockAvailable(lock2);
+ verify(callback2).lockUnavailable(lock2);
+
+ // this should fail, too
+ LockCallback callback3 = mock(LockCallback.class);
+ DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
+ assertTrue(lock3.isUnavailable());
+ verify(callback3, never()).lockAvailable(lock3);
+ verify(callback3).lockUnavailable(lock3);
+
+ // no change to first
+ assertTrue(lock.isWaiting());
+
+ // no callbacks to the first lock
+ verify(callback, never()).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ assertTrue(lock.isWaiting());
+ assertEquals(0, getRecordCount());
+
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+ assertEquals(1, getRecordCount());
+
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // this should succeed
+ DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock4.isWaiting());
+
+ // after running checker, original records should still remain
+ runChecker(0, 0, EXPIRE_SEC);
+ assertEquals(1, getRecordCount());
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests lock() when the feature is not the latest instance.
+ */
+ @Test
+ public void testCreateLockNotLatestInstance() {
+ DistributedLockManager.setLatestInstance(null);
+
+ Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockAvailable(any());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testCheckExpired() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
+ runLock(1, 0);
+
+ LockCallback callback3 = mock(LockCallback.class);
+ final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
+ runLock(2, 0);
+
+ LockCallback callback4 = mock(LockCallback.class);
+ final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
+ runLock(3, 0);
+
+ LockCallback callback5 = mock(LockCallback.class);
+ final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
+ runLock(4, 0);
+
+ assertEquals(5, getRecordCount());
+
+ // expire one record
+ updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
+
+ // change host of another record
+ updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
+
+ // change uuid of another record
+ updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
+
+
+ // run the checker
+ runChecker(0, 0, EXPIRE_SEC);
+
+
+ // check lock states
+ assertTrue(lock.isUnavailable());
+ assertTrue(lock2.isActive());
+ assertTrue(lock3.isUnavailable());
+ assertTrue(lock4.isActive());
+ assertTrue(lock5.isUnavailable());
+
+ // allow callbacks
+ runLock(5, 2);
+ runLock(6, 1);
+ runLock(7, 0);
+ verify(callback).lockUnavailable(lock);
+ verify(callback3).lockUnavailable(lock3);
+ verify(callback5).lockUnavailable(lock5);
+
+ verify(callback2, never()).lockUnavailable(lock2);
+ verify(callback4, never()).lockUnavailable(lock4);
+
+
+ // another check should have been scheduled, with the normal interval
+ runChecker(1, 0, EXPIRE_SEC);
+ }
+
+ /**
+ * Tests checkExpired(), when schedule() throws an exception.
+ */
+ @Test
+ public void testCheckExpiredExecRejected() {
+ // arrange for execution to be rejected
+ when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
+ .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
+
+ runChecker(0, 0, EXPIRE_SEC);
+ }
+
+ /**
+ * Tests checkExpired(), when getConnection() throws an exception.
+ */
+ @Test
+ public void testCheckExpiredSqlEx() {
+ // use a data source that throws an exception when getConnection() is called
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ runChecker(0, 0, EXPIRE_SEC);
+
+ // it should have scheduled another check, sooner
+ runChecker(0, 0, RETRY_SEC);
+ }
+
+ @Test
+ public void testExpireLocks() throws SQLException {
+ AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
+
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ // get the real data source
+ BasicDataSource src2 = super.makeDataSource();
+
+ when(datasrc.getConnection()).thenAnswer(answer -> {
+ DistributedLock lck = freeLock.getAndSet(null);
+ if (lck != null) {
+ // free it
+ lck.free();
+
+ // run its doUnlock
+ runLock(4, 0);
+ }
+
+ return src2.getConnection();
+ });
+
+ return datasrc;
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
+ runLock(1, 0);
+
+ LockCallback callback3 = mock(LockCallback.class);
+ final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
+ // don't run doLock for lock3 - leave it in the waiting state
+
+ LockCallback callback4 = mock(LockCallback.class);
+ final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
+ runLock(3, 0);
+
+ assertEquals(3, getRecordCount());
+
+ // expire one record
+ updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
+
+ // arrange to free lock4 while the checker is running
+ freeLock.set(lock4);
+
+ // run the checker
+ runChecker(0, 0, EXPIRE_SEC);
+
+
+ // check lock states
+ assertTrue(lock.isUnavailable());
+ assertTrue(lock2.isActive());
+ assertTrue(lock3.isWaiting());
+ assertTrue(lock4.isUnavailable());
+
+ runLock(5, 0);
+ verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any());
+
+ verify(callback).lockUnavailable(lock);
+ verify(callback2, never()).lockUnavailable(lock2);
+ verify(callback3, never()).lockUnavailable(lock3);
+ verify(callback4, never()).lockUnavailable(lock4);
+ }
+
+ @Test
+ public void testDistributedLockNoArgs() {
+ DistributedLock lock = new DistributedLock();
+ assertNull(lock.getResourceId());
+ assertNull(lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(0, lock.getHoldSec());
+ }
+
+ @Test
+ public void testDistributedLock() {
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
+ .withMessageContaining("holdSec");
+
+ // should generate no exception
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ }
+
+ @Test
+ public void testDistributedLockSerializable() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock = roundTrip(lock);
+
+ assertTrue(lock.isWaiting());
+
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+ }
+
+ @Test
+ public void testGrant() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertFalse(lock.isActive());
+
+ // execute the doLock() call
+ runLock(0, 0);
+
+ assertTrue(lock.isActive());
+
+ // the callback for the lock should have been run in the foreground thread
+ verify(callback).lockAvailable(lock);
+ }
+
+ /**
+ * Tests grant() when the lock is already unavailable.
+ */
+ @Test
+ public void testDistributedLockGrantUnavailable() {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock.setState(LockState.UNAVAILABLE);
+ lock.grant();
+
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testDistributedLockDeny() {
+ // get a lock
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // get another lock - should fail
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertTrue(lock.isUnavailable());
+
+ // the callback for the second lock should have been run in the foreground thread
+ verify(callback).lockUnavailable(lock);
+
+ // should only have a request for the first lock
+ verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
+ }
+
+ @Test
+ public void testDistributedLockFree() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ // run both requests associated with the lock
+ runLock(0, 1);
+ runLock(1, 0);
+
+ // should not have changed state
+ assertTrue(lock.isUnavailable());
+
+ // attempt to free it again
+ assertFalse(lock.free());
+
+ // should not have queued anything else
+ verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
+
+ // new lock should succeed
+ DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock2 != lock);
+ assertTrue(lock2.isWaiting());
+ }
+
+ /**
+ * Tests that free() works on a serialized lock with a new feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockFreeSerialized() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ feature = new MyLockingFeature(true);
+
+ lock = roundTrip(lock);
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests free() on a serialized lock without a feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockFreeNoFeature() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ DistributedLockManager.setLatestInstance(null);
+
+ lock = roundTrip(lock);
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests the case where the lock is freed and doUnlock called between the call to
+ * isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockFreeUnlocked() {
+ feature = new FreeWithFreeLockingFeature(true);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests the case where the lock is freed, but doUnlock is not completed, between the
+ * call to isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockFreeLockFreed() {
+ feature = new FreeWithFreeLockingFeature(false);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ @Test
+ public void testDistributedLockExtend() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // lock2 should be denied - called back by this thread
+ DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ verify(callback, never()).lockAvailable(lock2);
+ verify(callback).lockUnavailable(lock2);
+
+ // lock2 will still be denied - called back by this thread
+ lock2.extend(HOLD_SEC, callback);
+ verify(callback, times(2)).lockUnavailable(lock2);
+
+ // force lock2 to be active - should still be denied
+ Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
+ lock2.extend(HOLD_SEC, callback);
+ verify(callback, times(3)).lockUnavailable(lock2);
+
+ assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
+ .withMessageContaining("holdSec");
+
+ // execute doLock()
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ // now extend the first lock
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+ assertTrue(lock.isWaiting());
+
+ // execute doExtend()
+ runLock(1, 0);
+ lock.extend(HOLD_SEC2, callback2);
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests that extend() works on a serialized lock with a new feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockExtendSerialized() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // run doLock
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ feature = new MyLockingFeature(true);
+
+ lock = roundTrip(lock);
+ assertTrue(lock.isActive());
+
+ LockCallback scallback = mock(LockCallback.class);
+
+ lock.extend(HOLD_SEC, scallback);
+ assertTrue(lock.isWaiting());
+
+ // run doExtend (in new feature)
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ verify(scallback).lockAvailable(lock);
+ verify(scallback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests extend() on a serialized lock without a feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockExtendNoFeature() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // run doLock
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ DistributedLockManager.setLatestInstance(null);
+
+ lock = roundTrip(lock);
+ assertTrue(lock.isActive());
+
+ LockCallback scallback = mock(LockCallback.class);
+
+ lock.extend(HOLD_SEC, scallback);
+ assertTrue(lock.isUnavailable());
+
+ verify(scallback, never()).lockAvailable(lock);
+ verify(scallback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests the case where the lock is freed and doUnlock called between the call to
+ * isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockExtendUnlocked() {
+ feature = new FreeWithFreeLockingFeature(true);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ lock.extend(HOLD_SEC2, callback);
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests the case where the lock is freed, but doUnlock is not completed, between the
+ * call to isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockExtendLockFreed() {
+ feature = new FreeWithFreeLockingFeature(false);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ lock.extend(HOLD_SEC2, callback);
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockScheduleRequest() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ verify(callback).lockAvailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockRescheduleRequest() throws SQLException {
+ // use a data source that throws an exception when getConnection() is called
+ InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
+ feature = invfeat;
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should fail and reschedule
+ runLock(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isWaiting());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // free the lock while doLock is executing
+ invfeat.freeLock = true;
+
+ // try scheduled request - should just invoke doUnlock
+ runSchedule(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // should have scheduled a retry of doUnlock
+ verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ @Test
+ public void testDistributedLockGetNextRequest() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ /*
+ * run doLock. This should cause getNextRequest() to be called twice, once with a
+ * request in the queue, and the second time with request=null.
+ */
+ runLock(0, 0);
+ }
+
+ /**
+ * Tests getNextRequest(), where the same request is still in the queue the second
+ * time it's called.
+ */
+ @Test
+ public void testDistributedLockGetNextRequestSameRequest() {
+ // force reschedule to be invoked
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ /*
+ * run doLock. This should cause getNextRequest() to be called twice, once with a
+ * request in the queue, and the second time with the same request again.
+ */
+ runLock(0, 0);
+
+ verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ @Test
+ public void testDistributedLockDoRequest() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertTrue(lock.isWaiting());
+
+ // run doLock via doRequest
+ runLock(0, 0);
+
+ assertTrue(lock.isActive());
+ }
+
+ /**
+ * Tests doRequest(), when doRequest() is already running within another thread.
+ */
+ @Test
+ public void testDistributedLockDoRequestBusy() {
+ /*
+ * this feature will invoke a request in a background thread while it's being run
+ * in a foreground thread.
+ */
+ AtomicBoolean running = new AtomicBoolean(false);
+ AtomicBoolean returned = new AtomicBoolean(false);
+
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ if (running.get()) {
+ // already inside the thread - don't recurse any further
+ return super.doDbInsert(conn);
+ }
+
+ running.set(true);
+
+ Thread thread = new Thread(() -> {
+ // run doLock from within the new thread
+ runLock(0, 0);
+ });
+ thread.setDaemon(true);
+ thread.start();
+
+ // wait for the background thread to complete before continuing
+ try {
+ thread.join(5000);
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ }
+
+ returned.set(!thread.isAlive());
+
+ return super.doDbInsert(conn);
+ }
+ };
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // run doLock
+ runLock(0, 0);
+
+ assertTrue(returned.get());
+ }
+
+ /**
+ * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
+ // throw run-time exception
+ when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
+
+ // use a data source that throws an exception when getConnection() is called
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ return datasrc;
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should NOT reschedule
+ runLock(0, 0);
+
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+
+ verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ /**
+ * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
+ * state.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
+ // throw run-time exception
+ when(datasrc.getConnection()).thenAnswer(answer -> {
+ lock.free();
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ });
+
+ // use a data source that throws an exception when getConnection() is called
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ return datasrc;
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should NOT reschedule
+ runLock(0, 0);
+
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockUnavailable(lock);
+
+ verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ /**
+ * Tests doRequest() when the retry count gets exhausted.
+ */
+ @Test
+ public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
+ // use a data source that throws an exception when getConnection() is called
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should fail and reschedule
+ runLock(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isWaiting());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // try again, via SCHEDULER - first retry fails
+ runSchedule(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isWaiting());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // try again, via SCHEDULER - final retry fails
+ runSchedule(1, 0);
+ assertTrue(lock.isUnavailable());
+
+ // now callback should have been called
+ verify(callback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doRequest() when a non-transient DB exception is thrown.
+ */
+ @Test
+ public void testDistributedLockDoRequestNotTransient() {
+ /*
+ * use a data source that throws a PERMANENT exception when getConnection() is
+ * called
+ */
+ feature = new InvalidDbLockingFeature(PERMANENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should fail
+ runLock(0, 0);
+
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+
+ // should not have scheduled anything new
+ verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
+ verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ @Test
+ public void testDistributedLockDoLock() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should simply do an insert
+ long tbegin = System.currentTimeMillis();
+ runLock(0, 0);
+
+ assertEquals(1, getRecordCount());
+ assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
+ verify(callback).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doLock() when the lock is freed before doLock runs.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoLockFreed() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ lock.setState(LockState.UNAVAILABLE);
+
+ // invoke doLock - should do nothing
+ runLock(0, 0);
+
+ assertEquals(0, getRecordCount());
+
+ verify(callback, never()).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doLock() when a DB exception is thrown.
+ */
+ @Test
+ public void testDistributedLockDoLockEx() {
+ // use a data source that throws an exception when getConnection() is called
+ feature = new InvalidDbLockingFeature(PERMANENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should simply do an insert
+ runLock(0, 0);
+
+ // lock should have failed due to exception
+ verify(callback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
+ * to be called.
+ */
+ @Test
+ public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
+ // insert an expired record
+ insertRecord(RESOURCE, feature.getUuidString(), 0);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should simply do an update
+ runLock(0, 0);
+ verify(callback).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doLock() when a locked record already exists.
+ */
+ @Test
+ public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
+ // insert an expired record
+ insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock
+ runLock(0, 0);
+
+ // lock should have failed because it's already locked
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockDoUnlock() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock()
+ runLock(0, 0);
+
+ lock.free();
+
+ // invoke doUnlock()
+ long tbegin = System.currentTimeMillis();
+ runLock(1, 0);
+
+ assertEquals(0, getRecordCount());
+ assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback, times(1)).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doUnlock() when a DB exception is thrown.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoUnlockEx() throws SQLException {
+ feature = new InvalidDbLockingFeature(PERMANENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // do NOT invoke doLock() - it will fail without a DB connection
+
+ lock.free();
+
+ // invoke doUnlock()
+ runLock(1, 0);
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback, never()).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockDoExtend() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // call doExtend()
+ long tbegin = System.currentTimeMillis();
+ runLock(1, 0);
+
+ assertEquals(1, getRecordCount());
+ assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
+
+ assertTrue(lock.isActive());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have succeeded
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when the lock is freed before doExtend runs.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendFreed() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock.extend(HOLD_SEC2, callback);
+
+ lock.setState(LockState.UNAVAILABLE);
+
+ // invoke doExtend - should do nothing
+ runLock(1, 0);
+
+ assertEquals(0, getRecordCount());
+
+ verify(callback, never()).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when the lock record is missing from the DB, thus requiring an
+ * insert.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // delete the record so it's forced to re-insert it
+ cleanDb();
+
+ // call doExtend()
+ long tbegin = System.currentTimeMillis();
+ runLock(1, 0);
+
+ assertEquals(1, getRecordCount());
+ assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
+
+ assertTrue(lock.isActive());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have succeeded
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when both update and insert fail.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
+ /*
+ * this feature will create a lock that returns false when doDbUpdate() is
+ * invoked, or when doDbInsert() is invoked a second time
+ */
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+ private int ntimes = 0;
+
+ @Override
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ if (ntimes++ > 0) {
+ return false;
+ }
+
+ return super.doDbInsert(conn);
+ }
+
+ @Override
+ protected boolean doDbUpdate(Connection conn) {
+ return false;
+ }
+ };
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // call doExtend()
+ runLock(1, 0);
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have failed
+ verify(callback2, never()).lockAvailable(lock);
+ verify(callback2).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when an exception occurs.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendEx() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ /*
+ * delete the record and insert one with a different owner, which will cause
+ * doDbInsert() to throw an exception
+ */
+ cleanDb();
+ insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // call doExtend()
+ runLock(1, 0);
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have failed
+ verify(callback2, never()).lockAvailable(lock);
+ verify(callback2).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockToString() {
+ String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
+ assertNotNull(text);
+ assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
+ }
+
+ @Test
+ public void testMakeThreadPool() {
+ // use a REAL feature to test this
+ feature = new DistributedLockManager();
+
+ // this should create a thread pool
+ feature.beforeCreateLockManager(engine, new Properties());
+ feature.afterStart(engine);
+
+ shutdownFeature();
+ }
+
+ /**
+ * Performs a multi-threaded test of the locking facility.
+ *
+ * @throws InterruptedException if the current thread is interrupted while waiting for
+ * the background threads to complete
+ */
+ @Test
+ public void testMultiThreaded() throws InterruptedException {
+ feature = new DistributedLockManager();
+ feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
+ feature.afterStart(PolicyEngineConstants.getManager());
+
+ List<MyThread> threads = new ArrayList<>(MAX_THREADS);
+ for (int x = 0; x < MAX_THREADS; ++x) {
+ threads.add(new MyThread());
+ }
+
+ threads.forEach(Thread::start);
+
+ for (MyThread thread : threads) {
+ thread.join(6000);
+ assertFalse(thread.isAlive());
+ }
+
+ for (MyThread thread : threads) {
+ if (thread.err != null) {
+ throw thread.err;
+ }
+ }
+
+ assertTrue(nsuccesses.get() > 0);
+ }
+
+ private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+ return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
+ }
+
+ private DistributedLock roundTrip(DistributedLock lock) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(lock);
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ return (DistributedLock) ois.readObject();
+ }
+ }
+
+ /**
+ * Runs the checkExpired() action.
+ *
+ * @param nskip number of actions in the work queue to skip
+ * @param nadditional number of additional actions that appear in the work queue
+ * <i>after</i> the checkExpired action
+ * @param schedSec number of seconds for which the checker should have been scheduled
+ */
+ private void runChecker(int nskip, int nadditional, long schedSec) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
+ Runnable action = captor.getAllValues().get(nskip);
+ action.run();
+ }
+
+ /**
+ * Runs a lock action (e.g., doLock, doUnlock).
+ *
+ * @param nskip number of actions in the work queue to skip
+ * @param nadditional number of additional actions that appear in the work queue
+ * <i>after</i> the desired action
+ */
+ void runLock(int nskip, int nadditional) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
+
+ Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
+ action.run();
+ }
+
+ /**
+ * Runs a scheduled action (e.g., "retry" action).
+ *
+ * @param nskip number of actions in the work queue to skip
+ * @param nadditional number of additional actions that appear in the work queue
+ * <i>after</i> the desired action
+ */
+ void runSchedule(int nskip, int nadditional) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
+
+ Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
+ action.run();
+ }
+
+ /**
+ * Gets a count of the number of lock records in the DB.
+ *
+ * @return the number of lock records in the DB
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private int getRecordCount() throws SQLException {
+ try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
+ ResultSet result = stmt.executeQuery()) {
+
+ if (result.next()) {
+ return result.getInt(1);
+
+ } else {
+ return 0;
+ }
+ }
+ }
+
+ /**
+ * Determines if there is a record for the given resource whose expiration time is in
+ * the expected range.
+ *
+ * @param resourceId ID of the resource of interest
+ * @param uuidString UUID string of the owner
+ * @param holdSec seconds for which the lock was to be held
+ * @param tbegin earliest time, in milliseconds, at which the record could have been
+ * inserted into the DB
+ * @return {@code true} if a record is found, {@code false} otherwise
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
+ try (PreparedStatement stmt =
+ conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
+ + " WHERE resourceId=? AND host=? AND owner=?")) {
+
+ stmt.setString(1, resourceId);
+ stmt.setString(2, feature.getHostName());
+ stmt.setString(3, uuidString);
+
+ try (ResultSet result = stmt.executeQuery()) {
+ if (result.next()) {
+ int remaining = result.getInt(1);
+ long maxDiff = System.currentTimeMillis() - tbegin;
+ return (remaining >= 0 && holdSec - remaining <= maxDiff);
+
+ } else {
+ return false;
+ }
+ }
+ }
+ }
+
+ /**
+ * Inserts a record into the DB.
+ *
+ * @param resourceId ID of the resource of interest
+ * @param uuidString UUID string of the owner
+ * @param expireOffset offset, in seconds, from "now", at which the lock should expire
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
+ this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
+ }
+
+ private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
+ throws SQLException {
+ try (PreparedStatement stmt =
+ conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
+ + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
+
+ stmt.setString(1, resourceId);
+ stmt.setString(2, hostName);
+ stmt.setString(3, uuidString);
+ stmt.setInt(4, expireOffset);
+
+ assertEquals(1, stmt.executeUpdate());
+ }
+ }
+
+ /**
+ * Updates a record in the DB.
+ *
+ * @param resourceId ID of the resource of interest
+ * @param newUuid UUID string of the <i>new</i> owner
+ * @param expireOffset offset, in seconds, from "now", at which the lock should expire
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
+ try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
+ + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
+
+ stmt.setString(1, newHost);
+ stmt.setString(2, newUuid);
+ stmt.setInt(3, expireOffset);
+ stmt.setString(4, resourceId);
+
+ assertEquals(1, stmt.executeUpdate());
+ }
+ }
+
+ /**
+ * Feature that uses <i>exsvc</i> to execute requests.
+ */
+ private class MyLockingFeature extends DistributedLockManager {
+
+ public MyLockingFeature(boolean init) {
+ shutdownFeature();
+
+ exsvc = mock(ScheduledExecutorService.class);
+ when(engine.getExecutorService()).thenReturn(exsvc);
+
+ if (init) {
+ beforeCreateLockManager(engine, new Properties());
+ afterStart(engine);
+ }
+ }
+ }
+
+ /**
+ * Feature whose data source all throws exceptions.
+ */
+ private class InvalidDbLockingFeature extends MyLockingFeature {
+ private int errcode;
+ private boolean freeLock = false;
+
+ public InvalidDbLockingFeature(int errcode) {
+ // pass "false" because we have to set the error code BEFORE calling
+ // afterStart()
+ super(false);
+
+ this.errcode = errcode;
+
+ this.beforeCreateLockManager(engine, new Properties());
+ this.afterStart(engine);
+ }
+
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ when(datasrc.getConnection()).thenAnswer(answer -> {
+ if (freeLock) {
+ freeLock = false;
+ lock.free();
+ }
+
+ throw new SQLException(EXPECTED_EXCEPTION, "", errcode);
+ });
+
+ doThrow(new SQLException(EXPECTED_EXCEPTION, "", errcode)).when(datasrc).close();
+
+ return datasrc;
+ }
+ }
+
+ /**
+ * Feature whose locks free themselves while free() is already running.
+ */
+ private class FreeWithFreeLockingFeature extends MyLockingFeature {
+ private boolean relock;
+
+ public FreeWithFreeLockingFeature(boolean relock) {
+ super(true);
+ this.relock = relock;
+ }
+
+ @Override
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+ private boolean checked = false;
+
+ @Override
+ public boolean isUnavailable() {
+ if (checked) {
+ return super.isUnavailable();
+ }
+
+ checked = true;
+
+ // release and relock
+ free();
+
+ if (relock) {
+ // run doUnlock
+ runLock(1, 0);
+
+ // relock it
+ createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
+ }
+
+ return false;
+ }
+ };
+ }
+ }
+
+ /**
+ * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
+ * extend it, and then unlock it.
+ */
+ private class MyThread extends Thread {
+ AssertionError err = null;
+
+ public MyThread() {
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int x = 0; x < MAX_LOOPS; ++x) {
+ makeAttempt();
+ }
+
+ } catch (AssertionError e) {
+ err = e;
+ }
+ }
+
+ private void makeAttempt() {
+ try {
+ Semaphore sem = new Semaphore(0);
+
+ LockCallback cb = new LockCallback() {
+ @Override
+ public void lockAvailable(Lock lock) {
+ sem.release();
+ }
+
+ @Override
+ public void lockUnavailable(Lock lock) {
+ sem.release();
+ }
+ };
+
+ Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
+
+ // wait for callback, whether available or unavailable
+ assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
+ if (!lock.isActive()) {
+ return;
+ }
+
+ nsuccesses.incrementAndGet();
+
+ assertEquals(1, nactive.incrementAndGet());
+
+ lock.extend(HOLD_SEC2, cb);
+ assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
+ assertTrue(lock.isActive());
+
+ // decrement BEFORE free()
+ nactive.decrementAndGet();
+
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AssertionError("interrupted", e);
+ }
+ }
+ }
+}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java
new file mode 100644
index 00000000..5f76d657
--- /dev/null
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Properties;
+import java.util.TreeSet;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+public class DistributedLockPropertiesTest {
+
+ private Properties props;
+
+ /**
+ * Populates {@link #props}.
+ */
+ @Before
+ public void setUp() {
+ props = new Properties();
+
+ props.setProperty(DistributedLockProperties.DB_DRIVER, "my driver");
+ props.setProperty(DistributedLockProperties.DB_URL, "my url");
+ props.setProperty(DistributedLockProperties.DB_USER, "my user");
+ props.setProperty(DistributedLockProperties.DB_PASS, "my pass");
+ props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,-20,,,30");
+ props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "100");
+ props.setProperty(DistributedLockProperties.RETRY_SEC, "200");
+ props.setProperty(DistributedLockProperties.MAX_RETRIES, "300");
+ }
+
+ @Test
+ public void test() throws PropertyException {
+ DistributedLockProperties dlp = new DistributedLockProperties(props);
+
+ assertEquals("my driver", dlp.getDbDriver());
+ assertEquals("my url", dlp.getDbUrl());
+ assertEquals("my user", dlp.getDbUser());
+ assertEquals("my pass", dlp.getDbPwd());
+ assertEquals("10,-20,,,30", dlp.getErrorCodeStrings());
+ assertEquals("[-20, 10, 30]", new TreeSet<>(dlp.getTransientErrorCodes()).toString());
+ assertEquals(100, dlp.getExpireCheckSec());
+ assertEquals(200, dlp.getRetrySec());
+ assertEquals(300, dlp.getMaxRetries());
+
+ assertTrue(dlp.isTransient(10));
+ assertTrue(dlp.isTransient(-20));
+ assertTrue(dlp.isTransient(30));
+
+ assertFalse(dlp.isTransient(-10));
+
+ // invalid value
+ props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,abc,30");
+
+ assertThatThrownBy(() -> new DistributedLockProperties(props)).isInstanceOf(PropertyException.class)
+ .hasMessageContaining(DistributedLockProperties.TRANSIENT_ERROR_CODES);
+ }
+}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java
deleted file mode 100644
index 68a5a31b..00000000
--- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.sql.SQLException;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-
-/**
- * Partially tests DistributedLockingFeature; most of the methods are tested via
- * {@link TargetLockTest}.
- */
-public class DistributedLockingFeatureTest {
- private static final String EXPECTED = "expected exception";
-
- private BasicDataSource dataSrc;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
- }
-
- @Before
- public void setUp() throws Exception {
- dataSrc = mock(BasicDataSource.class);
- }
-
- @Test
- public void testGetSequenceNumber() {
- assertEquals(1000, new DistributedLockingFeature().getSequenceNumber());
- }
-
- @Test(expected = DistributedLockingFeatureException.class)
- public void testAfterStart_PropEx() {
- new DistributedLockingFeatureImpl(new PropertyException("prop", "val")).afterStart(null);
- }
-
- @Test(expected = DistributedLockingFeatureException.class)
- public void testAfterStart_InterruptEx() {
- new DistributedLockingFeatureImpl(new InterruptedException(EXPECTED)).afterStart(null);
- }
-
- @Test(expected = DistributedLockingFeatureException.class)
- public void testAfterStart_OtherEx() {
- new DistributedLockingFeatureImpl(new RuntimeException(EXPECTED)).afterStart(null);
- }
-
- @Test
- public void testCleanLockTable() throws Exception {
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
-
- new DistributedLockingFeatureImpl().afterStart(null);
- }
-
- /**
- * Feature that overrides {@link #makeDataSource()}.
- */
- private class DistributedLockingFeatureImpl extends DistributedLockingFeature {
- /**
- * Exception to throw when {@link #makeDataSource()} is invoked.
- */
- private final Exception makeEx;
-
- public DistributedLockingFeatureImpl() {
- makeEx = null;
- }
-
- public DistributedLockingFeatureImpl(Exception ex) {
- this.makeEx = ex;
- }
-
- @Override
- protected BasicDataSource makeDataSource() throws Exception {
- if (makeEx != null) {
- throw makeEx;
- }
-
- return dataSrc;
- }
- }
-}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java
deleted file mode 100644
index 84eba6b6..00000000
--- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TargetLockTest {
- private static final Logger logger = LoggerFactory.getLogger(TargetLockTest.class);
- private static final int MAX_AGE_SEC = 4 * 60;
- private static final String DB_CONNECTION =
- "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
- private static final String DB_USER = "user";
- private static final String DB_PASSWORD = "password";
- private static final String EXPECTED = "expected exception";
- private static final String MY_RESOURCE = "my-resource-id";
- private static final String MY_OWNER = "my-owner";
- private static final UUID MY_UUID = UUID.randomUUID();
- private static Connection conn = null;
- private static DistributedLockingFeature distLockFeat;
-
- /**
- * Setup the database.
- */
- @BeforeClass
- public static void setup() {
- getDbConnection();
- createTable();
- SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
- distLockFeat = new DistributedLockingFeature();
- distLockFeat.afterStart(null);
- }
-
- /**
- * Cleanup the lock.
- */
- @AfterClass
- public static void cleanUp() {
- distLockFeat.beforeShutdown(null);
- try {
- conn.close();
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.cleanUp()", e);
- }
- }
-
- /**
- * Wipe the database.
- */
- @Before
- public void wipeDb() {
- try (PreparedStatement lockDelete = conn.prepareStatement("DELETE FROM pooling.locks"); ) {
- lockDelete.executeUpdate();
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.wipeDb()", e);
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testGrabLockSuccess() throws InterruptedException, ExecutionException {
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- // attempt to grab expiredLock
- try (PreparedStatement updateStatement =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) {
- updateStatement.setString(1, "resource1");
- updateStatement.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.testGrabLockSuccess()", e);
- throw new RuntimeException(e);
- }
-
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- // cannot re-lock
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
- }
-
- @Test
- public void testExpiredLocks() throws Exception {
-
- // grab lock
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
-
- // force lock to expire
- try (PreparedStatement lockExpire =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -?, now())"); ) {
- lockExpire.setInt(1, MAX_AGE_SEC + 1);
- lockExpire.executeUpdate();
- }
-
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC));
- }
-
- @Test
- public void testGrabLockFail() throws InterruptedException, ExecutionException {
-
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
-
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC));
- }
-
- @Test
- public void testSecondGrab_UpdateOk() throws Exception {
- PreparedStatement grabLockInsert = mock(PreparedStatement.class);
- when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED));
-
- PreparedStatement secondGrabUpdate = mock(PreparedStatement.class);
- when(secondGrabUpdate.executeUpdate()).thenReturn(1);
-
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate);
-
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
-
- assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC));
- }
-
- @Test
- public void testSecondGrab_UpdateFail_InsertOk() throws Exception {
- PreparedStatement grabLockInsert = mock(PreparedStatement.class);
- when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED));
-
- PreparedStatement secondGrabUpdate = mock(PreparedStatement.class);
- when(secondGrabUpdate.executeUpdate()).thenReturn(0);
-
- PreparedStatement secondGrabInsert = mock(PreparedStatement.class);
- when(secondGrabInsert.executeUpdate()).thenReturn(1);
-
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert);
-
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
-
- assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC));
- }
-
- @Test
- public void testSecondGrab_UpdateFail_InsertFail() throws Exception {
- PreparedStatement grabLockInsert = mock(PreparedStatement.class);
- when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED));
-
- PreparedStatement secondGrabUpdate = mock(PreparedStatement.class);
- when(secondGrabUpdate.executeUpdate()).thenReturn(0);
-
- PreparedStatement secondGrabInsert = mock(PreparedStatement.class);
- when(secondGrabInsert.executeUpdate()).thenReturn(0);
-
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert);
-
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
-
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC));
- }
-
- @Test
- public void testUpdateLock() throws Exception {
- // not locked yet - refresh should fail
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC));
-
- // now lock it
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- // refresh should work now
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC));
-
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- // expire the lock
- try (PreparedStatement updateStatement =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) {
- updateStatement.setString(1, "resource1");
- updateStatement.executeUpdate();
- }
-
- // refresh should fail now
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC));
-
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- // test exception case
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).refresh(MAX_AGE_SEC));
- }
-
- @Test
- public void testUnlock() throws Exception {
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
-
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1"));
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC));
-
- // test exception case
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).unlock());
- }
-
- @Test
- public void testIsActive() throws Exception {
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner2"));
-
- // isActive on expiredLock
- try (PreparedStatement updateStatement =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -5, now()) WHERE resourceId = ?"); ) {
- updateStatement.setString(1, "resource1");
- updateStatement.executeUpdate();
- }
-
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- // Unlock record, next isActive attempt should fail
- distLockFeat.beforeUnlock("resource1", "owner1");
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- // test exception case for outer "try"
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive());
-
- // test exception case for inner "try"
- PreparedStatement stmt = mock(PreparedStatement.class);
- when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED));
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(stmt);
- dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive());
- }
-
- @Test
- public void unlockBeforeLock() {
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1"));
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1"));
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1"));
- }
-
- @Test
- public void testIsLocked() throws Exception {
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLocked("resource1"));
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLocked("resource1"));
-
- // test exception case for outer "try"
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked());
-
- // test exception case for inner "try"
- PreparedStatement stmt = mock(PreparedStatement.class);
- when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED));
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(stmt);
- dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked());
- }
-
- private static void getDbConnection() {
- try {
- conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.getDBConnection()", e);
- }
- }
-
- private static void createTable() {
- String createString =
- "create table if not exists pooling.locks "
- + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
- + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))";
- try (PreparedStatement createStmt = conn.prepareStatement(createString); ) {
- createStmt.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.createTable()", e);
- }
- }
-}
diff --git a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
index d1a07e82..061cc608 100644
--- a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
+++ b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
@@ -2,14 +2,14 @@
# ============LICENSE_START=======================================================
# feature-distributed-locking
# ================================================================================
-# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,5 +22,8 @@ javax.persistence.jdbc.driver=org.h2.Driver
javax.persistence.jdbc.url=jdbc:h2:mem:pooling
javax.persistence.jdbc.user=user
javax.persistence.jdbc.password=password
-distributed.locking.lock.aging=150
-distributed.locking.heartbeat.interval=500 \ No newline at end of file
+
+distributed.locking.transient.error.codes=500
+distributed.locking.expire.check.seconds=900
+distributed.locking.retry.seconds=60
+distributed.locking.max.retries=2