aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2019-09-24 10:51:21 -0400
committerJim Hahn <jrh3@att.com>2019-10-17 15:40:32 -0400
commit6e0b450abe7e62fa47ffe14e95a67d035174dbdb (patch)
treee91c7bfb7365f9a06ad5674bc83e041b7237e378
parent1528214803af722cd660b7c4a3129f3de5b4ea7f (diff)
Reimplement Lock API using Lock objects
Modified PolicyResourceLockManager to just return a feature, deferring the lock() call/method to the feature, itself. The manager was also modified so that, if it can't find an enabled provider, it will return a default provider, whose lock() methods always fail. Once a feature has been identified, the manager will cache it for use thereafter. Modified the feature API to return lock objects and simplified the interface to remove the beforeXxx and afterXxx methods. Moved the unlock and refresh methods from the feature API into the lock class, renaming them to free and extend, respectively. Added a separate, feature-simple-locking project, which implements a simple version of the locking feature, over a single JVM. Extensively revised the distributed locking feature to fit in with the new API. Added support for persistence so that the various LockImpl classes can be serialized and still function correctly when they are deserialized back into new feature instances Added default implementations of free & extend to LockImpl. Modified API to take the ownerKey string, instead of the owner object. Removed Extractor as unneeded - may add via another review, if still useful. Updates per review comments: - Updated licenses in feature-simple-locking - Added beforeCreateLock & afterCreateLock to feature API - Moved SimpleLockingFeature into policy-management so that it's always available - Moved the executor service, "exsvc", into PolicyEngine - Moved Extrator into policy-utils - Changed Extractor logging level for exceptions - Fixed feature sequence numbers - Fixed mixing of seconds and milliseconds - Renamed exsvc - Modified to use property method with default value - Configured scheduled executor - Added suffix to Extractor.register() - Eliminated Feature Api and tied lock manager into engine - Added non-null checks to LockImpl parameters - Added non-null checks to createLock() parameters - Checked that lockManager is initialized Change-Id: Iddba38157ddc5f7277656979c0e679e5489eb7b1 Issue-ID: POLICY-2113 Signed-off-by: Jim Hahn <jrh3@att.com>
-rw-r--r--feature-distributed-locking/pom.xml7
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java936
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java (renamed from feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java)12
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java136
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java179
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java127
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java282
-rw-r--r--feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi1
-rw-r--r--feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi2
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java (renamed from feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java)12
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java1818
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java81
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java107
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java345
-rw-r--r--feature-distributed-locking/src/test/resources/feature-distributed-locking.properties13
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java71
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java164
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/LockCallback.java (renamed from policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiConstants.java)31
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java158
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java43
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java168
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java227
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java384
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java106
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java261
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java134
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java92
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java595
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java527
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java23
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java33
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java139
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java426
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java34
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java52
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java164
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java35
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java781
38 files changed, 5406 insertions, 3300 deletions
diff --git a/feature-distributed-locking/pom.xml b/feature-distributed-locking/pom.xml
index af777efa..1b6063b0 100644
--- a/feature-distributed-locking/pom.xml
+++ b/feature-distributed-locking/pom.xml
@@ -2,7 +2,7 @@
============LICENSE_START=======================================================
ONAP Policy Engine - Drools PDP
================================================================================
- Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -117,6 +117,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<scope>test</scope>
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
new file mode 100644
index 00000000..523c0d93
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
@@ -0,0 +1,936 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSourceFactory;
+import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.drools.core.lock.AlwaysFailLock;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.persistence.SystemPersistenceConstants;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed implementation of the Lock Feature. Maintains locks across servers using a
+ * shared DB.
+ *
+ * <p/>
+ * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
+ * parameter.
+ *
+ * <p/>
+ * Additional Notes:
+ * <dl>
+ * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
+ * instead populated with the {@link #uuidString}.</li>
+ * <li>A periodic check of the DB is made to determine if any of the locks have
+ * expired.</li>
+ * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
+ * will be added to the map once free() or extend() is invoked, provided there isn't
+ * already an entry. In addition, it initially has the host and UUID of the feature
+ * instance that created it. However, as soon as doExtend() completes successfully, the
+ * host and UUID of the lock will be updated to reflect the values within this feature
+ * instance.</li>
+ * </dl>
+ */
+public class DistributedLockManager
+ implements PolicyResourceLockManager, PolicyEngineFeatureApi {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
+
+ private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
+ private static final String LOCK_LOST_MSG = "lock lost";
+ private static final String NOT_LOCKED_MSG = "not locked";
+
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private static DistributedLockManager latestInstance = null;
+
+
+ /**
+ * Name of the host on which this JVM is running.
+ */
+ @Getter
+ private final String hostName;
+
+ /**
+ * UUID of this object.
+ */
+ @Getter
+ private final String uuidString = UUID.randomUUID().toString();
+
+ /**
+ * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
+ * lock is added to the map, it remains in the map until the lock is lost or until the
+ * unlock request completes.
+ */
+ private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
+
+ /**
+ * Engine with which this manager is associated.
+ */
+ private PolicyEngine engine;
+
+ /**
+ * Feature properties.
+ */
+ private DistributedLockProperties featProps;
+
+ /**
+ * Thread pool used to check for lock expiration and to notify owners when locks are
+ * granted or lost.
+ */
+ private ScheduledExecutorService exsvc = null;
+
+ /**
+ * Data source used to connect to the DB.
+ */
+ private BasicDataSource dataSource = null;
+
+
+ /**
+ * Constructs the object.
+ */
+ public DistributedLockManager() {
+ this.hostName = NetworkUtil.getHostname();
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return 1000;
+ }
+
+ @Override
+ public boolean isAlive() {
+ return (exsvc != null);
+ }
+
+ @Override
+ public boolean start() {
+ // handled via engine API
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ // handled via engine API
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ // handled via engine API
+ }
+
+ @Override
+ public boolean isLocked() {
+ return false;
+ }
+
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public boolean unlock() {
+ return true;
+ }
+
+ @Override
+ public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
+
+ try {
+ this.engine = engine;
+ this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
+ this.exsvc = getThreadPool();
+ this.dataSource = makeDataSource();
+
+ return this;
+
+ } catch (Exception e) {
+ throw new DistributedLockManagerException(e);
+ }
+ }
+
+ @Override
+ public boolean afterStart(PolicyEngine engine) {
+
+ try {
+ exsvc.execute(this::deleteExpiredDbLocks);
+ exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ setLatestInstance(this);
+
+ } catch (Exception e) {
+ throw new DistributedLockManagerException(e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Make data source.
+ *
+ * @return a new, pooled data source
+ * @throws Exception exception
+ */
+ protected BasicDataSource makeDataSource() throws Exception {
+ Properties props = new Properties();
+ props.put("driverClassName", featProps.getDbDriver());
+ props.put("url", featProps.getDbUrl());
+ props.put("username", featProps.getDbUser());
+ props.put("password", featProps.getDbPwd());
+ props.put("testOnBorrow", "true");
+ props.put("poolPreparedStatements", "true");
+
+ // additional properties are listed in the GenericObjectPool API
+
+ return BasicDataSourceFactory.createDataSource(props);
+ }
+
+ /**
+ * Deletes expired locks from the DB.
+ */
+ private void deleteExpiredDbLocks() {
+ logger.info("deleting all expired locks from the DB");
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn
+ .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
+
+ int ndel = stmt.executeUpdate();
+ logger.info("deleted {} expired locks from the DB", ndel);
+
+ } catch (SQLException e) {
+ logger.warn("failed to delete expired locks from the DB", e);
+ }
+ }
+
+ /**
+ * Closes the data source. Does <i>not</i> invoke any lock call-backs.
+ */
+ @Override
+ public boolean afterStop(PolicyEngine engine) {
+ exsvc = null;
+ closeDataSource();
+ return false;
+ }
+
+ /**
+ * Closes {@link #dataSource} and sets it to {@code null}.
+ */
+ private void closeDataSource() {
+ try {
+ if (dataSource != null) {
+ dataSource.close();
+ }
+
+ } catch (SQLException e) {
+ logger.error("cannot close the distributed locking DB", e);
+ }
+
+ dataSource = null;
+ }
+
+ @Override
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+
+ if (latestInstance != this) {
+ AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
+ lock.notifyUnavailable();
+ return lock;
+ }
+
+ DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
+
+ DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
+
+ // do these outside of compute() to avoid blocking other map operations
+ if (existingLock == null) {
+ logger.debug("added lock to map {}", lock);
+ lock.scheduleRequest(lock::doLock);
+ } else {
+ lock.deny("resource is busy", true);
+ }
+
+ return lock;
+ }
+
+ /**
+ * Checks for expired locks.
+ */
+ private void checkExpired() {
+
+ try {
+ logger.info("checking for expired locks");
+ Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
+ identifyDbLocks(expiredIds);
+ expireLocks(expiredIds);
+
+ exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is no longer accepting requests", e);
+
+ } catch (SQLException | RuntimeException e) {
+ logger.error("error checking expired locks", e);
+ exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
+ }
+
+ logger.info("done checking for expired locks");
+ }
+
+ /**
+ * Identifies this feature instance's locks that the DB indicates are still active.
+ *
+ * @param expiredIds IDs of resources that have expired locks. If a resource is still
+ * locked, it's ID is removed from this set
+ * @throws SQLException if a DB error occurs
+ */
+ private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
+ /*
+ * We could query for host and UUIDs that actually appear within the locks, but
+ * those might change while the query is running so no real value in doing that.
+ * On the other hand, there's only a brief instance between the time a
+ * deserialized lock is added to this feature instance and its doExtend() method
+ * updates its host and UUID to match this feature instance. If this happens to
+ * run during that brief instance, then the lock will be lost and the callback
+ * invoked. It isn't worth complicating this code further to handle those highly
+ * unlikely cases.
+ */
+
+ // @formatter:off
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(
+ "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
+ // @formatter:on
+
+ stmt.setString(1, hostName);
+ stmt.setString(2, uuidString);
+
+ try (ResultSet resultSet = stmt.executeQuery()) {
+ while (resultSet.next()) {
+ String resourceId = resultSet.getString(1);
+
+ // we have now seen this resource id
+ expiredIds.remove(resourceId);
+ }
+ }
+ }
+ }
+
+ /**
+ * Expires locks for the resources that no longer appear within the DB.
+ *
+ * @param expiredIds IDs of resources that have expired locks
+ */
+ private void expireLocks(Set<String> expiredIds) {
+ for (String resourceId : expiredIds) {
+ AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
+
+ resource2lock.computeIfPresent(resourceId, (key, lock) -> {
+ if (lock.isActive()) {
+ // it thinks it's active, but it isn't - remove from the map
+ lockref.set(lock);
+ return null;
+ }
+
+ return lock;
+ });
+
+ DistributedLock lock = lockref.get();
+ if (lock != null) {
+ logger.debug("removed lock from map {}", lock);
+ lock.deny(LOCK_LOST_MSG, false);
+ }
+ }
+ }
+
+ /**
+ * Distributed Lock implementation.
+ */
+ public static class DistributedLock extends LockImpl {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Feature containing this lock. May be {@code null} until the feature is
+ * identified. Note: this can only be null if the lock has been de-serialized.
+ */
+ private transient DistributedLockManager feature;
+
+ /**
+ * Host name from the feature instance that created this object. Replaced with the
+ * host name from the current feature instance whenever the lock is successfully
+ * extended.
+ */
+ private String hostName;
+
+ /**
+ * UUID string from the feature instance that created this object. Replaced with
+ * the UUID string from the current feature instance whenever the lock is
+ * successfully extended.
+ */
+ private String uuidString;
+
+ /**
+ * {@code True} if the lock is busy making a request, {@code false} otherwise.
+ */
+ private transient boolean busy = false;
+
+ /**
+ * Request to be performed.
+ */
+ private transient RunnableWithEx request = null;
+
+ /**
+ * Number of times we've retried a request.
+ */
+ private transient int nretries = 0;
+
+ /**
+ * Constructs the object.
+ */
+ public DistributedLock() {
+ this.hostName = "";
+ this.uuidString = "";
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param state initial state of the lock
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held,
+ * after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or
+ * subsequently lost; must not be {@code null}
+ * @param feature feature containing this lock
+ */
+ public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ DistributedLockManager feature) {
+ super(state, resourceId, ownerKey, holdSec, callback);
+
+ this.feature = feature;
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+ }
+
+ /**
+ * Grants this lock. The notification is <i>always</i> invoked via the
+ * <i>foreground</i> thread.
+ */
+ protected void grant() {
+ synchronized (this) {
+ if (isUnavailable()) {
+ return;
+ }
+
+ setState(LockState.ACTIVE);
+ }
+
+ logger.info("lock granted: {}", this);
+
+ notifyAvailable();
+ }
+
+ /**
+ * Permanently denies this lock.
+ *
+ * @param reason the reason the lock was denied
+ * @param foreground {@code true} if the callback can be invoked in the current
+ * (i.e., foreground) thread, {@code false} if it should be invoked via the
+ * executor
+ */
+ protected void deny(String reason, boolean foreground) {
+ synchronized (this) {
+ setState(LockState.UNAVAILABLE);
+ }
+
+ logger.info("{}: {}", reason, this);
+
+ if (feature == null || foreground) {
+ notifyUnavailable();
+
+ } else {
+ feature.exsvc.execute(this::notifyUnavailable);
+ }
+ }
+
+ @Override
+ public boolean free() {
+ // do a quick check of the state
+ if (isUnavailable()) {
+ return false;
+ }
+
+ logger.info("releasing lock: {}", this);
+
+ if (!attachFeature()) {
+ setState(LockState.UNAVAILABLE);
+ return false;
+ }
+
+ AtomicBoolean result = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+ if (curlock == this && !isUnavailable()) {
+ // this lock was the owner
+ result.set(true);
+ setState(LockState.UNAVAILABLE);
+
+ /*
+ * NOTE: do NOT return null; curlock must remain until doUnlock
+ * completes.
+ */
+ }
+
+ return curlock;
+ });
+
+ if (result.get()) {
+ scheduleRequest(this::doUnlock);
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ setHoldSec(holdSec);
+ setCallback(callback);
+
+ // do a quick check of the state
+ if (isUnavailable() || !attachFeature()) {
+ deny(LOCK_LOST_MSG, true);
+ return;
+ }
+
+ AtomicBoolean success = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+ if (curlock == this && !isUnavailable()) {
+ success.set(true);
+ setState(LockState.WAITING);
+ }
+
+ // note: leave it in the map until doUnlock() removes it
+
+ return curlock;
+ });
+
+ if (success.get()) {
+ scheduleRequest(this::doExtend);
+
+ } else {
+ deny(NOT_LOCKED_MSG, true);
+ }
+ }
+
+ /**
+ * Attaches to the feature instance, if not already attached.
+ *
+ * @return {@code true} if the lock is now attached to a feature, {@code false}
+ * otherwise
+ */
+ private synchronized boolean attachFeature() {
+ if (feature != null) {
+ // already attached
+ return true;
+ }
+
+ feature = latestInstance;
+ if (feature == null) {
+ logger.warn("no feature yet for {}", this);
+ return false;
+ }
+
+ // put this lock into the map
+ feature.resource2lock.putIfAbsent(getResourceId(), this);
+
+ return true;
+ }
+
+ /**
+ * Schedules a request for execution.
+ *
+ * @param schedreq the request that should be scheduled
+ */
+ private synchronized void scheduleRequest(RunnableWithEx schedreq) {
+ logger.debug("schedule lock action {}", this);
+ nretries = 0;
+ request = schedreq;
+ feature.exsvc.execute(this::doRequest);
+ }
+
+ /**
+ * Reschedules a request for execution, if there is not already a request in the
+ * queue, and if the retry count has not been exhausted.
+ *
+ * @param req request to be rescheduled
+ */
+ private void rescheduleRequest(RunnableWithEx req) {
+ synchronized (this) {
+ if (request != null) {
+ // a new request has already been scheduled - it supersedes "req"
+ logger.debug("not rescheduling lock action {}", this);
+ return;
+ }
+
+ if (nretries++ < feature.featProps.getMaxRetries()) {
+ logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
+ request = req;
+ feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
+ return;
+ }
+ }
+
+ logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
+ removeFromMap();
+ }
+
+ /**
+ * Gets, and removes, the next request from the queue. Clears {@link #busy} if
+ * there are no more requests in the queue.
+ *
+ * @param prevReq the previous request that was just run
+ *
+ * @return the next request, or {@code null} if the queue is empty
+ */
+ private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
+ if (request == null || request == prevReq) {
+ logger.debug("no more requests for {}", this);
+ busy = false;
+ return null;
+ }
+
+ RunnableWithEx req = request;
+ request = null;
+
+ return req;
+ }
+
+ /**
+ * Executes the current request, if none are currently executing.
+ */
+ private void doRequest() {
+ synchronized (this) {
+ if (busy) {
+ // another thread is already processing the request(s)
+ return;
+ }
+ busy = true;
+ }
+
+ /*
+ * There is a race condition wherein this thread could invoke run() while the
+ * next scheduled thread checks the busy flag and finds that work is being
+ * done and returns, leaving the next work item in "request". In that case,
+ * the next work item may never be executed, thus we use a loop here, instead
+ * of just executing a single request.
+ */
+ RunnableWithEx req = null;
+ while ((req = getNextRequest(req)) != null) {
+ if (feature.resource2lock.get(getResourceId()) != this) {
+ /*
+ * no longer in the map - don't apply the action, as it may interfere
+ * with any newly added Lock object
+ */
+ logger.debug("discard lock action {}", this);
+ synchronized (this) {
+ busy = false;
+ }
+ return;
+ }
+
+ try {
+ /*
+ * Run the request. If it throws an exception, then it will be
+ * rescheduled for execution a little later.
+ */
+ req.run();
+
+ } catch (SQLException e) {
+ logger.warn("request failed for lock: {}", this, e);
+
+ if (feature.featProps.isTransient(e.getErrorCode())) {
+ // retry the request a little later
+ rescheduleRequest(req);
+ } else {
+ removeFromMap();
+ }
+
+ } catch (RuntimeException e) {
+ logger.warn("request failed for lock: {}", this, e);
+ removeFromMap();
+ }
+ }
+ }
+
+ /**
+ * Attempts to add a lock to the DB. Generates a callback, indicating success or
+ * failure.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doLock() throws SQLException {
+ if (!isWaiting()) {
+ logger.debug("discard doLock {}", this);
+ return;
+ }
+
+ /*
+ * There is a small window in which a client could invoke free() before the DB
+ * is updated. In that case, doUnlock will be added to the queue to run after
+ * this, which will delete the record, as desired. In addition, grant() will
+ * not do anything, because the lock state will have been set to UNAVAILABLE
+ * by free().
+ */
+
+ logger.debug("doLock {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ boolean success = false;
+ try {
+ success = doDbInsert(conn);
+
+ } catch (SQLException e) {
+ logger.info("failed to insert lock record - attempting update: {}", this, e);
+ success = doDbUpdate(conn);
+ }
+
+ if (success) {
+ grant();
+ return;
+ }
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
+ * it fails, as this should only be executed in response to a call to
+ * {@link #free()}.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doUnlock() throws SQLException {
+ logger.debug("unlock {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ doDbDelete(conn);
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Attempts to extend a lock in the DB. Generates a callback, indicating success
+ * or failure.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doExtend() throws SQLException {
+ if (!isWaiting()) {
+ logger.debug("discard doExtend {}", this);
+ return;
+ }
+
+ /*
+ * There is a small window in which a client could invoke free() before the DB
+ * is updated. In that case, doUnlock will be added to the queue to run after
+ * this, which will delete the record, as desired. In addition, grant() will
+ * not do anything, because the lock state will have been set to UNAVAILABLE
+ * by free().
+ */
+
+ logger.debug("doExtend {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ /*
+ * invoker may have called extend() before free() had a chance to insert
+ * the record, thus we have to try to insert, if the update fails
+ */
+ if (doDbUpdate(conn) || doDbInsert(conn)) {
+ grant();
+ return;
+ }
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Inserts the lock into the DB.
+ *
+ * @param conn DB connection
+ * @return {@code true} if a record was successfully inserted, {@code false}
+ * otherwise
+ * @throws SQLException if a DB error occurs
+ */
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ logger.debug("insert lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
+ + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, feature.hostName);
+ stmt.setString(3, feature.uuidString);
+ stmt.setInt(4, getHoldSec());
+
+ stmt.executeUpdate();
+
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+
+ return true;
+ }
+ }
+
+ /**
+ * Updates the lock in the DB.
+ *
+ * @param conn DB connection
+ * @return {@code true} if a record was successfully updated, {@code false}
+ * otherwise
+ * @throws SQLException if a DB error occurs
+ */
+ protected boolean doDbUpdate(Connection conn) throws SQLException {
+ logger.debug("update lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
+ + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
+ + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, feature.hostName);
+ stmt.setString(3, feature.uuidString);
+ stmt.setInt(4, getHoldSec());
+
+ stmt.setString(5, getResourceId());
+ stmt.setString(6, this.hostName);
+ stmt.setString(7, this.uuidString);
+
+ if (stmt.executeUpdate() != 1) {
+ return false;
+ }
+
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+
+ return true;
+ }
+ }
+
+ /**
+ * Deletes the lock from the DB.
+ *
+ * @param conn DB connection
+ * @throws SQLException if a DB error occurs
+ */
+ protected void doDbDelete(Connection conn) throws SQLException {
+ logger.debug("delete lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, this.hostName);
+ stmt.setString(3, this.uuidString);
+
+ stmt.executeUpdate();
+ }
+ }
+
+ /**
+ * Removes the lock from the map, and sends a notification using the current
+ * thread.
+ */
+ private void removeFromMap() {
+ logger.debug("remove lock from map {}", this);
+ feature.resource2lock.remove(getResourceId(), this);
+
+ synchronized (this) {
+ if (!isUnavailable()) {
+ deny(LOCK_LOST_MSG, true);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
+ + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
+ + uuidString + "]";
+ }
+ }
+
+ @FunctionalInterface
+ private static interface RunnableWithEx {
+ void run() throws SQLException;
+ }
+
+ // these may be overridden by junit tests
+
+ protected Properties getProperties(String fileName) {
+ return SystemPersistenceConstants.getManager().getProperties(fileName);
+ }
+
+ protected ScheduledExecutorService getThreadPool() {
+ return engine.getExecutorService();
+ }
+
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java
index 55fc4fab..e720f9a1 100644
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* feature-distributed-locking
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,15 +20,15 @@
package org.onap.policy.distributed.locking;
-public class DistributedLockingFeatureException extends RuntimeException {
+public class DistributedLockManagerException extends RuntimeException {
private static final long serialVersionUID = 1L;
/**
* Constructor.
- *
+ *
* @param ex exception to be wrapped
*/
- public DistributedLockingFeatureException(Exception ex) {
+ public DistributedLockManagerException(Exception ex) {
super(ex);
}
}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java
new file mode 100644
index 00000000..f470c8e2
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java
@@ -0,0 +1,136 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.properties.BeanConfigurator;
+import org.onap.policy.common.utils.properties.Property;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+
+@Getter
+@Setter
+public class DistributedLockProperties {
+ public static final String PREFIX = "distributed.locking.";
+
+ public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
+ public static final String DB_URL = "javax.persistence.jdbc.url";
+ public static final String DB_USER = "javax.persistence.jdbc.user";
+ public static final String DB_PASS = "javax.persistence.jdbc.password";
+ public static final String TRANSIENT_ERROR_CODES = PREFIX + "transient.error.codes";
+ public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds";
+ public static final String RETRY_SEC = PREFIX + "retry.seconds";
+ public static final String MAX_RETRIES = PREFIX + "max.retries";
+
+ /**
+ * Database driver.
+ */
+ @Property(name = DB_DRIVER)
+ private String dbDriver;
+
+ /**
+ * Database url.
+ */
+ @Property(name = DB_URL)
+ private String dbUrl;
+
+ /**
+ * Database user.
+ */
+ @Property(name = DB_USER)
+ private String dbUser;
+
+ /**
+ * Database password.
+ */
+ @Property(name = DB_PASS)
+ private String dbPwd;
+
+ /**
+ * Vendor-specific error codes that are "transient", meaning they may go away if the
+ * command is repeated (e.g., connection issue), as opposed to something like a syntax
+ * error or a duplicate key.
+ */
+ @Property(name = TRANSIENT_ERROR_CODES)
+ private String errorCodeStrings;
+
+ private final Set<Integer> transientErrorCodes;
+
+ /**
+ * Time, in seconds, to wait between checks for expired locks.
+ */
+ @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900")
+ private int expireCheckSec;
+
+ /**
+ * Number of seconds to wait before retrying, after a DB error.
+ */
+ @Property(name = RETRY_SEC, defaultValue = "60")
+ private int retrySec;
+
+ /**
+ * Maximum number of times to retry a DB operation.
+ */
+ @Property(name = MAX_RETRIES, defaultValue = "2")
+ private int maxRetries;
+
+ /**
+ * Constructs the object, populating fields from the properties.
+ *
+ * @param props properties from which to configure this
+ * @throws PropertyException if an error occurs
+ */
+ public DistributedLockProperties(Properties props) throws PropertyException {
+ new BeanConfigurator().configureFromProperties(this, props);
+
+ Set<Integer> set = new HashSet<>();
+ for (String text : errorCodeStrings.split(",")) {
+ text = text.trim();
+ if (text.isEmpty()) {
+ continue;
+ }
+
+ try {
+ set.add(Integer.valueOf(text));
+
+ } catch (NumberFormatException e) {
+ throw new PropertyException(TRANSIENT_ERROR_CODES, "errorCodeStrings", e);
+ }
+ }
+
+ transientErrorCodes = Collections.unmodifiableSet(set);
+ }
+
+ /**
+ * Determines if an error is transient.
+ *
+ * @param errorCode error code to check
+ * @return {@code true} if the error is transient, {@code false} otherwise
+ */
+ public boolean isTransient(int errorCode) {
+ return transientErrorCodes.contains(errorCode);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
deleted file mode 100644
index d5e07a30..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.apache.commons.dbcp2.BasicDataSourceFactory;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi;
-import org.onap.policy.drools.features.PolicyEngineFeatureApi;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-import org.onap.policy.drools.system.PolicyEngine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DistributedLockingFeature implements PolicyEngineFeatureApi, PolicyResourceLockFeatureApi {
-
- /**
- * Logger instance.
- */
- private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class);
-
- /**
- * Properties Configuration Name.
- */
- public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
-
- /**
- * Properties for locking feature.
- */
- private DistributedLockingProperties lockProps;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID.
- */
- private static final UUID uuid = UUID.randomUUID();
-
- @Override
- public int getSequenceNumber() {
- return 1000;
- }
-
- @Override
- public OperResult beforeLock(String resourceId, String owner, int holdSec) {
-
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.lock(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeRefresh(String resourceId, String owner, int holdSec) {
-
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.refresh(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeUnlock(String resourceId, String owner) {
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeIsLockedBy(String resourceId, String owner) {
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeIsLocked(String resourceId) {
- TargetLock lock = new TargetLock(resourceId, uuid, "dummyOwner", dataSource);
-
- return (lock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public boolean afterStart(PolicyEngine engine) {
-
- try {
- this.lockProps = new DistributedLockingProperties(SystemPersistenceConstants.getManager()
- .getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME));
- this.dataSource = makeDataSource();
- } catch (PropertyException e) {
- logger.error("DistributedLockingFeature feature properies have not been loaded", e);
- throw new DistributedLockingFeatureException(e);
- } catch (InterruptedException e) {
- logger.error("DistributedLockingFeature failed to create data source", e);
- Thread.currentThread().interrupt();
- throw new DistributedLockingFeatureException(e);
- } catch (Exception e) {
- logger.error("DistributedLockingFeature failed to create data source", e);
- throw new DistributedLockingFeatureException(e);
- }
-
- cleanLockTable();
-
- return false;
- }
-
- /**
- * Make data source.
- *
- * @return a new, pooled data source
- * @throws Exception exception
- */
- protected BasicDataSource makeDataSource() throws Exception {
- Properties props = new Properties();
- props.put("driverClassName", lockProps.getDbDriver());
- props.put("url", lockProps.getDbUrl());
- props.put("username", lockProps.getDbUser());
- props.put("password", lockProps.getDbPwd());
- props.put("testOnBorrow", "true");
- props.put("poolPreparedStatements", "true");
-
- // additional properties are listed in the GenericObjectPool API
-
- return BasicDataSourceFactory.createDataSource(props);
- }
-
- /**
- * This method kills the heartbeat thread and calls refreshLockTable which removes
- * any records from the db where the current host is the owner.
- */
- @Override
- public boolean beforeShutdown(PolicyEngine engine) {
- cleanLockTable();
- return false;
- }
-
- /**
- * This method removes all records owned by the current host from the db.
- */
- private void cleanLockTable() {
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement statement = conn.prepareStatement(
- "DELETE FROM pooling.locks WHERE host = ? OR expirationTime < now()")
- ) {
-
- statement.setString(1, uuid.toString());
- statement.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("error in refreshLockTable()", e);
- }
-
- }
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
deleted file mode 100644
index 0ed5930d..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.util.Properties;
-import org.onap.policy.common.utils.properties.BeanConfigurator;
-import org.onap.policy.common.utils.properties.Property;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-
-
-public class DistributedLockingProperties {
-
- /**
- * Feature properties all begin with this prefix.
- */
- public static final String PREFIX = "distributed.locking.";
-
- public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
- public static final String DB_URL = "javax.persistence.jdbc.url";
- public static final String DB_USER = "javax.persistence.jdbc.user";
- public static final String DB_PWD = "javax.persistence.jdbc.password";
-
- /**
- * Properties from which this was constructed.
- */
- private final Properties source;
-
- /**
- * Database driver.
- */
- @Property(name = DB_DRIVER)
- private String dbDriver;
-
- /**
- * Database url.
- */
- @Property(name = DB_URL)
- private String dbUrl;
-
- /**
- * Database user.
- */
- @Property(name = DB_USER)
- private String dbUser;
-
- /**
- * Database password.
- */
- @Property(name = DB_PWD)
- private String dbPwd;
-
- /**
- * Constructs the object, populating fields from the properties.
- *
- * @param props properties from which to configure this
- * @throws PropertyException if an error occurs
- */
- public DistributedLockingProperties(Properties props) throws PropertyException {
- source = props;
-
- new BeanConfigurator().configureFromProperties(this, props);
- }
-
-
- public Properties getSource() {
- return source;
- }
-
-
- public String getDbDriver() {
- return dbDriver;
- }
-
-
- public String getDbUrl() {
- return dbUrl;
- }
-
-
- public String getDbUser() {
- return dbUser;
- }
-
-
- public String getDbPwd() {
- return dbPwd;
- }
-
-
- public void setDbDriver(String dbDriver) {
- this.dbDriver = dbDriver;
- }
-
-
- public void setDbUrl(String dbUrl) {
- this.dbUrl = dbUrl;
- }
-
-
- public void setDbUser(String dbUser) {
- this.dbUser = dbUser;
- }
-
-
- public void setDbPwd(String dbPwd) {
- this.dbPwd = dbPwd;
- }
-
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
deleted file mode 100644
index 42e1f92f..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.UUID;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TargetLock {
-
- private static final Logger logger = LoggerFactory.getLogger(TargetLock.class);
-
- /**
- * The Target resource we want to lock.
- */
- private String resourceId;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID .
- */
- private UUID uuid;
-
- /**
- * Owner.
- */
- private String owner;
-
- /**
- * Constructs a TargetLock object.
- *
- * @param resourceId ID of the entity we want to lock
- * @param dataSource used to connect to the DB containing locks
- */
- public TargetLock(String resourceId, UUID uuid, String owner, BasicDataSource dataSource) {
- this.resourceId = resourceId;
- this.uuid = uuid;
- this.owner = owner;
- this.dataSource = dataSource;
- }
-
- /**
- * Obtain a lock.
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- public boolean lock(int holdSec) {
-
- return grabLock(holdSec);
- }
-
- /**
- * Refresh a lock.
- *
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if the lock was refreshed, {@code false} if the resource is
- * not currently locked by the given owner
- */
- public boolean refresh(int holdSec) {
- return updateLock(holdSec);
- }
-
- /**
- * Unlock a resource by deleting it's associated record in the db.
- */
- public boolean unlock() {
- return deleteLock();
- }
-
- /**
- * "Grabs" lock by attempting to insert a new record in the db.
- * If the insert fails due to duplicate key error resource is already locked
- * so we call secondGrab.
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- private boolean grabLock(int holdSec) {
-
- // try to insert a record into the table(thereby grabbing the lock)
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement statement = conn.prepareStatement(
- "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
- + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
-
- int index = 1;
- statement.setString(index++, this.resourceId);
- statement.setString(index++, this.uuid.toString());
- statement.setString(index++, this.owner);
- statement.setInt(index++, holdSec);
- statement.executeUpdate();
- }
-
- catch (SQLException e) {
- logger.error("error in TargetLock.grabLock()", e);
- return secondGrab(holdSec);
- }
-
- return true;
- }
-
- /**
- * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired.
- * If that fails, it attempts to insert a new record again
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- private boolean secondGrab(int holdSec) {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, "
- + "expirationTime = timestampadd(second, ?, now()) "
- + "WHERE resourceId = ? AND expirationTime < now()");
-
- PreparedStatement insertStatement = conn.prepareStatement(
- "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
- + "values (?, ?, ?, timestampadd(second, ?, now()))");) {
-
- int index = 1;
- updateStatement.setString(index++, this.uuid.toString());
- updateStatement.setString(index++, this.owner);
- updateStatement.setInt(index++, holdSec);
- updateStatement.setString(index++, this.resourceId);
-
- // The lock was expired and we grabbed it.
- // return true
- if (updateStatement.executeUpdate() == 1) {
- return true;
- }
-
- // If our update does not return 1 row, the lock either has not expired
- // or it was removed. Try one last grab
- else {
- index = 1;
- insertStatement.setString(index++, this.resourceId);
- insertStatement.setString(index++, this.uuid.toString());
- insertStatement.setString(index++, this.owner);
- insertStatement.setInt(index++, holdSec);
-
- // If our insert returns 1 we successfully grabbed the lock
- return (insertStatement.executeUpdate() == 1);
- }
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.secondGrab()", e);
- return false;
- }
-
- }
-
- /**
- * Updates the DB record associated with the lock.
- *
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if the record was updated, {@code false} otherwise
- */
- private boolean updateLock(int holdSec) {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, "
- + "expirationTime = timestampadd(second, ?, now()) "
- + "WHERE resourceId = ? AND owner = ? AND expirationTime >= now()")) {
-
- int index = 1;
- updateStatement.setString(index++, this.uuid.toString());
- updateStatement.setString(index++, this.owner);
- updateStatement.setInt(index++, holdSec);
- updateStatement.setString(index++, this.resourceId);
- updateStatement.setString(index++, this.owner);
-
- // refresh succeeded iff a record was updated
- return (updateStatement.executeUpdate() == 1);
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.refreshLock()", e);
- return false;
- }
-
- }
-
- /**
- *To remove a lock we simply delete the record from the db .
- */
- private boolean deleteLock() {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement deleteStatement = conn.prepareStatement(
- "DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?")) {
-
- deleteStatement.setString(1, this.resourceId);
- deleteStatement.setString(2, this.owner);
- deleteStatement.setString(3, this.uuid.toString());
-
- return (deleteStatement.executeUpdate() == 1);
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.deleteLock()", e);
- return false;
- }
-
- }
-
- /**
- * Is the lock active.
- */
- public boolean isActive() {
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement selectStatement = conn.prepareStatement(
- "SELECT * FROM pooling.locks "
- + "WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= now()")) {
-
- selectStatement.setString(1, this.resourceId);
- selectStatement.setString(2, this.uuid.toString());
- selectStatement.setString(3, this.owner);
- try (ResultSet result = selectStatement.executeQuery()) {
-
- // This will return true if the
- // query returned at least one row
- return result.first();
- }
-
- }
-
- catch (SQLException e) {
- logger.error("error in TargetLock.isActive()", e);
- return false;
- }
-
- }
-
- /**
- * Is the resource locked.
- */
- public boolean isLocked() {
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement selectStatement = conn
- .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= now()")) {
-
- selectStatement.setString(1, this.resourceId);
- try (ResultSet result = selectStatement.executeQuery()) {
- // This will return true if the
- // query returned at least one row
- return result.first();
- }
- } catch (SQLException e) {
- logger.error("error in TargetLock.isActive()", e);
- return false;
- }
- }
-
-}
diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi
deleted file mode 100644
index 19bdf505..00000000
--- a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi
+++ /dev/null
@@ -1 +0,0 @@
-org.onap.policy.distributed.locking.DistributedLockingFeature \ No newline at end of file
diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
index 19bdf505..2a7c0547 100644
--- a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
+++ b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
@@ -1 +1 @@
-org.onap.policy.distributed.locking.DistributedLockingFeature \ No newline at end of file
+org.onap.policy.distributed.locking.DistributedLockManager \ No newline at end of file
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java
index 7ba2384a..cfd6a151 100644
--- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* feature-distributed-locking
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,13 +24,13 @@ import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.onap.policy.common.utils.test.ExceptionsTester;
-import org.onap.policy.distributed.locking.DistributedLockingFeatureException;
+import org.onap.policy.distributed.locking.DistributedLockManagerException;
-public class DistributedLockingFeatureExceptionTest extends ExceptionsTester {
+public class DistributedLockManagerExceptionTest extends ExceptionsTester {
@Test
public void test() {
- assertEquals(1, test(DistributedLockingFeatureException.class));
+ assertEquals(1, test(DistributedLockManagerException.class));
}
}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java
new file mode 100644
index 00000000..59b56224
--- /dev/null
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java
@@ -0,0 +1,1818 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.services.OrderedServiceImpl;
+import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.persistence.SystemPersistenceConstants;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.powermock.reflect.Whitebox;
+
+public class DistributedLockManagerTest {
+ private static final long EXPIRE_SEC = 900L;
+ private static final long RETRY_SEC = 60L;
+ private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
+ private static final String OTHER_HOST = "other-host";
+ private static final String OTHER_OWNER = "other-owner";
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final String DB_CONNECTION =
+ "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
+ private static final String DB_USER = "user";
+ private static final String DB_PASSWORD = "password";
+ private static final String OWNER_KEY = "my key";
+ private static final String RESOURCE = "my resource";
+ private static final String RESOURCE2 = "my resource #2";
+ private static final String RESOURCE3 = "my resource #3";
+ private static final String RESOURCE4 = "my resource #4";
+ private static final String RESOURCE5 = "my resource #5";
+ private static final int HOLD_SEC = 100;
+ private static final int HOLD_SEC2 = 120;
+ private static final int MAX_THREADS = 5;
+ private static final int MAX_LOOPS = 100;
+ private static final int TRANSIENT = 500;
+ private static final int PERMANENT = 600;
+
+ // number of execute() calls before the first lock attempt
+ private static final int PRE_LOCK_EXECS = 1;
+
+ // number of execute() calls before the first schedule attempt
+ private static final int PRE_SCHED_EXECS = 1;
+
+ private static Connection conn = null;
+ private static ScheduledExecutorService saveExec;
+ private static ScheduledExecutorService realExec;
+
+ @Mock
+ private ScheduledExecutorService exsvc;
+
+ @Mock
+ private LockCallback callback;
+
+ @Mock
+ private BasicDataSource datasrc;
+
+ @Mock
+ private PolicyEngine engine;
+
+ private DistributedLock lock;
+
+ private AtomicInteger nactive;
+ private AtomicInteger nsuccesses;
+ private DistributedLockManager feature;
+
+
+ /**
+ * Configures the location of the property files and creates the DB.
+ *
+ * @throws SQLException if the DB cannot be created
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws SQLException {
+ SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
+
+ conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+
+ try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
+ + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
+ + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
+ createStmt.executeUpdate();
+ }
+
+ saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
+
+ realExec = Executors.newScheduledThreadPool(3);
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
+ }
+
+ /**
+ * Restores static fields.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws SQLException {
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
+ realExec.shutdown();
+ conn.close();
+ }
+
+ /**
+ * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
+ * tasks.
+ *
+ * @throws SQLException if the lock records cannot be deleted from the DB
+ */
+ @Before
+ public void setUp() throws SQLException {
+ MockitoAnnotations.initMocks(this);
+
+ nactive = new AtomicInteger(0);
+ nsuccesses = new AtomicInteger(0);
+
+ cleanDb();
+
+ when(engine.getExecutorService()).thenReturn(exsvc);
+
+ feature = new MyLockingFeature(true);
+ }
+
+ @After
+ public void tearDown() throws SQLException {
+ shutdownFeature();
+ cleanDb();
+ }
+
+ private void cleanDb() throws SQLException {
+ try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
+ stmt.executeUpdate();
+ }
+ }
+
+ private void shutdownFeature() {
+ if (feature != null) {
+ feature.afterStop(engine);
+ feature = null;
+ }
+ }
+
+ /**
+ * Tests that the feature is found in the expected service sets.
+ */
+ @Test
+ public void testServiceApis() {
+ assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
+ .anyMatch(obj -> obj instanceof DistributedLockManager));
+ }
+
+ /**
+ * Tests constructor() when properties are invalid.
+ */
+ @Test
+ public void testDistributedLockManagerInvalidProperties() {
+ // use properties containing an invalid value
+ Properties props = new Properties();
+ props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc");
+
+ feature = new MyLockingFeature(false) {
+ @Override
+ protected Properties getProperties(String fileName) {
+ return props;
+ }
+ };
+
+ assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
+ }
+
+ @Test
+ public void testGetSequenceNumber() {
+ assertEquals(1000, feature.getSequenceNumber());
+ }
+
+ @Test
+ public void testStartableApi() {
+ assertTrue(feature.isAlive());
+ assertTrue(feature.start());
+ assertTrue(feature.stop());
+ feature.shutdown();
+
+ // above should have had no effect
+ assertTrue(feature.isAlive());
+
+ feature.afterStop(engine);
+ assertFalse(feature.isAlive());
+ }
+
+ @Test
+ public void testLockApi() {
+ assertFalse(feature.isLocked());
+ assertTrue(feature.lock());
+ assertTrue(feature.unlock());
+ }
+
+ @Test
+ public void testBeforeCreateLockManager() {
+ assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
+ }
+
+ /**
+ * Tests beforeCreate(), when getProperties() throws a runtime exception.
+ */
+ @Test
+ public void testBeforeCreateLockManagerEx() {
+ shutdownFeature();
+
+ feature = new MyLockingFeature(false) {
+ @Override
+ protected Properties getProperties(String fileName) {
+ throw new IllegalArgumentException(EXPECTED_EXCEPTION);
+ }
+ };
+
+ assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
+ .isInstanceOf(DistributedLockManagerException.class);
+ }
+
+ @Test
+ public void testAfterStart() {
+ // verify that cleanup & expire check are both added to the queue
+ verify(exsvc).execute(any());
+ verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ /**
+ * Tests afterStart(), when thread pool throws a runtime exception.
+ */
+ @Test
+ public void testAfterStartExInThreadPool() {
+ shutdownFeature();
+
+ feature = new MyLockingFeature(false);
+
+ when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
+ .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION));
+
+ assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
+ }
+
+ @Test
+ public void testDeleteExpiredDbLocks() throws SQLException {
+ // add records: two expired, one not
+ insertRecord(RESOURCE, feature.getUuidString(), -1);
+ insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
+ insertRecord(RESOURCE3, OTHER_OWNER, 0);
+ insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
+
+ // get the clean-up function and execute it
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).execute(captor.capture());
+
+ long tbegin = System.currentTimeMillis();
+ Runnable action = captor.getValue();
+ action.run();
+
+ assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
+ assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
+ assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
+ assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
+
+ assertEquals(2, getRecordCount());
+ }
+
+ /**
+ * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDeleteExpiredDbLocksEx() throws SQLException {
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ // get the clean-up function and execute it
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).execute(captor.capture());
+
+ Runnable action = captor.getValue();
+
+ // should not throw an exception
+ action.run();
+ }
+
+ @Test
+ public void testAfterStop() {
+ shutdownFeature();
+
+ feature = new DistributedLockManager();
+
+ // shutdown without calling afterStart()
+
+ shutdownFeature();
+ }
+
+ /**
+ * Tests afterStop(), when the data source throws an exception when close() is called.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testAfterStopEx() throws SQLException {
+ shutdownFeature();
+
+ // use a data source that throws an exception when closed
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ shutdownFeature();
+ }
+
+ @Test
+ public void testCreateLock() throws SQLException {
+ verify(exsvc).execute(any());
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock.isWaiting());
+
+ verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
+
+ // this lock should fail
+ LockCallback callback2 = mock(LockCallback.class);
+ DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
+ assertTrue(lock2.isUnavailable());
+ verify(callback2, never()).lockAvailable(lock2);
+ verify(callback2).lockUnavailable(lock2);
+
+ // this should fail, too
+ LockCallback callback3 = mock(LockCallback.class);
+ DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
+ assertTrue(lock3.isUnavailable());
+ verify(callback3, never()).lockAvailable(lock3);
+ verify(callback3).lockUnavailable(lock3);
+
+ // no change to first
+ assertTrue(lock.isWaiting());
+
+ // no callbacks to the first lock
+ verify(callback, never()).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ assertTrue(lock.isWaiting());
+ assertEquals(0, getRecordCount());
+
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+ assertEquals(1, getRecordCount());
+
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // this should succeed
+ DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock4.isWaiting());
+
+ // after running checker, original records should still remain
+ runChecker(0, 0, EXPIRE_SEC);
+ assertEquals(1, getRecordCount());
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests lock() when the feature is not the latest instance.
+ */
+ @Test
+ public void testCreateLockNotLatestInstance() {
+ DistributedLockManager.setLatestInstance(null);
+
+ Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockAvailable(any());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testCheckExpired() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
+ runLock(1, 0);
+
+ LockCallback callback3 = mock(LockCallback.class);
+ final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
+ runLock(2, 0);
+
+ LockCallback callback4 = mock(LockCallback.class);
+ final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
+ runLock(3, 0);
+
+ LockCallback callback5 = mock(LockCallback.class);
+ final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
+ runLock(4, 0);
+
+ assertEquals(5, getRecordCount());
+
+ // expire one record
+ updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
+
+ // change host of another record
+ updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
+
+ // change uuid of another record
+ updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
+
+
+ // run the checker
+ runChecker(0, 0, EXPIRE_SEC);
+
+
+ // check lock states
+ assertTrue(lock.isUnavailable());
+ assertTrue(lock2.isActive());
+ assertTrue(lock3.isUnavailable());
+ assertTrue(lock4.isActive());
+ assertTrue(lock5.isUnavailable());
+
+ // allow callbacks
+ runLock(5, 2);
+ runLock(6, 1);
+ runLock(7, 0);
+ verify(callback).lockUnavailable(lock);
+ verify(callback3).lockUnavailable(lock3);
+ verify(callback5).lockUnavailable(lock5);
+
+ verify(callback2, never()).lockUnavailable(lock2);
+ verify(callback4, never()).lockUnavailable(lock4);
+
+
+ // another check should have been scheduled, with the normal interval
+ runChecker(1, 0, EXPIRE_SEC);
+ }
+
+ /**
+ * Tests checkExpired(), when schedule() throws an exception.
+ */
+ @Test
+ public void testCheckExpiredExecRejected() {
+ // arrange for execution to be rejected
+ when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
+ .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
+
+ runChecker(0, 0, EXPIRE_SEC);
+ }
+
+ /**
+ * Tests checkExpired(), when getConnection() throws an exception.
+ */
+ @Test
+ public void testCheckExpiredSqlEx() {
+ // use a data source that throws an exception when getConnection() is called
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ runChecker(0, 0, EXPIRE_SEC);
+
+ // it should have scheduled another check, sooner
+ runChecker(0, 0, RETRY_SEC);
+ }
+
+ @Test
+ public void testExpireLocks() throws SQLException {
+ AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
+
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ // get the real data source
+ BasicDataSource src2 = super.makeDataSource();
+
+ when(datasrc.getConnection()).thenAnswer(answer -> {
+ DistributedLock lck = freeLock.getAndSet(null);
+ if (lck != null) {
+ // free it
+ lck.free();
+
+ // run its doUnlock
+ runLock(4, 0);
+ }
+
+ return src2.getConnection();
+ });
+
+ return datasrc;
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
+ runLock(1, 0);
+
+ LockCallback callback3 = mock(LockCallback.class);
+ final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
+ // don't run doLock for lock3 - leave it in the waiting state
+
+ LockCallback callback4 = mock(LockCallback.class);
+ final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
+ runLock(3, 0);
+
+ assertEquals(3, getRecordCount());
+
+ // expire one record
+ updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
+
+ // arrange to free lock4 while the checker is running
+ freeLock.set(lock4);
+
+ // run the checker
+ runChecker(0, 0, EXPIRE_SEC);
+
+
+ // check lock states
+ assertTrue(lock.isUnavailable());
+ assertTrue(lock2.isActive());
+ assertTrue(lock3.isWaiting());
+ assertTrue(lock4.isUnavailable());
+
+ runLock(5, 0);
+ verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any());
+
+ verify(callback).lockUnavailable(lock);
+ verify(callback2, never()).lockUnavailable(lock2);
+ verify(callback3, never()).lockUnavailable(lock3);
+ verify(callback4, never()).lockUnavailable(lock4);
+ }
+
+ @Test
+ public void testDistributedLockNoArgs() {
+ DistributedLock lock = new DistributedLock();
+ assertNull(lock.getResourceId());
+ assertNull(lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(0, lock.getHoldSec());
+ }
+
+ @Test
+ public void testDistributedLock() {
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
+ .withMessageContaining("holdSec");
+
+ // should generate no exception
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ }
+
+ @Test
+ public void testDistributedLockSerializable() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock = roundTrip(lock);
+
+ assertTrue(lock.isWaiting());
+
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+ }
+
+ @Test
+ public void testGrant() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertFalse(lock.isActive());
+
+ // execute the doLock() call
+ runLock(0, 0);
+
+ assertTrue(lock.isActive());
+
+ // the callback for the lock should have been run in the foreground thread
+ verify(callback).lockAvailable(lock);
+ }
+
+ /**
+ * Tests grant() when the lock is already unavailable.
+ */
+ @Test
+ public void testDistributedLockGrantUnavailable() {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock.setState(LockState.UNAVAILABLE);
+ lock.grant();
+
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testDistributedLockDeny() {
+ // get a lock
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // get another lock - should fail
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertTrue(lock.isUnavailable());
+
+ // the callback for the second lock should have been run in the foreground thread
+ verify(callback).lockUnavailable(lock);
+
+ // should only have a request for the first lock
+ verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
+ }
+
+ @Test
+ public void testDistributedLockFree() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ // run both requests associated with the lock
+ runLock(0, 1);
+ runLock(1, 0);
+
+ // should not have changed state
+ assertTrue(lock.isUnavailable());
+
+ // attempt to free it again
+ assertFalse(lock.free());
+
+ // should not have queued anything else
+ verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
+
+ // new lock should succeed
+ DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock2 != lock);
+ assertTrue(lock2.isWaiting());
+ }
+
+ /**
+ * Tests that free() works on a serialized lock with a new feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockFreeSerialized() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ feature = new MyLockingFeature(true);
+
+ lock = roundTrip(lock);
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests free() on a serialized lock without a feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockFreeNoFeature() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ DistributedLockManager.setLatestInstance(null);
+
+ lock = roundTrip(lock);
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests the case where the lock is freed and doUnlock called between the call to
+ * isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockFreeUnlocked() {
+ feature = new FreeWithFreeLockingFeature(true);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests the case where the lock is freed, but doUnlock is not completed, between the
+ * call to isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockFreeLockFreed() {
+ feature = new FreeWithFreeLockingFeature(false);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ @Test
+ public void testDistributedLockExtend() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // lock2 should be denied - called back by this thread
+ DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ verify(callback, never()).lockAvailable(lock2);
+ verify(callback).lockUnavailable(lock2);
+
+ // lock2 will still be denied - called back by this thread
+ lock2.extend(HOLD_SEC, callback);
+ verify(callback, times(2)).lockUnavailable(lock2);
+
+ // force lock2 to be active - should still be denied
+ Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
+ lock2.extend(HOLD_SEC, callback);
+ verify(callback, times(3)).lockUnavailable(lock2);
+
+ assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
+ .withMessageContaining("holdSec");
+
+ // execute doLock()
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ // now extend the first lock
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+ assertTrue(lock.isWaiting());
+
+ // execute doExtend()
+ runLock(1, 0);
+ lock.extend(HOLD_SEC2, callback2);
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests that extend() works on a serialized lock with a new feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockExtendSerialized() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // run doLock
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ feature = new MyLockingFeature(true);
+
+ lock = roundTrip(lock);
+ assertTrue(lock.isActive());
+
+ LockCallback scallback = mock(LockCallback.class);
+
+ lock.extend(HOLD_SEC, scallback);
+ assertTrue(lock.isWaiting());
+
+ // run doExtend (in new feature)
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ verify(scallback).lockAvailable(lock);
+ verify(scallback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests extend() on a serialized lock without a feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockExtendNoFeature() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // run doLock
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ DistributedLockManager.setLatestInstance(null);
+
+ lock = roundTrip(lock);
+ assertTrue(lock.isActive());
+
+ LockCallback scallback = mock(LockCallback.class);
+
+ lock.extend(HOLD_SEC, scallback);
+ assertTrue(lock.isUnavailable());
+
+ verify(scallback, never()).lockAvailable(lock);
+ verify(scallback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests the case where the lock is freed and doUnlock called between the call to
+ * isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockExtendUnlocked() {
+ feature = new FreeWithFreeLockingFeature(true);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ lock.extend(HOLD_SEC2, callback);
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests the case where the lock is freed, but doUnlock is not completed, between the
+ * call to isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockExtendLockFreed() {
+ feature = new FreeWithFreeLockingFeature(false);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ lock.extend(HOLD_SEC2, callback);
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockScheduleRequest() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ verify(callback).lockAvailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockRescheduleRequest() throws SQLException {
+ // use a data source that throws an exception when getConnection() is called
+ InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
+ feature = invfeat;
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should fail and reschedule
+ runLock(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isWaiting());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // free the lock while doLock is executing
+ invfeat.freeLock = true;
+
+ // try scheduled request - should just invoke doUnlock
+ runSchedule(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // should have scheduled a retry of doUnlock
+ verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ @Test
+ public void testDistributedLockGetNextRequest() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ /*
+ * run doLock. This should cause getNextRequest() to be called twice, once with a
+ * request in the queue, and the second time with request=null.
+ */
+ runLock(0, 0);
+ }
+
+ /**
+ * Tests getNextRequest(), where the same request is still in the queue the second
+ * time it's called.
+ */
+ @Test
+ public void testDistributedLockGetNextRequestSameRequest() {
+ // force reschedule to be invoked
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ /*
+ * run doLock. This should cause getNextRequest() to be called twice, once with a
+ * request in the queue, and the second time with the same request again.
+ */
+ runLock(0, 0);
+
+ verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ @Test
+ public void testDistributedLockDoRequest() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertTrue(lock.isWaiting());
+
+ // run doLock via doRequest
+ runLock(0, 0);
+
+ assertTrue(lock.isActive());
+ }
+
+ /**
+ * Tests doRequest(), when doRequest() is already running within another thread.
+ */
+ @Test
+ public void testDistributedLockDoRequestBusy() {
+ /*
+ * this feature will invoke a request in a background thread while it's being run
+ * in a foreground thread.
+ */
+ AtomicBoolean running = new AtomicBoolean(false);
+ AtomicBoolean returned = new AtomicBoolean(false);
+
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ if (running.get()) {
+ // already inside the thread - don't recurse any further
+ return super.doDbInsert(conn);
+ }
+
+ running.set(true);
+
+ Thread thread = new Thread(() -> {
+ // run doLock from within the new thread
+ runLock(0, 0);
+ });
+ thread.setDaemon(true);
+ thread.start();
+
+ // wait for the background thread to complete before continuing
+ try {
+ thread.join(5000);
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ }
+
+ returned.set(!thread.isAlive());
+
+ return super.doDbInsert(conn);
+ }
+ };
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // run doLock
+ runLock(0, 0);
+
+ assertTrue(returned.get());
+ }
+
+ /**
+ * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
+ // throw run-time exception
+ when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
+
+ // use a data source that throws an exception when getConnection() is called
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ return datasrc;
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should NOT reschedule
+ runLock(0, 0);
+
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+
+ verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ /**
+ * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
+ * state.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
+ // throw run-time exception
+ when(datasrc.getConnection()).thenAnswer(answer -> {
+ lock.free();
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ });
+
+ // use a data source that throws an exception when getConnection() is called
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ return datasrc;
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should NOT reschedule
+ runLock(0, 0);
+
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockUnavailable(lock);
+
+ verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ /**
+ * Tests doRequest() when the retry count gets exhausted.
+ */
+ @Test
+ public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
+ // use a data source that throws an exception when getConnection() is called
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should fail and reschedule
+ runLock(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isWaiting());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // try again, via SCHEDULER - first retry fails
+ runSchedule(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isWaiting());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // try again, via SCHEDULER - final retry fails
+ runSchedule(1, 0);
+ assertTrue(lock.isUnavailable());
+
+ // now callback should have been called
+ verify(callback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doRequest() when a non-transient DB exception is thrown.
+ */
+ @Test
+ public void testDistributedLockDoRequestNotTransient() {
+ /*
+ * use a data source that throws a PERMANENT exception when getConnection() is
+ * called
+ */
+ feature = new InvalidDbLockingFeature(PERMANENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should fail
+ runLock(0, 0);
+
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+
+ // should not have scheduled anything new
+ verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
+ verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ @Test
+ public void testDistributedLockDoLock() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should simply do an insert
+ long tbegin = System.currentTimeMillis();
+ runLock(0, 0);
+
+ assertEquals(1, getRecordCount());
+ assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
+ verify(callback).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doLock() when the lock is freed before doLock runs.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoLockFreed() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ lock.setState(LockState.UNAVAILABLE);
+
+ // invoke doLock - should do nothing
+ runLock(0, 0);
+
+ assertEquals(0, getRecordCount());
+
+ verify(callback, never()).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doLock() when a DB exception is thrown.
+ */
+ @Test
+ public void testDistributedLockDoLockEx() {
+ // use a data source that throws an exception when getConnection() is called
+ feature = new InvalidDbLockingFeature(PERMANENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should simply do an insert
+ runLock(0, 0);
+
+ // lock should have failed due to exception
+ verify(callback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
+ * to be called.
+ */
+ @Test
+ public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
+ // insert an expired record
+ insertRecord(RESOURCE, feature.getUuidString(), 0);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should simply do an update
+ runLock(0, 0);
+ verify(callback).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doLock() when a locked record already exists.
+ */
+ @Test
+ public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
+ // insert an expired record
+ insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock
+ runLock(0, 0);
+
+ // lock should have failed because it's already locked
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockDoUnlock() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock()
+ runLock(0, 0);
+
+ lock.free();
+
+ // invoke doUnlock()
+ long tbegin = System.currentTimeMillis();
+ runLock(1, 0);
+
+ assertEquals(0, getRecordCount());
+ assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback, times(1)).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doUnlock() when a DB exception is thrown.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoUnlockEx() throws SQLException {
+ feature = new InvalidDbLockingFeature(PERMANENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // do NOT invoke doLock() - it will fail without a DB connection
+
+ lock.free();
+
+ // invoke doUnlock()
+ runLock(1, 0);
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback, never()).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockDoExtend() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // call doExtend()
+ long tbegin = System.currentTimeMillis();
+ runLock(1, 0);
+
+ assertEquals(1, getRecordCount());
+ assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
+
+ assertTrue(lock.isActive());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have succeeded
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when the lock is freed before doExtend runs.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendFreed() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock.extend(HOLD_SEC2, callback);
+
+ lock.setState(LockState.UNAVAILABLE);
+
+ // invoke doExtend - should do nothing
+ runLock(1, 0);
+
+ assertEquals(0, getRecordCount());
+
+ verify(callback, never()).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when the lock record is missing from the DB, thus requiring an
+ * insert.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // delete the record so it's forced to re-insert it
+ cleanDb();
+
+ // call doExtend()
+ long tbegin = System.currentTimeMillis();
+ runLock(1, 0);
+
+ assertEquals(1, getRecordCount());
+ assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
+
+ assertTrue(lock.isActive());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have succeeded
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when both update and insert fail.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
+ /*
+ * this feature will create a lock that returns false when doDbUpdate() is
+ * invoked, or when doDbInsert() is invoked a second time
+ */
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+ private int ntimes = 0;
+
+ @Override
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ if (ntimes++ > 0) {
+ return false;
+ }
+
+ return super.doDbInsert(conn);
+ }
+
+ @Override
+ protected boolean doDbUpdate(Connection conn) {
+ return false;
+ }
+ };
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // call doExtend()
+ runLock(1, 0);
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have failed
+ verify(callback2, never()).lockAvailable(lock);
+ verify(callback2).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when an exception occurs.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendEx() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ /*
+ * delete the record and insert one with a different owner, which will cause
+ * doDbInsert() to throw an exception
+ */
+ cleanDb();
+ insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // call doExtend()
+ runLock(1, 0);
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have failed
+ verify(callback2, never()).lockAvailable(lock);
+ verify(callback2).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockToString() {
+ String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
+ assertNotNull(text);
+ assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
+ }
+
+ @Test
+ public void testMakeThreadPool() {
+ // use a REAL feature to test this
+ feature = new DistributedLockManager();
+
+ // this should create a thread pool
+ feature.beforeCreateLockManager(engine, new Properties());
+ feature.afterStart(engine);
+
+ shutdownFeature();
+ }
+
+ /**
+ * Performs a multi-threaded test of the locking facility.
+ *
+ * @throws InterruptedException if the current thread is interrupted while waiting for
+ * the background threads to complete
+ */
+ @Test
+ public void testMultiThreaded() throws InterruptedException {
+ feature = new DistributedLockManager();
+ feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
+ feature.afterStart(PolicyEngineConstants.getManager());
+
+ List<MyThread> threads = new ArrayList<>(MAX_THREADS);
+ for (int x = 0; x < MAX_THREADS; ++x) {
+ threads.add(new MyThread());
+ }
+
+ threads.forEach(Thread::start);
+
+ for (MyThread thread : threads) {
+ thread.join(6000);
+ assertFalse(thread.isAlive());
+ }
+
+ for (MyThread thread : threads) {
+ if (thread.err != null) {
+ throw thread.err;
+ }
+ }
+
+ assertTrue(nsuccesses.get() > 0);
+ }
+
+ private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+ return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
+ }
+
+ private DistributedLock roundTrip(DistributedLock lock) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(lock);
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ return (DistributedLock) ois.readObject();
+ }
+ }
+
+ /**
+ * Runs the checkExpired() action.
+ *
+ * @param nskip number of actions in the work queue to skip
+ * @param nadditional number of additional actions that appear in the work queue
+ * <i>after</i> the checkExpired action
+ * @param schedSec number of seconds for which the checker should have been scheduled
+ */
+ private void runChecker(int nskip, int nadditional, long schedSec) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
+ Runnable action = captor.getAllValues().get(nskip);
+ action.run();
+ }
+
+ /**
+ * Runs a lock action (e.g., doLock, doUnlock).
+ *
+ * @param nskip number of actions in the work queue to skip
+ * @param nadditional number of additional actions that appear in the work queue
+ * <i>after</i> the desired action
+ */
+ void runLock(int nskip, int nadditional) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
+
+ Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
+ action.run();
+ }
+
+ /**
+ * Runs a scheduled action (e.g., "retry" action).
+ *
+ * @param nskip number of actions in the work queue to skip
+ * @param nadditional number of additional actions that appear in the work queue
+ * <i>after</i> the desired action
+ */
+ void runSchedule(int nskip, int nadditional) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
+
+ Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
+ action.run();
+ }
+
+ /**
+ * Gets a count of the number of lock records in the DB.
+ *
+ * @return the number of lock records in the DB
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private int getRecordCount() throws SQLException {
+ try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
+ ResultSet result = stmt.executeQuery()) {
+
+ if (result.next()) {
+ return result.getInt(1);
+
+ } else {
+ return 0;
+ }
+ }
+ }
+
+ /**
+ * Determines if there is a record for the given resource whose expiration time is in
+ * the expected range.
+ *
+ * @param resourceId ID of the resource of interest
+ * @param uuidString UUID string of the owner
+ * @param holdSec seconds for which the lock was to be held
+ * @param tbegin earliest time, in milliseconds, at which the record could have been
+ * inserted into the DB
+ * @return {@code true} if a record is found, {@code false} otherwise
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
+ try (PreparedStatement stmt =
+ conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
+ + " WHERE resourceId=? AND host=? AND owner=?")) {
+
+ stmt.setString(1, resourceId);
+ stmt.setString(2, feature.getHostName());
+ stmt.setString(3, uuidString);
+
+ try (ResultSet result = stmt.executeQuery()) {
+ if (result.next()) {
+ int remaining = result.getInt(1);
+ long maxDiff = System.currentTimeMillis() - tbegin;
+ return (remaining >= 0 && holdSec - remaining <= maxDiff);
+
+ } else {
+ return false;
+ }
+ }
+ }
+ }
+
+ /**
+ * Inserts a record into the DB.
+ *
+ * @param resourceId ID of the resource of interest
+ * @param uuidString UUID string of the owner
+ * @param expireOffset offset, in seconds, from "now", at which the lock should expire
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
+ this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
+ }
+
+ private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
+ throws SQLException {
+ try (PreparedStatement stmt =
+ conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
+ + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
+
+ stmt.setString(1, resourceId);
+ stmt.setString(2, hostName);
+ stmt.setString(3, uuidString);
+ stmt.setInt(4, expireOffset);
+
+ assertEquals(1, stmt.executeUpdate());
+ }
+ }
+
+ /**
+ * Updates a record in the DB.
+ *
+ * @param resourceId ID of the resource of interest
+ * @param newUuid UUID string of the <i>new</i> owner
+ * @param expireOffset offset, in seconds, from "now", at which the lock should expire
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
+ try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
+ + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
+
+ stmt.setString(1, newHost);
+ stmt.setString(2, newUuid);
+ stmt.setInt(3, expireOffset);
+ stmt.setString(4, resourceId);
+
+ assertEquals(1, stmt.executeUpdate());
+ }
+ }
+
+ /**
+ * Feature that uses <i>exsvc</i> to execute requests.
+ */
+ private class MyLockingFeature extends DistributedLockManager {
+
+ public MyLockingFeature(boolean init) {
+ shutdownFeature();
+
+ exsvc = mock(ScheduledExecutorService.class);
+ when(engine.getExecutorService()).thenReturn(exsvc);
+
+ if (init) {
+ beforeCreateLockManager(engine, new Properties());
+ afterStart(engine);
+ }
+ }
+ }
+
+ /**
+ * Feature whose data source all throws exceptions.
+ */
+ private class InvalidDbLockingFeature extends MyLockingFeature {
+ private int errcode;
+ private boolean freeLock = false;
+
+ public InvalidDbLockingFeature(int errcode) {
+ // pass "false" because we have to set the error code BEFORE calling
+ // afterStart()
+ super(false);
+
+ this.errcode = errcode;
+
+ this.beforeCreateLockManager(engine, new Properties());
+ this.afterStart(engine);
+ }
+
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ when(datasrc.getConnection()).thenAnswer(answer -> {
+ if (freeLock) {
+ freeLock = false;
+ lock.free();
+ }
+
+ throw new SQLException(EXPECTED_EXCEPTION, "", errcode);
+ });
+
+ doThrow(new SQLException(EXPECTED_EXCEPTION, "", errcode)).when(datasrc).close();
+
+ return datasrc;
+ }
+ }
+
+ /**
+ * Feature whose locks free themselves while free() is already running.
+ */
+ private class FreeWithFreeLockingFeature extends MyLockingFeature {
+ private boolean relock;
+
+ public FreeWithFreeLockingFeature(boolean relock) {
+ super(true);
+ this.relock = relock;
+ }
+
+ @Override
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+ private boolean checked = false;
+
+ @Override
+ public boolean isUnavailable() {
+ if (checked) {
+ return super.isUnavailable();
+ }
+
+ checked = true;
+
+ // release and relock
+ free();
+
+ if (relock) {
+ // run doUnlock
+ runLock(1, 0);
+
+ // relock it
+ createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
+ }
+
+ return false;
+ }
+ };
+ }
+ }
+
+ /**
+ * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
+ * extend it, and then unlock it.
+ */
+ private class MyThread extends Thread {
+ AssertionError err = null;
+
+ public MyThread() {
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int x = 0; x < MAX_LOOPS; ++x) {
+ makeAttempt();
+ }
+
+ } catch (AssertionError e) {
+ err = e;
+ }
+ }
+
+ private void makeAttempt() {
+ try {
+ Semaphore sem = new Semaphore(0);
+
+ LockCallback cb = new LockCallback() {
+ @Override
+ public void lockAvailable(Lock lock) {
+ sem.release();
+ }
+
+ @Override
+ public void lockUnavailable(Lock lock) {
+ sem.release();
+ }
+ };
+
+ Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
+
+ // wait for callback, whether available or unavailable
+ assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
+ if (!lock.isActive()) {
+ return;
+ }
+
+ nsuccesses.incrementAndGet();
+
+ assertEquals(1, nactive.incrementAndGet());
+
+ lock.extend(HOLD_SEC2, cb);
+ assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
+ assertTrue(lock.isActive());
+
+ // decrement BEFORE free()
+ nactive.decrementAndGet();
+
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AssertionError("interrupted", e);
+ }
+ }
+ }
+}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java
new file mode 100644
index 00000000..5f76d657
--- /dev/null
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Properties;
+import java.util.TreeSet;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+public class DistributedLockPropertiesTest {
+
+ private Properties props;
+
+ /**
+ * Populates {@link #props}.
+ */
+ @Before
+ public void setUp() {
+ props = new Properties();
+
+ props.setProperty(DistributedLockProperties.DB_DRIVER, "my driver");
+ props.setProperty(DistributedLockProperties.DB_URL, "my url");
+ props.setProperty(DistributedLockProperties.DB_USER, "my user");
+ props.setProperty(DistributedLockProperties.DB_PASS, "my pass");
+ props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,-20,,,30");
+ props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "100");
+ props.setProperty(DistributedLockProperties.RETRY_SEC, "200");
+ props.setProperty(DistributedLockProperties.MAX_RETRIES, "300");
+ }
+
+ @Test
+ public void test() throws PropertyException {
+ DistributedLockProperties dlp = new DistributedLockProperties(props);
+
+ assertEquals("my driver", dlp.getDbDriver());
+ assertEquals("my url", dlp.getDbUrl());
+ assertEquals("my user", dlp.getDbUser());
+ assertEquals("my pass", dlp.getDbPwd());
+ assertEquals("10,-20,,,30", dlp.getErrorCodeStrings());
+ assertEquals("[-20, 10, 30]", new TreeSet<>(dlp.getTransientErrorCodes()).toString());
+ assertEquals(100, dlp.getExpireCheckSec());
+ assertEquals(200, dlp.getRetrySec());
+ assertEquals(300, dlp.getMaxRetries());
+
+ assertTrue(dlp.isTransient(10));
+ assertTrue(dlp.isTransient(-20));
+ assertTrue(dlp.isTransient(30));
+
+ assertFalse(dlp.isTransient(-10));
+
+ // invalid value
+ props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,abc,30");
+
+ assertThatThrownBy(() -> new DistributedLockProperties(props)).isInstanceOf(PropertyException.class)
+ .hasMessageContaining(DistributedLockProperties.TRANSIENT_ERROR_CODES);
+ }
+}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java
deleted file mode 100644
index 68a5a31b..00000000
--- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.sql.SQLException;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-
-/**
- * Partially tests DistributedLockingFeature; most of the methods are tested via
- * {@link TargetLockTest}.
- */
-public class DistributedLockingFeatureTest {
- private static final String EXPECTED = "expected exception";
-
- private BasicDataSource dataSrc;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
- }
-
- @Before
- public void setUp() throws Exception {
- dataSrc = mock(BasicDataSource.class);
- }
-
- @Test
- public void testGetSequenceNumber() {
- assertEquals(1000, new DistributedLockingFeature().getSequenceNumber());
- }
-
- @Test(expected = DistributedLockingFeatureException.class)
- public void testAfterStart_PropEx() {
- new DistributedLockingFeatureImpl(new PropertyException("prop", "val")).afterStart(null);
- }
-
- @Test(expected = DistributedLockingFeatureException.class)
- public void testAfterStart_InterruptEx() {
- new DistributedLockingFeatureImpl(new InterruptedException(EXPECTED)).afterStart(null);
- }
-
- @Test(expected = DistributedLockingFeatureException.class)
- public void testAfterStart_OtherEx() {
- new DistributedLockingFeatureImpl(new RuntimeException(EXPECTED)).afterStart(null);
- }
-
- @Test
- public void testCleanLockTable() throws Exception {
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
-
- new DistributedLockingFeatureImpl().afterStart(null);
- }
-
- /**
- * Feature that overrides {@link #makeDataSource()}.
- */
- private class DistributedLockingFeatureImpl extends DistributedLockingFeature {
- /**
- * Exception to throw when {@link #makeDataSource()} is invoked.
- */
- private final Exception makeEx;
-
- public DistributedLockingFeatureImpl() {
- makeEx = null;
- }
-
- public DistributedLockingFeatureImpl(Exception ex) {
- this.makeEx = ex;
- }
-
- @Override
- protected BasicDataSource makeDataSource() throws Exception {
- if (makeEx != null) {
- throw makeEx;
- }
-
- return dataSrc;
- }
- }
-}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java
deleted file mode 100644
index 84eba6b6..00000000
--- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TargetLockTest {
- private static final Logger logger = LoggerFactory.getLogger(TargetLockTest.class);
- private static final int MAX_AGE_SEC = 4 * 60;
- private static final String DB_CONNECTION =
- "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
- private static final String DB_USER = "user";
- private static final String DB_PASSWORD = "password";
- private static final String EXPECTED = "expected exception";
- private static final String MY_RESOURCE = "my-resource-id";
- private static final String MY_OWNER = "my-owner";
- private static final UUID MY_UUID = UUID.randomUUID();
- private static Connection conn = null;
- private static DistributedLockingFeature distLockFeat;
-
- /**
- * Setup the database.
- */
- @BeforeClass
- public static void setup() {
- getDbConnection();
- createTable();
- SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
- distLockFeat = new DistributedLockingFeature();
- distLockFeat.afterStart(null);
- }
-
- /**
- * Cleanup the lock.
- */
- @AfterClass
- public static void cleanUp() {
- distLockFeat.beforeShutdown(null);
- try {
- conn.close();
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.cleanUp()", e);
- }
- }
-
- /**
- * Wipe the database.
- */
- @Before
- public void wipeDb() {
- try (PreparedStatement lockDelete = conn.prepareStatement("DELETE FROM pooling.locks"); ) {
- lockDelete.executeUpdate();
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.wipeDb()", e);
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testGrabLockSuccess() throws InterruptedException, ExecutionException {
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- // attempt to grab expiredLock
- try (PreparedStatement updateStatement =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) {
- updateStatement.setString(1, "resource1");
- updateStatement.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.testGrabLockSuccess()", e);
- throw new RuntimeException(e);
- }
-
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- // cannot re-lock
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
- }
-
- @Test
- public void testExpiredLocks() throws Exception {
-
- // grab lock
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
-
- // force lock to expire
- try (PreparedStatement lockExpire =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -?, now())"); ) {
- lockExpire.setInt(1, MAX_AGE_SEC + 1);
- lockExpire.executeUpdate();
- }
-
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC));
- }
-
- @Test
- public void testGrabLockFail() throws InterruptedException, ExecutionException {
-
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
-
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC));
- }
-
- @Test
- public void testSecondGrab_UpdateOk() throws Exception {
- PreparedStatement grabLockInsert = mock(PreparedStatement.class);
- when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED));
-
- PreparedStatement secondGrabUpdate = mock(PreparedStatement.class);
- when(secondGrabUpdate.executeUpdate()).thenReturn(1);
-
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate);
-
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
-
- assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC));
- }
-
- @Test
- public void testSecondGrab_UpdateFail_InsertOk() throws Exception {
- PreparedStatement grabLockInsert = mock(PreparedStatement.class);
- when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED));
-
- PreparedStatement secondGrabUpdate = mock(PreparedStatement.class);
- when(secondGrabUpdate.executeUpdate()).thenReturn(0);
-
- PreparedStatement secondGrabInsert = mock(PreparedStatement.class);
- when(secondGrabInsert.executeUpdate()).thenReturn(1);
-
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert);
-
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
-
- assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC));
- }
-
- @Test
- public void testSecondGrab_UpdateFail_InsertFail() throws Exception {
- PreparedStatement grabLockInsert = mock(PreparedStatement.class);
- when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED));
-
- PreparedStatement secondGrabUpdate = mock(PreparedStatement.class);
- when(secondGrabUpdate.executeUpdate()).thenReturn(0);
-
- PreparedStatement secondGrabInsert = mock(PreparedStatement.class);
- when(secondGrabInsert.executeUpdate()).thenReturn(0);
-
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert);
-
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
-
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC));
- }
-
- @Test
- public void testUpdateLock() throws Exception {
- // not locked yet - refresh should fail
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC));
-
- // now lock it
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- // refresh should work now
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC));
-
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- // expire the lock
- try (PreparedStatement updateStatement =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) {
- updateStatement.setString(1, "resource1");
- updateStatement.executeUpdate();
- }
-
- // refresh should fail now
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC));
-
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- // test exception case
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).refresh(MAX_AGE_SEC));
- }
-
- @Test
- public void testUnlock() throws Exception {
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
-
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1"));
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC));
-
- // test exception case
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).unlock());
- }
-
- @Test
- public void testIsActive() throws Exception {
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner2"));
-
- // isActive on expiredLock
- try (PreparedStatement updateStatement =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -5, now()) WHERE resourceId = ?"); ) {
- updateStatement.setString(1, "resource1");
- updateStatement.executeUpdate();
- }
-
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- // Unlock record, next isActive attempt should fail
- distLockFeat.beforeUnlock("resource1", "owner1");
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- // test exception case for outer "try"
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive());
-
- // test exception case for inner "try"
- PreparedStatement stmt = mock(PreparedStatement.class);
- when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED));
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(stmt);
- dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive());
- }
-
- @Test
- public void unlockBeforeLock() {
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1"));
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1"));
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1"));
- }
-
- @Test
- public void testIsLocked() throws Exception {
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLocked("resource1"));
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLocked("resource1"));
-
- // test exception case for outer "try"
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked());
-
- // test exception case for inner "try"
- PreparedStatement stmt = mock(PreparedStatement.class);
- when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED));
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(stmt);
- dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked());
- }
-
- private static void getDbConnection() {
- try {
- conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.getDBConnection()", e);
- }
- }
-
- private static void createTable() {
- String createString =
- "create table if not exists pooling.locks "
- + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
- + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))";
- try (PreparedStatement createStmt = conn.prepareStatement(createString); ) {
- createStmt.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.createTable()", e);
- }
- }
-}
diff --git a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
index d1a07e82..061cc608 100644
--- a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
+++ b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
@@ -2,14 +2,14 @@
# ============LICENSE_START=======================================================
# feature-distributed-locking
# ================================================================================
-# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,5 +22,8 @@ javax.persistence.jdbc.driver=org.h2.Driver
javax.persistence.jdbc.url=jdbc:h2:mem:pooling
javax.persistence.jdbc.user=user
javax.persistence.jdbc.password=password
-distributed.locking.lock.aging=150
-distributed.locking.heartbeat.interval=500 \ No newline at end of file
+
+distributed.locking.transient.error.codes=500
+distributed.locking.expire.check.seconds=900
+distributed.locking.retry.seconds=60
+distributed.locking.max.retries=2
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java
new file mode 100644
index 00000000..0a4d327b
--- /dev/null
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java
@@ -0,0 +1,71 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+
+/**
+ * Lock implementation whose operations always fail.
+ */
+public class AlwaysFailLock extends LockImpl {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs the object.
+ */
+ public AlwaysFailLock() {
+ super();
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held once
+ * it has been granted, after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or subsequently
+ * lost; must not be {@code null}
+ */
+ public AlwaysFailLock(String resourceId, String ownerKey, int holdSec, LockCallback callback) {
+ super(LockState.UNAVAILABLE, resourceId, ownerKey, holdSec, callback);
+ }
+
+ /**
+ * Always returns false.
+ */
+ @Override
+ public boolean free() {
+ return false;
+ }
+
+ /**
+ * Always fails and invokes {@link LockCallback#lockUnavailable(Lock)}.
+ */
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ synchronized (this) {
+ setHoldSec(holdSec);
+ setCallback(callback);
+ }
+
+ notifyUnavailable();
+ }
+}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java
index de62b24a..b2ed9c7f 100644
--- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,145 +20,67 @@
package org.onap.policy.drools.core.lock;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map.Entry;
-import org.onap.policy.drools.utils.Pair;
-
/**
- * Lock that is held for a resource. This not only identifies the current owner of the
- * lock, but it also includes a queue of requesters. An item is associated with each
- * requester that is waiting in the queue. Note: this class is <b>not</b> thread-safe.
- *
- * @param <T> type of item to be associated with a request
+ * Lock held on a resource.
*/
-public class Lock<T> {
+public interface Lock {
/**
- * Result returned by <i>removeRequester()</i>.
+ * Frees/release the lock.
+ *
+ * <p/>
+ * Note: client code may choose to invoke this method <i>before</i> the lock has been
+ * granted.
+ *
+ * @return {@code true} if the request was accepted, {@code false} if the lock is
+ * unavailable
*/
- public enum RemoveResult {
- /**
- * The requester was the owner of the lock, and the lock is no longer needed,
- * because there were no other requesters waiting to get the lock.
- */
- UNLOCKED,
-
- /**
- * The requester was the owner of the lock, and has been replaced with the next
- * requester waiting in the queue.
- */
- RELOCKED,
-
- /**
- * The requester had been waiting in the queue, and has now been removed.
- */
- REMOVED,
-
- /**
- * The requester was not the owner, nor was it waiting in the queue.
- */
- NOT_FOUND
- }
+ boolean free();
/**
- * The last owner to grab the lock, never {@code null}.
+ * Determines if the lock is active.
+ *
+ * @return {@code true} if the lock is <b>ACTIVE</b>, {@code false} otherwise
*/
- private String owner;
+ boolean isActive();
/**
- * Requesters waiting to get the lock. Maps the requester (i.e., owner for which the
- * request is being made) to its associated item. Uses a Linked map so that the order
- * of the requesters is maintained. We don't expect many requesters for any given
- * lock, thus we'll start with a small hash size.
+ * Determines if the lock is unavailable. Once a lock object becomes unavailable, it
+ * will never become active again.
+ *
+ * @return {@code true} if the lock is <b>UNAVAILABLE</b>, {@code false} otherwise
*/
- private LinkedHashMap<String, T> requester2item = new LinkedHashMap<>(5);
+ boolean isUnavailable();
/**
- * Constructor.
- *
- * @param owner the current owner of this lock
+ * Determines if this object is waiting for a lock to be granted or denied. This
+ * applies when the lock is first created, or after {@link #extend(int, LockCallback)}
+ * has been invoked.
+ *
+ * @return {@code true} if the lock is <b>WAITING</b>, {@code false} otherwise
*/
- public Lock(String owner) {
- this.owner = owner;
- }
+ boolean isWaiting();
/**
- * Get owner.
- *
- * @return the current owner of the lock, or the last owner of the lock, if the lock
- * is not currently owned. (This will never be {@code null}.)
+ * Gets the ID of the resource to which the lock applies.
+ *
+ * @return the ID of the resource to which the lock applies
*/
- public String getOwner() {
- return owner;
- }
+ String getResourceId();
/**
- * Adds a new requester to the queue of requesters.
- *
- * @param requester the requester
- * @param item to be associated with the requester, must not be {@code null}
- * @return {@code true} if the requester was added, {@code false} if it already owns
- * the lock or is already in the queue
- * @throws IllegalArgumentException if the item is null
+ * Gets the lock's owner key.
+ *
+ * @return the lock's owner key
*/
- public boolean add(String requester, T item) {
- if (item == null) {
- throw SimpleLockManager.makeNullArgException("lock requester item is null");
- }
-
- if (requester.equals(owner)) {
- // requester already owns the lock
- return false;
- }
-
- T prev = requester2item.putIfAbsent(requester, item);
-
- // if there's a previous value, then that means this requester is already
- // waiting for a lock on this resource. In that case, we return false
- return (prev == null);
- }
+ String getOwnerKey();
/**
- * Removes a requester from the lock. The requester may currently own the lock, or it
- * may be in the queue waiting for the lock. Note: as this is agnostic to the type of
- * item associated with the requester, it is unable to notify the new owner that it's
- * the new owner; that is left up to the code that invokes this method.
- *
- * @param requester the requester
- * @param newOwner the new owner info is placed here, if the result is <i>RELOCKED</i>
- * @return the result
+ * Extends a lock an additional amount of time from now. The callback will always be
+ * invoked, and may be invoked <i>before</i> this method returns.
+ *
+ * @param holdSec the additional amount of time to hold the lock, in seconds
+ * @param callback callback to be invoked when the extension completes
*/
- public RemoveResult removeRequester(String requester, Pair<String, T> newOwner) {
-
- if (!requester.equals(owner)) {
- // requester does not currently own the lock - remove it from the
- // queue
- T ent = requester2item.remove(requester);
-
- // if there was an entry in the queue, then return true to indicate
- // that it was removed. Otherwise, return false
- return (ent != null ? RemoveResult.REMOVED : RemoveResult.NOT_FOUND);
- }
-
- /*
- * requester was the owner - find something to take over
- */
- Iterator<Entry<String, T>> it = requester2item.entrySet().iterator();
- if (!it.hasNext()) {
- // no one to take over the lock - it's now unlocked
- return RemoveResult.UNLOCKED;
- }
-
- // there's another requester to take over
- Entry<String, T> ent = it.next();
- it.remove();
-
- owner = ent.getKey();
-
- newOwner.first(owner);
- newOwner.second(ent.getValue());
-
- return RemoveResult.RELOCKED;
- }
+ void extend(int holdSec, LockCallback callback);
}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiConstants.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockCallback.java
index 8510e3d9..fae1cb43 100644
--- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiConstants.java
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockCallback.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * api-resource-locks
+ * ONAP
* ================================================================================
* Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
@@ -20,20 +20,25 @@
package org.onap.policy.drools.core.lock;
-import lombok.Getter;
-import org.onap.policy.common.utils.services.OrderedServiceImpl;
-
-public class PolicyResourceLockFeatureApiConstants {
+/**
+ * Callback invoked when a lock is granted or lost.
+ *
+ * <p/>
+ * Note: these methods may or may not be invoked by the thread that requested the lock.
+ */
+public interface LockCallback {
/**
- * 'FeatureAPI.impl.getList()' returns an ordered list of objects implementing the
- * 'FeatureAPI' interface.
+ * Called to indicate that a lock has been granted.
+ *
+ * @param lock lock that has been granted
*/
- @Getter
- private static final OrderedServiceImpl<PolicyResourceLockFeatureApi> impl =
- new OrderedServiceImpl<>(PolicyResourceLockFeatureApi.class);
+ void lockAvailable(Lock lock);
- private PolicyResourceLockFeatureApiConstants() {
- // do nothing
- }
+ /**
+ * Called to indicate that a lock is permanently unavailable (e.g., lost, expired).
+ *
+ * @param lock lock that has been lost
+ */
+ void lockUnavailable(Lock lock);
}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java
new file mode 100644
index 00000000..9596dbe8
--- /dev/null
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java
@@ -0,0 +1,158 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+import java.io.Serializable;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import lombok.ToString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lock implementation.
+ */
+@Getter
+@Setter
+@ToString(exclude = {"callback"})
+public class LockImpl implements Lock, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger logger = LoggerFactory.getLogger(LockImpl.class);
+
+ private LockState state;
+ private final String resourceId;
+ private final String ownerKey;
+ private transient LockCallback callback;
+ private int holdSec;
+
+ /**
+ * Constructs the object.
+ */
+ public LockImpl() {
+ this.state = LockState.UNAVAILABLE;
+ this.resourceId = null;
+ this.ownerKey = null;
+ this.callback = null;
+ this.holdSec = 0;
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param state the initial lock state
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held once
+ * it has been granted, after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or subsequently
+ * lost; must not be {@code null}
+ */
+ public LockImpl(@NonNull LockState state, @NonNull String resourceId, @NonNull String ownerKey, int holdSec,
+ @NonNull LockCallback callback) {
+
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ this.state = state;
+ this.resourceId = resourceId;
+ this.ownerKey = ownerKey;
+ this.callback = callback;
+ this.holdSec = holdSec;
+ }
+
+ @Override
+ public boolean isActive() {
+ return (getState() == LockState.ACTIVE);
+ }
+
+ @Override
+ public boolean isUnavailable() {
+ return (getState() == LockState.UNAVAILABLE);
+ }
+
+ @Override
+ public boolean isWaiting() {
+ return (getState() == LockState.WAITING);
+ }
+
+ /**
+ * This method always succeeds, unless the lock is already unavailable.
+ */
+ @Override
+ public synchronized boolean free() {
+ if (isUnavailable()) {
+ return false;
+ }
+
+ logger.info("releasing lock: {}", this);
+ setState(LockState.UNAVAILABLE);
+
+ return true;
+ }
+
+ /**
+ * This method always succeeds, unless the lock is already unavailable.
+ */
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ synchronized (this) {
+ if (isUnavailable()) {
+ return;
+ }
+
+ logger.info("lock granted: {}", this);
+ setState(LockState.ACTIVE);
+ setHoldSec(holdSec);
+ setCallback(callback);
+ }
+
+ notifyAvailable();
+ }
+
+ /**
+ * Invokes the {@link LockCallback#lockAvailable(Lock)}, <i>from the current
+ * thread</i>. Note: subclasses may choose to invoke the callback from other threads.
+ */
+ public void notifyAvailable() {
+ try {
+ callback.lockAvailable(this);
+
+ } catch (RuntimeException e) {
+ logger.warn("lock callback threw an exception", e);
+ }
+ }
+
+ /**
+ * Invokes the {@link LockCallback#lockUnavailable(Lock)}, <i>from the current
+ * thread</i>. Note: subclasses may choose to invoke the callback from other threads.
+ */
+ public void notifyUnavailable() {
+ try {
+ callback.lockUnavailable(this);
+
+ } catch (RuntimeException e) {
+ logger.warn("lock callback threw an exception", e);
+ }
+ }
+}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java
new file mode 100644
index 00000000..41699ce6
--- /dev/null
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+/**
+ * States of a Lock.
+ */
+
+public enum LockState {
+
+ /**
+ * Waiting for the lock request to complete.
+ */
+ WAITING,
+
+ /**
+ * This lock currently holds the resource.
+ */
+ ACTIVE,
+
+ /**
+ * The resource is no longer available to the lock.
+ */
+ UNAVAILABLE
+}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java
deleted file mode 100644
index b7968486..00000000
--- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * api-resource-locks
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import org.onap.policy.common.utils.services.OrderedService;
-
-/**
- * Resource locks. Each lock has an "owner", which is intended to be unique across a
- * single instance of a running PolicyEngine.
- *
- * <p>This interface provides a way to invoke optional features at various points in the
- * code. At appropriate points in the application, the code iterates through this list,
- * invoking these optional methods.
- *
- * <p>Implementers may choose to implement a level of locking appropriate to the application.
- * For instance, they may choose to implement an engine-wide locking scheme, or they may
- * choose to implement a global locking scheme (e.g., through a shared DB).
- */
-public interface PolicyResourceLockFeatureApi extends OrderedService {
-
- /**
- * Result of a requested operation.
- */
- public enum OperResult {
-
- /**
- * The implementer accepted the request; no additional locking logic should be
- * performed.
- */
- OPER_ACCEPTED,
-
- /**
- * The implementer denied the request; no additional locking logic should be
- * performed.
- */
- OPER_DENIED,
-
-
- /**
- * The implementer did not handle the request; additional locking logic <i>should
- * be</i> performed.
- */
- OPER_UNHANDLED
- }
-
- /**
- * This method is called before a lock is acquired on a resource.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return the result, where <b>OPER_DENIED</b> indicates that the lock is currently
- * held by another owner
- */
- public default OperResult beforeLock(String resourceId, String owner, int holdSec) {
- return OperResult.OPER_UNHANDLED;
- }
-
- /**
- * This method is called after a lock for a resource has been acquired or denied.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param locked {@code true} if the lock was acquired, {@code false} if it was denied
- * @return {@code true} if the implementer handled the request, {@code false}
- * otherwise
- */
- public default boolean afterLock(String resourceId, String owner, boolean locked) {
- return false;
- }
-
- /**
- * This method is called before a lock is refreshed on a resource. It may be invoked
- * repeatedly to extend the time that a lock is held.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return the result, where <b>OPER_DENIED</b> indicates that the resource is not
- * currently locked by the given owner
- */
- public default OperResult beforeRefresh(String resourceId, String owner, int holdSec) {
- return OperResult.OPER_UNHANDLED;
- }
-
- /**
- * This method is called after a lock for a resource has been refreshed (or after the
- * refresh has been denied).
- *
- * @param resourceId resource id
- * @param owner owner
- * @param locked {@code true} if the lock was acquired, {@code false} if it was denied
- * @return {@code true} if the implementer handled the request, {@code false}
- * otherwise
- */
- public default boolean afterRefresh(String resourceId, String owner, boolean locked) {
- return false;
- }
-
- /**
- * This method is called before a lock on a resource is released.
- *
- * @param resourceId resource id
- * @param owner owner
- * @return the result, where <b>OPER_DENIED</b> indicates that the lock is not
- * currently held by the given owner
- */
- public default OperResult beforeUnlock(String resourceId, String owner) {
- return OperResult.OPER_UNHANDLED;
- }
-
- /**
- * This method is called after a lock on a resource is released.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param unlocked {@code true} if the lock was released, {@code false} if the owner
- * did not have a lock on the resource
- * @return {@code true} if the implementer handled the request, {@code false}
- * otherwise
- */
- public default boolean afterUnlock(String resourceId, String owner, boolean unlocked) {
- return false;
- }
-
- /**
- * This method is called before a check is made to determine if a resource is locked.
- *
- * @param resourceId resource id
- * @return the result, where <b>OPER_ACCEPTED</b> indicates that the resource is
- * locked, while <b>OPER_DENIED</b> indicates that it is not
- */
- public default OperResult beforeIsLocked(String resourceId) {
- return OperResult.OPER_UNHANDLED;
- }
-
- /**
- * This method is called before a check is made to determine if a particular owner
- * holds the lock on a resource.
- *
- * @param resourceId resource id
- * @param owner owner
- * @return the result, where <b>OPER_ACCEPTED</b> indicates that the resource is
- * locked by the given owner, while <b>OPER_DENIED</b> indicates that it is
- * not
- */
- public default OperResult beforeIsLockedBy(String resourceId, String owner) {
- return OperResult.OPER_UNHANDLED;
- }
-}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java
index 0e73eac1..bbf7d229 100644
--- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java
@@ -20,213 +20,36 @@
package org.onap.policy.drools.core.lock;
-import java.util.List;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.onap.policy.common.capabilities.Lockable;
+import org.onap.policy.common.capabilities.Startable;
/**
- * Manager of resource locks. Checks for API implementers.
+ * Manager of resource locks.
*/
-public class PolicyResourceLockManager extends SimpleLockManager {
-
- private static Logger logger = LoggerFactory.getLogger(PolicyResourceLockManager.class);
-
- /**
- * Used by junit tests.
- */
- protected PolicyResourceLockManager() {
- super();
- }
-
- /**
- * Get instance.
- *
- * @return the manager singleton
- */
- public static PolicyResourceLockManager getInstance() {
- return Singleton.instance;
- }
-
- @Override
- public boolean lock(String resourceId, String owner, int holdSec) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
-
- return doBoolIntercept(impl -> impl.beforeLock(resourceId, owner, holdSec), () -> {
-
- // implementer didn't do the work - defer to the superclass
- boolean locked = super.lock(resourceId, owner, holdSec);
-
- doIntercept(false, impl -> impl.afterLock(resourceId, owner, locked));
-
- return locked;
- });
- }
-
- @Override
- public boolean refresh(String resourceId, String owner, int holdSec) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
-
- return doBoolIntercept(impl -> impl.beforeRefresh(resourceId, owner, holdSec), () -> {
-
- // implementer didn't do the work - defer to the superclass
- boolean refreshed = super.refresh(resourceId, owner, holdSec);
-
- doIntercept(false, impl -> impl.afterRefresh(resourceId, owner, refreshed));
-
- return refreshed;
- });
- }
-
- @Override
- public boolean unlock(String resourceId, String owner) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
-
- return doBoolIntercept(impl -> impl.beforeUnlock(resourceId, owner), () -> {
-
- // implementer didn't do the work - defer to the superclass
- boolean unlocked = super.unlock(resourceId, owner);
-
- doIntercept(false, impl -> impl.afterUnlock(resourceId, owner, unlocked));
-
- return unlocked;
- });
- }
-
- /**
- * Is locked.
- *
- * @throws IllegalArgumentException if the resourceId is {@code null}
- */
- @Override
- public boolean isLocked(String resourceId) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
-
- return doBoolIntercept(impl -> impl.beforeIsLocked(resourceId), () ->
-
- // implementer didn't do the work - defer to the superclass
- super.isLocked(resourceId)
- );
- }
+public interface PolicyResourceLockManager extends Startable, Lockable {
/**
- * Is locked by.
+ * Requests a lock on a resource. Typically, the lock is not immediately granted,
+ * though a "lock" object is always returned. Once the lock has been granted (or
+ * denied), the callback will be invoked to indicate the result.
*
- * @throws IllegalArgumentException if the resourceId or owner is {@code null}
- */
- @Override
- public boolean isLockedBy(String resourceId, String owner) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
- return doBoolIntercept(impl -> impl.beforeIsLockedBy(resourceId, owner), () ->
-
- // implementer didn't do the work - defer to the superclass
- super.isLockedBy(resourceId, owner)
- );
- }
-
- /**
- * Applies a function to each implementer of the lock feature. Returns as soon as one
- * of them returns a result other than <b>OPER_UNHANDLED</b>. If they all return
- * <b>OPER_UNHANDLED</b>, then it returns the result of applying the default function.
+ * <p/>
+ * Notes:
+ * <dl>
+ * <li>The callback may be invoked <i>before</i> this method returns</li>
+ * <li>The implementation need not honor waitForLock={@code true}</li>
+ * </dl>
*
- * @param interceptFunc intercept function
- * @param defaultFunc default function
- * @return {@code true} if success, {@code false} otherwise
- */
- private boolean doBoolIntercept(Function<PolicyResourceLockFeatureApi, OperResult> interceptFunc,
- Supplier<Boolean> defaultFunc) {
-
- OperResult result = doIntercept(OperResult.OPER_UNHANDLED, interceptFunc);
- if (result != OperResult.OPER_UNHANDLED) {
- return (result == OperResult.OPER_ACCEPTED);
- }
-
- return defaultFunc.get();
- }
-
- /**
- * Applies a function to each implementer of the lock feature. Returns as soon as one
- * of them returns a non-null value.
- *
- * @param continueValue if the implementer returns this value, then it continues to
- * check addition implementers
- * @param func function to be applied to the implementers
- * @return first non-null value returned by an implementer, <i>continueValue</i> if
- * they all returned <i>continueValue</i>
- */
- private <T> T doIntercept(T continueValue, Function<PolicyResourceLockFeatureApi, T> func) {
-
- for (PolicyResourceLockFeatureApi impl : getImplementers()) {
- try {
- T result = func.apply(impl);
- if (result != continueValue) {
- return result;
- }
-
- } catch (RuntimeException e) {
- logger.warn("lock feature {} threw an exception", impl, e);
- }
- }
-
- return continueValue;
- }
-
- // these may be overridden by junit tests
-
- /**
- * Get implementers.
- *
- * @return the list of feature implementers
- */
- protected List<PolicyResourceLockFeatureApi> getImplementers() {
- return PolicyResourceLockFeatureApiConstants.getImpl().getList();
- }
-
- /**
- * Initialization-on-demand holder idiom.
- */
- private static class Singleton {
-
- private static final PolicyResourceLockManager instance = new PolicyResourceLockManager();
-
- /**
- * Not invoked.
- */
- private Singleton() {
- super();
- }
- }
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held once
+ * it has been granted, after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or subsequently
+ * lost; must not be {@code null}
+ * @param waitForLock {@code true} to wait for the lock, if it is currently locked,
+ * {@code false} otherwise
+ * @return a new lock
+ */
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock);
}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java
deleted file mode 100644
index 427fbbc6..00000000
--- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import org.onap.policy.common.utils.time.CurrentTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple lock manager. Callbacks are ignored. Does not redirect to lock feature
- * implementers.
- */
-public class SimpleLockManager {
-
- protected static Logger logger = LoggerFactory.getLogger(SimpleLockManager.class);
-
- // messages used in exceptions
- public static final String MSG_NULL_RESOURCE_ID = "null resourceId";
- public static final String MSG_NULL_OWNER = "null owner";
-
- /**
- * Used to access the current time. May be overridden by junit tests.
- */
- private static CurrentTime currentTime = new CurrentTime();
-
- /**
- * Used to synchronize updates to {@link #resource2data} and {@link #locks}.
- */
- private final Object locker = new Object();
-
- /**
- * Maps a resource to its lock data. Lock data is stored in both this and in
- * {@link #locks}.
- */
- private final Map<String, Data> resource2data = new HashMap<>();
-
- /**
- * Lock data, sorted by expiration time. Lock data is stored in both this and in
- * {@link #resource2data}. Whenever a lock operation is performed, this structure is
- * examined and any expired locks are removed; thus no timer threads are needed to
- * remove expired locks.
- */
- private final SortedSet<Data> locks = new TreeSet<>();
-
- /**
- * Constructor.
- */
- public SimpleLockManager() {
- super();
- }
-
- /**
- * Attempts to lock a resource, rejecting the lock if it is already owned, even if
- * it's the same owner; the same owner can use {@link #refresh(String, String, int)
- * refresh()}, instead, to extend a lock on a resource.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if locked, {@code false} if the resource is already locked by
- * a different owner
- * @throws IllegalArgumentException if the resourceId or owner is {@code null}
- */
- public boolean lock(String resourceId, String owner, int holdSec) {
-
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
- boolean locked = false;
-
- synchronized (locker) {
- cleanUpLocks();
-
- if (!resource2data.containsKey(resourceId)) {
- Data data = new Data(owner, resourceId, currentTime.getMillis() + TimeUnit.SECONDS.toMillis(holdSec));
- resource2data.put(resourceId, data);
- locks.add(data);
- locked = true;
- }
- }
-
- logger.info("lock {} for resource {} owner {}", locked, resourceId, owner);
-
- return locked;
- }
-
- /**
- * Attempts to refresh a lock on a resource.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if locked, {@code false} if the resource is not currently
- * locked by the given owner
- * @throws IllegalArgumentException if the resourceId or owner is {@code null}
- */
- public boolean refresh(String resourceId, String owner, int holdSec) {
-
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
- boolean refreshed = false;
-
- synchronized (locker) {
- cleanUpLocks();
-
- Data existingLock = resource2data.get(resourceId);
- if (existingLock != null && existingLock.getOwner().equals(owner)) {
- // MUST remove the existing lock from the set
- locks.remove(existingLock);
-
- refreshed = true;
-
- Data data = new Data(owner, resourceId, currentTime.getMillis() + TimeUnit.SECONDS.toMillis(holdSec));
- resource2data.put(resourceId, data);
- locks.add(data);
- }
- }
-
- logger.info("refresh lock {} for resource {} owner {}", refreshed, resourceId, owner);
-
- return refreshed;
- }
-
- /**
- * Unlocks a resource.
- *
- * @param resourceId resource id
- * @param owner owner
- * @return {@code true} if unlocked, {@code false} if the given owner does not
- * currently hold a lock on the resource
- * @throws IllegalArgumentException if the resourceId or owner is {@code null}
- */
- public boolean unlock(String resourceId, String owner) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
- Data data;
-
- synchronized (locker) {
- cleanUpLocks();
-
- if ((data = resource2data.get(resourceId)) != null) {
- if (owner.equals(data.getOwner())) {
- resource2data.remove(resourceId);
- locks.remove(data);
-
- } else {
- data = null;
- }
- }
- }
-
- boolean unlocked = (data != null);
- logger.info("unlock resource {} owner {} = {}", resourceId, owner, unlocked);
-
- return unlocked;
- }
-
- /**
- * Determines if a resource is locked by anyone.
- *
- * @param resourceId resource id
- * @return {@code true} if the resource is locked, {@code false} otherwise
- * @throws IllegalArgumentException if the resourceId is {@code null}
- */
- public boolean isLocked(String resourceId) {
-
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- boolean locked;
-
- synchronized (locker) {
- cleanUpLocks();
-
- locked = resource2data.containsKey(resourceId);
- }
-
- logger.debug("resource {} isLocked = {}", resourceId, locked);
-
- return locked;
- }
-
- /**
- * Determines if a resource is locked by a particular owner.
- *
- * @param resourceId resource id
- * @param owner owner
- * @return {@code true} if the resource is locked, {@code false} otherwise
- * @throws IllegalArgumentException if the resourceId or owner is {@code null}
- */
- public boolean isLockedBy(String resourceId, String owner) {
-
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
- Data data;
-
- synchronized (locker) {
- cleanUpLocks();
-
- data = resource2data.get(resourceId);
- }
-
- boolean locked = (data != null && owner.equals(data.getOwner()));
- logger.debug("resource {} isLockedBy {} = {}", resourceId, owner, locked);
-
- return locked;
- }
-
- /**
- * Releases expired locks.
- */
- private void cleanUpLocks() {
- long tcur = currentTime.getMillis();
-
- synchronized (locker) {
- Iterator<Data> it = locks.iterator();
- while (it.hasNext()) {
- Data data = it.next();
- if (data.getExpirationMs() <= tcur) {
- it.remove();
- resource2data.remove(data.getResource());
- } else {
- break;
- }
- }
- }
- }
-
- /**
- * Makes an exception for when an argument is {@code null}.
- *
- * @param msg exception message
- * @return a new Exception
- */
- public static IllegalArgumentException makeNullArgException(String msg) {
- return new IllegalArgumentException(msg);
- }
-
- /**
- * Data for a single Lock. Sorts by expiration time, then resource, and
- * then owner.
- */
- protected static class Data implements Comparable<Data> {
-
- /**
- * Owner of the lock.
- */
- private final String owner;
-
- /**
- * Resource that is locked.
- */
- private final String resource;
-
- /**
- * Time when the lock will expire, in milliseconds.
- */
- private final long texpireMs;
-
- /**
- * Constructor.
- *
- * @param resource resource
- * @param owner owner
- * @param texpireMs time expire in milliseconds
- */
- public Data(String owner, String resource, long texpireMs) {
- this.owner = owner;
- this.resource = resource;
- this.texpireMs = texpireMs;
- }
-
- public String getOwner() {
- return owner;
- }
-
- public String getResource() {
- return resource;
- }
-
- public long getExpirationMs() {
- return texpireMs;
- }
-
- @Override
- public int compareTo(Data data) {
- int diff = Long.compare(texpireMs, data.texpireMs);
- if (diff == 0) {
- diff = resource.compareTo(data.resource);
- }
- if (diff == 0) {
- diff = owner.compareTo(data.owner);
- }
- return diff;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((owner == null) ? 0 : owner.hashCode());
- result = prime * result + ((resource == null) ? 0 : resource.hashCode());
- result = prime * result + (int) (texpireMs ^ (texpireMs >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- Data other = (Data) obj;
- if (owner == null) {
- if (other.owner != null) {
- return false;
- }
- } else if (!owner.equals(other.owner)) {
- return false;
- }
- if (resource == null) {
- if (other.resource != null) {
- return false;
- }
- } else if (!resource.equals(other.resource)) {
- return false;
- }
- return (texpireMs == other.texpireMs);
- }
- }
-}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java
new file mode 100644
index 00000000..ce4ca5fd
--- /dev/null
+++ b/policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java
@@ -0,0 +1,106 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AlwaysFailLockTest {
+ private static final String RESOURCE = "hello";
+ private static final String OWNER_KEY = "world";
+ private static final int HOLD_SEC = 10;
+ private static final int HOLD_SEC2 = 10;
+
+ private LockCallback callback;
+ private AlwaysFailLock lock;
+
+ /**
+ * Populates {@link #lock}.
+ */
+ @Before
+ public void setUp() {
+ callback = mock(LockCallback.class);
+
+ lock = new AlwaysFailLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback);
+ }
+
+ @Test
+ public void testSerializable() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(lock);
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ lock = (AlwaysFailLock) ois.readObject();
+ }
+
+ assertEquals(LockState.UNAVAILABLE, lock.getState());
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+
+ // these fields are transient
+ assertNull(lock.getCallback());
+ }
+
+ @Test
+ public void testAlwaysFailLockNoArgs() {
+ // verify that no-arg constructor doesn't throw an exception
+ new AlwaysFailLock();
+ }
+
+ @Test
+ public void testAlwaysFailLock() {
+ assertTrue(lock.isUnavailable());
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+ assertSame(callback, lock.getCallback());
+ }
+
+ @Test
+ public void testFree() {
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ @Test
+ public void testExtend() {
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ assertSame(callback2, lock.getCallback());
+ }
+}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java
new file mode 100644
index 00000000..aab04dc4
--- /dev/null
+++ b/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java
@@ -0,0 +1,261 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LockImplTest {
+ private static final LockState STATE = LockState.WAITING;
+ private static final String RESOURCE = "hello";
+ private static final String OWNER_KEY = "world";
+ private static final int HOLD_SEC = 10;
+ private static final int HOLD_SEC2 = 20;
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+
+ private LockCallback callback;
+ private LockImpl lock;
+
+ /**
+ * Populates {@link #lock}.
+ */
+ @Before
+ public void setUp() {
+ callback = mock(LockCallback.class);
+
+ lock = new LockImpl(STATE, RESOURCE, OWNER_KEY, HOLD_SEC, callback);
+ }
+
+ @Test
+ public void testSerializable() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(lock);
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ lock = (LockImpl) ois.readObject();
+ }
+
+ assertEquals(STATE, lock.getState());
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+
+ // these fields are transient
+ assertNull(lock.getCallback());
+ }
+
+ @Test
+ public void testLockImplNoArgs() {
+ // use no-arg constructor
+ lock = new LockImpl();
+ assertEquals(LockState.UNAVAILABLE, lock.getState());
+ assertNull(lock.getResourceId());
+ assertNull(lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(0, lock.getHoldSec());
+ }
+
+ @Test
+ public void testLockImpl_testGetters() {
+ assertEquals(STATE, lock.getState());
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertSame(callback, lock.getCallback());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+
+ // test illegal args
+ assertThatThrownBy(() -> new LockImpl(null, RESOURCE, OWNER_KEY, HOLD_SEC, callback))
+ .hasMessageContaining("state");
+ assertThatThrownBy(() -> new LockImpl(STATE, null, OWNER_KEY, HOLD_SEC, callback))
+ .hasMessageContaining("resourceId");
+ assertThatThrownBy(() -> new LockImpl(STATE, RESOURCE, null, HOLD_SEC, callback))
+ .hasMessageContaining("ownerKey");
+ assertThatIllegalArgumentException().isThrownBy(() -> new LockImpl(STATE, RESOURCE, OWNER_KEY, -1, callback))
+ .withMessageContaining("holdSec");
+ assertThatThrownBy(() -> new LockImpl(STATE, RESOURCE, OWNER_KEY, HOLD_SEC, null))
+ .hasMessageContaining("callback");
+ }
+
+ @Test
+ public void testFree() {
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ // should fail this time
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ // no call-backs should have been invoked
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testExtend() {
+ lock.setState(LockState.WAITING);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+ assertTrue(lock.isActive());
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ assertSame(callback2, lock.getCallback());
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(any());
+
+ // first call-back should never have been invoked
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+
+ // extend again
+ LockCallback callback3 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC, callback3);
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+ assertSame(callback3, lock.getCallback());
+ assertTrue(lock.isActive());
+ verify(callback3).lockAvailable(lock);
+ verify(callback3, never()).lockUnavailable(any());
+
+ // other call-backs should not have been invoked again
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+
+ verify(callback2).lockAvailable(any());
+ verify(callback2, never()).lockUnavailable(any());
+
+ assertTrue(lock.free());
+
+ // extend after free - should fail
+ lock.extend(HOLD_SEC2, callback);
+ assertTrue(lock.isUnavailable());
+
+ // call-backs should not have been invoked again
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+
+ verify(callback2).lockAvailable(any());
+ verify(callback2, never()).lockUnavailable(any());
+
+ verify(callback3).lockAvailable(lock);
+ verify(callback3, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testNotifyAvailable() {
+ lock.notifyAvailable();
+
+ verify(callback).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testNotifyAvailable_Ex() {
+ doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockAvailable(any());
+ doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockUnavailable(any());
+
+ // should not throw an exception
+ lock.notifyAvailable();
+ }
+
+ @Test
+ public void testNotifyUnavailable() {
+ lock.notifyUnavailable();
+
+ verify(callback, never()).lockAvailable(any());
+ verify(callback).lockUnavailable(any());
+ }
+
+ @Test
+ public void testNotifyUnavailable_Ex() {
+ doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockAvailable(any());
+ doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockUnavailable(any());
+
+ // should not throw an exception
+ lock.notifyUnavailable();
+ }
+
+ @Test
+ public void testSetState_testIsActive_testIsWaiting_testIsUnavailable() {
+ lock.setState(LockState.WAITING);
+ assertEquals(LockState.WAITING, lock.getState());
+ assertFalse(lock.isActive());
+ assertFalse(lock.isUnavailable());
+ assertTrue(lock.isWaiting());
+
+ lock.setState(LockState.ACTIVE);
+ assertEquals(LockState.ACTIVE, lock.getState());
+ assertTrue(lock.isActive());
+ assertFalse(lock.isUnavailable());
+ assertFalse(lock.isWaiting());
+
+ lock.setState(LockState.UNAVAILABLE);
+ assertEquals(LockState.UNAVAILABLE, lock.getState());
+ assertFalse(lock.isActive());
+ assertTrue(lock.isUnavailable());
+ assertFalse(lock.isWaiting());
+ }
+
+ @Test
+ public void testSetHoldSec() {
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+
+ lock.setHoldSec(HOLD_SEC2);
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ }
+
+ @Test
+ public void testSetCallback() {
+ assertSame(callback, lock.getCallback());
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.setCallback(callback2);
+ assertSame(callback2, lock.getCallback());
+ }
+
+ @Test
+ public void testToString() {
+ String text = lock.toString();
+
+ assertNotNull(text);
+ assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback").doesNotContain("succeed");
+ }
+}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java
deleted file mode 100644
index 5a88b02f..00000000
--- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.drools.core.lock.Lock.RemoveResult;
-import org.onap.policy.drools.utils.Pair;
-
-public class LockTest {
-
- private static final String OWNER = "my.owner";
- private static final String OWNER2 = "another.owner";
- private static final String OWNER3 = "third.owner";
-
- private static final Integer ITEM2 = 10;
- private static final Integer ITEM3 = 20;
-
- private Lock<Integer> lock;
- private Pair<String, Integer> newOwner;
-
- @Before
- public void setUp() {
- lock = new Lock<>(OWNER);
- newOwner = new Pair<>(null, null);
- }
-
-
- @Test
- public void testLock() {
- assertEquals(OWNER, lock.getOwner());
- }
-
- @Test
- public void testGetOwner() {
- assertEquals(OWNER, lock.getOwner());
- }
-
- @Test
- public void testAdd() {
- assertTrue(lock.add(OWNER2, ITEM2));
- assertTrue(lock.add(OWNER3, ITEM3));
-
- // attempt to re-add owner2 with the same item - should fail
- assertFalse(lock.add(OWNER2, ITEM2));
-
- // attempt to re-add owner2 with a different item - should fail
- assertFalse(lock.add(OWNER2, ITEM3));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testAdd_ArgEx() {
- lock.add(OWNER2, null);
- }
-
- @Test
- public void testAdd_AlreadyOwner() {
- assertFalse(lock.add(OWNER, ITEM2));
- }
-
- @Test
- public void testAdd_AlreadyInQueue() {
- lock.add(OWNER2, ITEM2);
-
- assertFalse(lock.add(OWNER2, ITEM2));
- }
-
- @Test
- public void testRemoveRequester_Owner_QueueEmpty() {
- assertEquals(RemoveResult.UNLOCKED, lock.removeRequester(OWNER, newOwner));
- }
-
- @Test
- public void testRemoveRequester_Owner_QueueHasOneItem() {
- lock.add(OWNER2, ITEM2);
-
- assertEquals(RemoveResult.RELOCKED, lock.removeRequester(OWNER, newOwner));
- assertEquals(OWNER2, newOwner.first());
- assertEquals(ITEM2, newOwner.second());
-
- assertEquals(RemoveResult.UNLOCKED, lock.removeRequester(OWNER2, newOwner));
- }
-
- @Test
- public void testRemoveRequester_Owner_QueueHasMultipleItems() {
- lock.add(OWNER2, ITEM2);
- lock.add(OWNER3, ITEM3);
-
- assertEquals(RemoveResult.RELOCKED, lock.removeRequester(OWNER, newOwner));
- assertEquals(OWNER2, newOwner.first());
- assertEquals(ITEM2, newOwner.second());
-
- assertEquals(RemoveResult.RELOCKED, lock.removeRequester(OWNER2, newOwner));
- assertEquals(OWNER3, newOwner.first());
- assertEquals(ITEM3, newOwner.second());
-
- assertEquals(RemoveResult.UNLOCKED, lock.removeRequester(OWNER3, newOwner));
- }
-
- @Test
- public void testRemoveRequester_InQueue() {
- lock.add(OWNER2, ITEM2);
-
- assertEquals(RemoveResult.REMOVED, lock.removeRequester(OWNER2, newOwner));
- }
-
- @Test
- public void testRemoveRequester_NeitherOwnerNorInQueue() {
- assertEquals(RemoveResult.NOT_FOUND, lock.removeRequester(OWNER2, newOwner));
- }
-
-}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java
deleted file mode 100644
index 999ae50f..00000000
--- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * api-resource-locks
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult;
-
-public class PolicyResourceLockFeatureApiTest {
-
- private static final String RESOURCE_ID = "the resource";
- private static final String OWNER = "the owner";
-
- private PolicyResourceLockFeatureApi api;
-
- /**
- * set up.
- */
- @Before
- public void setUp() {
- api = new PolicyResourceLockFeatureApi() {
- @Override
- public int getSequenceNumber() {
- return 0;
- }
- };
- }
-
- @Test
- public void testBeforeLock() {
- assertEquals(OperResult.OPER_UNHANDLED, api.beforeLock(RESOURCE_ID, OWNER, 0));
- }
-
- @Test
- public void testAfterLock() {
- assertFalse(api.afterLock(RESOURCE_ID, OWNER, true));
- assertFalse(api.afterLock(RESOURCE_ID, OWNER, false));
- }
-
- @Test
- public void testBeforeRefresh() {
- assertEquals(OperResult.OPER_UNHANDLED, api.beforeRefresh(RESOURCE_ID, OWNER, 0));
- }
-
- @Test
- public void testAfterRefresh() {
- assertFalse(api.afterRefresh(RESOURCE_ID, OWNER, true));
- assertFalse(api.afterRefresh(RESOURCE_ID, OWNER, false));
- }
-
- @Test
- public void testBeforeUnlock() {
- assertEquals(OperResult.OPER_UNHANDLED, api.beforeUnlock(RESOURCE_ID, OWNER));
- }
-
- @Test
- public void testAfterUnlock() {
- assertFalse(api.afterUnlock(RESOURCE_ID, OWNER, true));
- assertFalse(api.afterUnlock(RESOURCE_ID, OWNER, false));
- }
-
- @Test
- public void testBeforeIsLocked() {
- assertEquals(OperResult.OPER_UNHANDLED, api.beforeIsLocked(RESOURCE_ID));
- }
-
- @Test
- public void testBeforeIsLockedBy() {
- assertEquals(OperResult.OPER_UNHANDLED, api.beforeIsLockedBy(RESOURCE_ID, OWNER));
- }
-}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java
deleted file mode 100644
index f575ce49..00000000
--- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java
+++ /dev/null
@@ -1,595 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult;
-
-public class PolicyResourceLockManagerTest {
-
- private static final int MAX_AGE_SEC = 4 * 60;
-
- private static final String NULL_RESOURCE_ID = "null resourceId";
- private static final String NULL_OWNER = "null owner";
-
- private static final String RESOURCE_A = "resource.a";
- private static final String RESOURCE_B = "resource.b";
- private static final String RESOURCE_C = "resource.c";
-
- private static final String OWNER1 = "owner.one";
- private static final String OWNER2 = "owner.two";
- private static final String OWNER3 = "owner.three";
-
- private PolicyResourceLockFeatureApi impl1;
- private PolicyResourceLockFeatureApi impl2;
- private List<PolicyResourceLockFeatureApi> implList;
-
- private PolicyResourceLockManager mgr;
-
- /**
- * Set up.
- */
- @Before
- public void setUp() {
- impl1 = mock(PolicyResourceLockFeatureApi.class);
- impl2 = mock(PolicyResourceLockFeatureApi.class);
-
- initImplementer(impl1);
- initImplementer(impl2);
-
- // list of feature API implementers
- implList = new LinkedList<>(Arrays.asList(impl1, impl2));
-
- mgr = new PolicyResourceLockManager() {
- @Override
- protected List<PolicyResourceLockFeatureApi> getImplementers() {
- return implList;
- }
- };
- }
-
- /**
- * Initializes an implementer so it always returns {@code null}.
- *
- * @param impl implementer
- */
- private void initImplementer(PolicyResourceLockFeatureApi impl) {
- when(impl.beforeLock(anyString(), anyString(), anyInt())).thenReturn(OperResult.OPER_UNHANDLED);
- when(impl.beforeRefresh(anyString(), anyString(), anyInt())).thenReturn(OperResult.OPER_UNHANDLED);
- when(impl.beforeUnlock(anyString(), anyString())).thenReturn(OperResult.OPER_UNHANDLED);
- when(impl.beforeIsLocked(anyString())).thenReturn(OperResult.OPER_UNHANDLED);
- when(impl.beforeIsLockedBy(anyString(), anyString())).thenReturn(OperResult.OPER_UNHANDLED);
- }
-
- @Test
- public void testLock() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl1).afterLock(RESOURCE_A, OWNER1, true);
- verify(impl2).afterLock(RESOURCE_A, OWNER1, true);
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
-
- // null callback - not locked yet
- assertTrue(mgr.lock(RESOURCE_C, OWNER3, MAX_AGE_SEC));
-
- // null callback - already locked
- assertFalse(mgr.lock(RESOURCE_A, OWNER3, MAX_AGE_SEC));
- }
-
- @Test
- public void testLock_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(null, OWNER1, MAX_AGE_SEC))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(RESOURCE_A, null, MAX_AGE_SEC))
- .withMessage(NULL_OWNER);
-
- // this should not throw an exception
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- }
-
- @Test
- public void testLock_Acquired_BeforeIntercepted() {
- // have impl1 intercept
- when(impl1.beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_ACCEPTED);
-
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2, never()).beforeLock(anyString(), anyString(), anyInt());
-
- verify(impl1, never()).afterLock(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterLock(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testLock_Denied_BeforeIntercepted() {
- // have impl1 intercept
- when(impl1.beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_DENIED);
-
- assertFalse(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2, never()).beforeLock(anyString(), anyString(), anyInt());
-
- verify(impl1, never()).afterLock(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterLock(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testLock_Acquired_AfterIntercepted() throws Exception {
-
- // impl1 intercepts during afterLock()
- when(impl1.afterLock(RESOURCE_A, OWNER1, true)).thenReturn(true);
-
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // impl1 sees it, but impl2 does not
- verify(impl1).afterLock(RESOURCE_A, OWNER1, true);
- verify(impl2, never()).afterLock(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testLock_Acquired() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).afterLock(RESOURCE_A, OWNER1, true);
- verify(impl2).afterLock(RESOURCE_A, OWNER1, true);
- }
-
- @Test
- public void testLock_Denied_AfterIntercepted() throws Exception {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // impl1 intercepts during afterLock()
- when(impl1.afterLock(RESOURCE_A, OWNER2, false)).thenReturn(true);
-
- // owner2 tries to lock
- assertFalse(mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC));
-
- // impl1 sees it, but impl2 does not
- verify(impl1).afterLock(RESOURCE_A, OWNER2, false);
- verify(impl2, never()).afterLock(RESOURCE_A, OWNER2, false);
- }
-
- @Test
- public void testLock_Denied() {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // owner2 tries to lock
- mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC);
-
- verify(impl1).afterLock(RESOURCE_A, OWNER2, false);
- verify(impl2).afterLock(RESOURCE_A, OWNER2, false);
- }
-
- @Test
- public void testRefresh() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl1).afterRefresh(RESOURCE_A, OWNER1, true);
- verify(impl2).afterRefresh(RESOURCE_A, OWNER1, true);
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
-
- // different owner and resource
- assertFalse(mgr.refresh(RESOURCE_C, OWNER3, MAX_AGE_SEC));
-
- // different owner
- assertFalse(mgr.refresh(RESOURCE_A, OWNER3, MAX_AGE_SEC));
- }
-
- @Test
- public void testRefresh_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(null, OWNER1, MAX_AGE_SEC))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(RESOURCE_A, null, MAX_AGE_SEC))
- .withMessage(NULL_OWNER);
-
- // this should not throw an exception
- mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- }
-
- @Test
- public void testRefresh_Acquired_BeforeIntercepted() {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // have impl1 intercept
- when(impl1.beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_ACCEPTED);
-
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2, never()).beforeRefresh(anyString(), anyString(), anyInt());
-
- verify(impl1, never()).afterRefresh(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterRefresh(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testRefresh_Denied_BeforeIntercepted() {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // have impl1 intercept
- when(impl1.beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_DENIED);
-
- assertFalse(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2, never()).beforeRefresh(anyString(), anyString(), anyInt());
-
- verify(impl1, never()).afterRefresh(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterRefresh(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testRefresh_Acquired_AfterIntercepted() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // impl1 intercepts during afterRefresh()
- when(impl1.afterRefresh(RESOURCE_A, OWNER1, true)).thenReturn(true);
-
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // impl1 sees it, but impl2 does not
- verify(impl1).afterRefresh(RESOURCE_A, OWNER1, true);
- verify(impl2, never()).afterRefresh(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testRefresh_Acquired() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).afterRefresh(RESOURCE_A, OWNER1, true);
- verify(impl2).afterRefresh(RESOURCE_A, OWNER1, true);
- }
-
- @Test
- public void testRefresh_Denied_AfterIntercepted() throws Exception {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // impl1 intercepts during afterRefresh()
- when(impl1.afterRefresh(RESOURCE_A, OWNER2, false)).thenReturn(true);
-
- // owner2 tries to lock
- assertFalse(mgr.refresh(RESOURCE_A, OWNER2, MAX_AGE_SEC));
-
- // impl1 sees it, but impl2 does not
- verify(impl1).afterRefresh(RESOURCE_A, OWNER2, false);
- verify(impl2, never()).afterRefresh(RESOURCE_A, OWNER2, false);
- }
-
- @Test
- public void testRefresh_Denied() {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // owner2 tries to lock
- mgr.refresh(RESOURCE_A, OWNER2, MAX_AGE_SEC);
-
- verify(impl1).afterRefresh(RESOURCE_A, OWNER2, false);
- verify(impl2).afterRefresh(RESOURCE_A, OWNER2, false);
- }
-
- @Test
- public void testUnlock() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- mgr.lock(RESOURCE_B, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2).beforeUnlock(RESOURCE_A, OWNER1);
-
- verify(impl1).afterUnlock(RESOURCE_A, OWNER1, true);
- verify(impl2).afterUnlock(RESOURCE_A, OWNER1, true);
- }
-
- @Test
- public void testUnlock_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(null, OWNER1)).withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(RESOURCE_A, null)).withMessage(NULL_OWNER);
- }
-
- @Test
- public void testUnlock_BeforeInterceptedTrue() {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // have impl1 intercept
- when(impl1.beforeUnlock(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_ACCEPTED);
-
- assertTrue(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2, never()).beforeUnlock(anyString(), anyString());
-
- verify(impl1, never()).afterUnlock(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterUnlock(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testUnlock_BeforeInterceptedFalse() {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // have impl1 intercept
- when(impl1.beforeUnlock(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_DENIED);
-
- assertFalse(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2, never()).beforeUnlock(anyString(), anyString());
-
- verify(impl1, never()).afterUnlock(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterUnlock(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testUnlock_Unlocked() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2).beforeUnlock(RESOURCE_A, OWNER1);
-
- verify(impl1).afterUnlock(RESOURCE_A, OWNER1, true);
- verify(impl2).afterUnlock(RESOURCE_A, OWNER1, true);
- }
-
- @Test
- public void testUnlock_Unlocked_AfterIntercepted() {
- // have impl1 intercept
- when(impl1.afterUnlock(RESOURCE_A, OWNER1, true)).thenReturn(true);
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2).beforeUnlock(RESOURCE_A, OWNER1);
-
- verify(impl1).afterUnlock(RESOURCE_A, OWNER1, true);
- verify(impl2, never()).afterUnlock(RESOURCE_A, OWNER1, true);
- }
-
- @Test
- public void testUnlock_NotUnlocked() {
- assertFalse(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2).beforeUnlock(RESOURCE_A, OWNER1);
-
- verify(impl1).afterUnlock(RESOURCE_A, OWNER1, false);
- verify(impl2).afterUnlock(RESOURCE_A, OWNER1, false);
- }
-
- @Test
- public void testUnlock_NotUnlocked_AfterIntercepted() {
- // have impl1 intercept
- when(impl1.afterUnlock(RESOURCE_A, OWNER1, false)).thenReturn(true);
-
- assertFalse(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2).beforeUnlock(RESOURCE_A, OWNER1);
-
- verify(impl1).afterUnlock(RESOURCE_A, OWNER1, false);
- verify(impl2, never()).afterUnlock(RESOURCE_A, OWNER1, false);
- }
-
- @Test
- public void testIsLocked_True() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2).beforeIsLocked(RESOURCE_A);
- }
-
- @Test
- public void testIsLocked_False() {
- assertFalse(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2).beforeIsLocked(RESOURCE_A);
- }
-
- @Test
- public void testIsLocked_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLocked(null)).withMessage(NULL_RESOURCE_ID);
- }
-
- @Test
- public void testIsLocked_BeforeIntercepted_True() {
-
- // have impl1 intercept
- when(impl1.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_ACCEPTED);;
-
- assertTrue(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2, never()).beforeIsLocked(RESOURCE_A);
- }
-
- @Test
- public void testIsLocked_BeforeIntercepted_False() {
-
- // lock it so we can verify that impl1 overrides the superclass isLocker()
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // have impl1 intercept
- when(impl1.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_DENIED);
-
- assertFalse(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2, never()).beforeIsLocked(RESOURCE_A);
- }
-
- @Test
- public void testIsLockedBy_True() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1);
- verify(impl2).beforeIsLockedBy(RESOURCE_A, OWNER1);
- }
-
- @Test
- public void testIsLockedBy_False() {
- // different owner
- mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC);
-
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1);
- verify(impl2).beforeIsLockedBy(RESOURCE_A, OWNER1);
- }
-
- @Test
- public void testIsLockedBy_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(null, OWNER1))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(RESOURCE_A, null)).withMessage(NULL_OWNER);
- }
-
- @Test
- public void testIsLockedBy_BeforeIntercepted_True() {
-
- // have impl1 intercept
- when(impl1.beforeIsLockedBy(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_ACCEPTED);;
-
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1);
- verify(impl2, never()).beforeIsLockedBy(RESOURCE_A, OWNER1);
- }
-
- @Test
- public void testIsLockedBy_BeforeIntercepted_False() {
-
- // lock it so we can verify that impl1 overrides the superclass isLocker()
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // have impl1 intercept
- when(impl1.beforeIsLockedBy(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_DENIED);
-
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1);
- verify(impl2, never()).beforeIsLockedBy(RESOURCE_A, OWNER1);
- }
-
- @Test
- public void testGetInstance() {
- PolicyResourceLockManager inst = PolicyResourceLockManager.getInstance();
- assertNotNull(inst);
-
- // should return the same instance each time
- assertEquals(inst, PolicyResourceLockManager.getInstance());
- assertEquals(inst, PolicyResourceLockManager.getInstance());
- }
-
- @Test
- public void testDoIntercept_Empty() {
- // clear the implementer list
- implList.clear();
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertFalse(mgr.isLocked(RESOURCE_B));
-
- verify(impl1, never()).beforeIsLocked(anyString());
- }
-
- @Test
- public void testDoIntercept_Impl1() {
- when(impl1.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_ACCEPTED);;
-
- assertTrue(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2, never()).beforeIsLocked(anyString());
- }
-
- @Test
- public void testDoIntercept_Impl2() {
- when(impl2.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_ACCEPTED);;
-
- assertTrue(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2).beforeIsLocked(RESOURCE_A);
- }
-
- @Test
- public void testDoIntercept_Ex() {
- doThrow(new RuntimeException("expected exception")).when(impl1).beforeIsLocked(RESOURCE_A);
-
- assertFalse(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2).beforeIsLocked(RESOURCE_A);
- }
-}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java
deleted file mode 100644
index 51cf68fc..00000000
--- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.LinkedList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onap.policy.common.utils.time.CurrentTime;
-import org.onap.policy.common.utils.time.TestTime;
-import org.onap.policy.drools.core.lock.SimpleLockManager.Data;
-import org.powermock.reflect.Whitebox;
-
-public class SimpleLockManagerTest {
-
- // Note: this must be a multiple of four
- private static final int MAX_AGE_SEC = 4 * 60;
- private static final int MAX_AGE_MS = MAX_AGE_SEC * 1000;
-
- private static final String EXPECTED_EXCEPTION = "expected exception";
-
- private static final String NULL_RESOURCE_ID = "null resourceId";
- private static final String NULL_OWNER = "null owner";
-
- private static final String RESOURCE_A = "resource.a";
- private static final String RESOURCE_B = "resource.b";
- private static final String RESOURCE_C = "resource.c";
- private static final String RESOURCE_D = "resource.d";
-
- private static final String OWNER1 = "owner.one";
- private static final String OWNER2 = "owner.two";
- private static final String OWNER3 = "owner.three";
-
- /**
- * Name of the "current time" field within the {@link SimpleLockManager} class.
- */
- private static final String TIME_FIELD = "currentTime";
-
- private static CurrentTime savedTime;
-
- private TestTime testTime;
- private SimpleLockManager mgr;
-
- @BeforeClass
- public static void setUpBeforeClass() {
- savedTime = Whitebox.getInternalState(SimpleLockManager.class, TIME_FIELD);
- }
-
- @AfterClass
- public static void tearDownAfterClass() {
- Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, savedTime);
- }
-
- /**
- * Set up.
- */
- @Before
- public void setUp() {
- testTime = new TestTime();
- Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime);
-
- mgr = new SimpleLockManager();
- }
-
- @Test
- public void testCurrentTime() {
- assertNotNull(savedTime);
- }
-
- @Test
- public void testLock() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
-
- // different owner and resource - should succeed
- assertTrue(mgr.lock(RESOURCE_C, OWNER3, MAX_AGE_SEC));
-
- // different owner - already locked
- assertFalse(mgr.lock(RESOURCE_A, OWNER3, MAX_AGE_SEC));
- }
-
- @Test
- public void testLock_ExtendLock() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // sleep half of the cycle
- testTime.sleep(MAX_AGE_MS / 2);
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- // extend the lock
- mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // verify still locked after sleeping the other half of the cycle
- testTime.sleep(MAX_AGE_MS / 2 + 1);
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- // and should release after another half cycle
- testTime.sleep(MAX_AGE_MS / 2);
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
- }
-
- @Test
- public void testLock_AlreadyLocked() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // same owner
- assertFalse(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // different owner
- assertFalse(mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC));
- }
-
- @Test
- public void testLock_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(null, OWNER1, MAX_AGE_SEC))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(RESOURCE_A, null, MAX_AGE_SEC))
- .withMessage(NULL_OWNER);
-
- // this should not throw an exception
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- }
-
- @Test
- public void testRefresh() throws Exception {
- // don't own the lock yet - cannot refresh
- assertFalse(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // now the lock is owned
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // refresh again
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC + 1));
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
-
- // different owner
- assertFalse(mgr.refresh(RESOURCE_A, OWNER3, MAX_AGE_SEC));
-
- // different resource
- assertFalse(mgr.refresh(RESOURCE_C, OWNER1, MAX_AGE_SEC));
- }
-
- @Test
- public void testRefresh_ExtendLock() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // sleep half of the cycle
- testTime.sleep(MAX_AGE_MS / 2);
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- // extend the lock
- mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // verify still locked after sleeping the other half of the cycle
- testTime.sleep(MAX_AGE_MS / 2 + 1);
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- // and should release after another half cycle
- testTime.sleep(MAX_AGE_MS / 2);
-
- // cannot refresh expired lock
- assertFalse(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
- }
-
- @Test
- public void testRefresh_AlreadyLocked() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // same owner
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // different owner
- assertFalse(mgr.refresh(RESOURCE_A, OWNER2, MAX_AGE_SEC));
- assertFalse(mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC));
- }
-
- @Test
- public void testRefresh_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(null, OWNER1, MAX_AGE_SEC))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(RESOURCE_A, null, MAX_AGE_SEC))
- .withMessage(NULL_OWNER);
-
- // this should not throw an exception
- mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- }
-
- @Test
- public void testUnlock() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // unlock it
- assertTrue(mgr.unlock(RESOURCE_A, OWNER1));
- }
-
- @Test
- public void testUnlock_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(null, OWNER1)).withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(RESOURCE_A, null)).withMessage(NULL_OWNER);
- }
-
- @Test
- public void testUnlock_NotLocked() {
- assertFalse(mgr.unlock(RESOURCE_A, OWNER1));
- }
-
- @Test
- public void testUnlock_DiffOwner() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- assertFalse(mgr.unlock(RESOURCE_A, OWNER2));
- }
-
- @Test
- public void testIsLocked() {
- assertFalse(mgr.isLocked(RESOURCE_A));
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- mgr.lock(RESOURCE_B, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLocked(RESOURCE_C));
-
- // unlock from first resource
- mgr.unlock(RESOURCE_A, OWNER1);
- assertFalse(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLocked(RESOURCE_C));
-
- // unlock from second resource
- mgr.unlock(RESOURCE_B, OWNER1);
- assertFalse(mgr.isLocked(RESOURCE_A));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLocked(RESOURCE_C));
- }
-
- @Test
- public void testIsLocked_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLocked(null)).withMessage(NULL_RESOURCE_ID);
- }
-
- @Test
- public void testIsLockedBy() {
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertFalse(mgr.isLockedBy(RESOURCE_B, OWNER1));
-
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
-
- // unlock from the resource
- mgr.unlock(RESOURCE_A, OWNER1);
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
- assertFalse(mgr.isLockedBy(RESOURCE_B, OWNER1));
- }
-
- @Test
- public void testIsLockedBy_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(null, OWNER1))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(RESOURCE_A, null)).withMessage(NULL_OWNER);
- }
-
- @Test
- public void testIsLockedBy_NotLocked() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // different resource, thus no lock
- assertFalse(mgr.isLockedBy(RESOURCE_B, OWNER1));
- }
-
- @Test
- public void testIsLockedBy_LockedButNotOwner() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // different owner
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
- }
-
- @Test
- public void testCleanUpLocks() throws Exception {
- // note: this assumes that MAX_AGE_MS is divisible by 4
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- assertTrue(mgr.isLocked(RESOURCE_A));
-
- testTime.sleep(10);
- mgr.lock(RESOURCE_B, OWNER1, MAX_AGE_SEC);
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLocked(RESOURCE_B));
-
- testTime.sleep(MAX_AGE_MS / 4);
- mgr.lock(RESOURCE_C, OWNER1, MAX_AGE_SEC);
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLocked(RESOURCE_B));
- assertTrue(mgr.isLocked(RESOURCE_C));
-
- testTime.sleep(MAX_AGE_MS / 4);
- mgr.lock(RESOURCE_D, OWNER1, MAX_AGE_SEC);
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLocked(RESOURCE_B));
- assertTrue(mgr.isLocked(RESOURCE_C));
- assertTrue(mgr.isLocked(RESOURCE_D));
-
- // sleep remainder of max age - first two should expire
- testTime.sleep(MAX_AGE_MS / 2);
- assertFalse(mgr.isLocked(RESOURCE_A));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertTrue(mgr.isLocked(RESOURCE_C));
- assertTrue(mgr.isLocked(RESOURCE_D));
-
- // another quarter - next one should expire
- testTime.sleep(MAX_AGE_MS / 4);
- assertFalse(mgr.isLocked(RESOURCE_C));
- assertTrue(mgr.isLocked(RESOURCE_D));
-
- // another quarter - last one should expire
- testTime.sleep(MAX_AGE_MS / 4);
- assertFalse(mgr.isLocked(RESOURCE_D));
- }
-
- @Test
- public void testMakeNullArgException() {
- IllegalArgumentException ex = SimpleLockManager.makeNullArgException(EXPECTED_EXCEPTION);
- assertEquals(EXPECTED_EXCEPTION, ex.getMessage());
- }
-
- @Test
- public void testDataGetXxx() {
- long ttime = System.currentTimeMillis() + 5;
- Data data = new Data(OWNER1, RESOURCE_A, ttime);
-
- assertEquals(OWNER1, data.getOwner());
- assertEquals(RESOURCE_A, data.getResource());
- assertEquals(ttime, data.getExpirationMs());
- }
-
- @Test
- public void testDataCompareTo() {
- long ttime = System.currentTimeMillis() + 50;
- Data data = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataSame = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataDiffExpire = new Data(OWNER1, RESOURCE_A, ttime + 1);
-
- assertEquals(0, data.compareTo(data));
- assertEquals(0, data.compareTo(dataSame));
-
- assertTrue(data.compareTo(dataDiffExpire) < 0);
- assertTrue(dataDiffExpire.compareTo(data) > 0);
-
- Data dataDiffOwner = new Data(OWNER2, RESOURCE_A, ttime);
- Data dataDiffResource = new Data(OWNER1, RESOURCE_B, ttime);
-
- assertTrue(data.compareTo(dataDiffOwner) < 0);
- assertTrue(dataDiffOwner.compareTo(data) > 0);
-
- assertTrue(data.compareTo(dataDiffResource) < 0);
- assertTrue(dataDiffResource.compareTo(data) > 0);
- }
-
- @Test
- public void testDataHashCode() {
- long ttime = System.currentTimeMillis() + 1;
- Data data = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataSame = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataDiffExpire = new Data(OWNER1, RESOURCE_A, ttime + 1);
- Data dataDiffOwner = new Data(OWNER2, RESOURCE_A, ttime);
-
- int hc1 = data.hashCode();
- assertEquals(hc1, dataSame.hashCode());
-
- assertTrue(hc1 != dataDiffExpire.hashCode());
- assertTrue(hc1 != dataDiffOwner.hashCode());
-
- Data dataDiffResource = new Data(OWNER1, RESOURCE_B, ttime);
- Data dataNullOwner = new Data(null, RESOURCE_A, ttime);
- Data dataNullResource = new Data(OWNER1, null, ttime);
-
- assertTrue(hc1 != dataDiffResource.hashCode());
- assertTrue(hc1 != dataNullOwner.hashCode());
- assertTrue(hc1 != dataNullResource.hashCode());
- }
-
- @Test
- public void testDataEquals() {
- long ttime = System.currentTimeMillis() + 50;
- Data data = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataSame = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataDiffExpire = new Data(OWNER1, RESOURCE_A, ttime + 1);
-
- assertTrue(data.equals(data));
- assertTrue(data.equals(dataSame));
-
- Data dataDiffOwner = new Data(OWNER2, RESOURCE_A, ttime);
- Data dataDiffResource = new Data(OWNER1, RESOURCE_B, ttime);
-
- assertFalse(data.equals(dataDiffExpire));
- assertFalse(data.equals(dataDiffOwner));
- assertFalse(data.equals(dataDiffResource));
-
- assertFalse(data.equals(null));
- assertFalse(data.equals("string"));
-
- Data dataNullOwner = new Data(null, RESOURCE_A, ttime);
- Data dataNullResource = new Data(OWNER1, null, ttime);
-
- assertFalse(dataNullOwner.equals(data));
- assertFalse(dataNullResource.equals(data));
-
- assertTrue(dataNullOwner.equals(new Data(null, RESOURCE_A, ttime)));
- assertTrue(dataNullResource.equals(new Data(OWNER1, null, ttime)));
- }
-
- @Test
- public void testMultiThreaded() throws InterruptedException {
- int nthreads = 10;
- int nlocks = 100;
-
- LinkedList<Thread> threads = new LinkedList<>();
-
- String[] resources = {RESOURCE_A, RESOURCE_B};
-
- final AtomicInteger nfail = new AtomicInteger(0);
-
- CountDownLatch stopper = new CountDownLatch(1);
- CountDownLatch completed = new CountDownLatch(nthreads);
-
- for (int x = 0; x < nthreads; ++x) {
- String owner = "owner." + x;
-
- Thread thread = new Thread(() -> {
-
- for (int y = 0; y < nlocks; ++y) {
- String res = resources[y % resources.length];
-
- try {
- // some locks will be acquired, some denied
- mgr.lock(res, owner, MAX_AGE_SEC);
-
- // do some "work"
- stopper.await(1L, TimeUnit.MILLISECONDS);
-
- mgr.unlock(res, owner);
-
- } catch (InterruptedException expected) {
- Thread.currentThread().interrupt();
- break;
- }
- }
-
- completed.countDown();
- });
-
- thread.setDaemon(true);
- threads.add(thread);
- }
-
- // start the threads
- for (Thread t : threads) {
- t.start();
- }
-
- // wait for them to complete
- completed.await(5000L, TimeUnit.SECONDS);
-
- // stop the threads from sleeping
- stopper.countDown();
-
- completed.await(1L, TimeUnit.SECONDS);
-
- // interrupt those that are still alive
- for (Thread t : threads) {
- if (t.isAlive()) {
- t.interrupt();
- }
- }
-
- assertEquals(0, nfail.get());
- }
-
-}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java b/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java
index fe31eb50..87001ad6 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java
@@ -23,6 +23,7 @@ package org.onap.policy.drools.features;
import java.util.Properties;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.utils.services.OrderedService;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
import org.onap.policy.drools.system.PolicyEngine;
@@ -271,4 +272,26 @@ public interface PolicyEngineFeatureApi extends OrderedService {
default boolean afterOpen(PolicyEngine engine) {
return false;
}
+
+ /**
+ * Called before the PolicyEngine creates a lock manager.
+ *
+ * @return a lock manager if this feature intercepts and takes ownership of the
+ * operation preventing the invocation of lower priority features. Null,
+ * otherwise
+ */
+ default PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
+ return null;
+ }
+
+ /**
+ * Called after the PolicyEngine creates a lock manager.
+ *
+ * @return True if this feature intercepts and takes ownership of the operation
+ * preventing the invocation of lower priority features. False, otherwise
+ */
+ default boolean afterCreateLockManager(PolicyEngine engine, Properties properties,
+ PolicyResourceLockManager lockManager) {
+ return false;
+ }
}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
index 653ff72e..cb0749d9 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
@@ -22,6 +22,7 @@ package org.onap.policy.drools.system;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
import org.onap.policy.common.capabilities.Lockable;
import org.onap.policy.common.capabilities.Startable;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
@@ -29,6 +30,8 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
@@ -198,6 +201,11 @@ public interface PolicyEngine extends Startable, Lockable, TopicListener {
List<HttpServletServer> getHttpServers();
/**
+ * Gets a thread pool that can be used to execute background tasks.
+ */
+ ScheduledExecutorService getExecutorService();
+
+ /**
* get properties configuration.
*
* @return properties objects
@@ -280,6 +288,31 @@ public interface PolicyEngine extends Startable, Lockable, TopicListener {
boolean deliver(CommInfrastructure busType, String topic, String event);
/**
+ * Requests a lock on a resource. Typically, the lock is not immediately granted,
+ * though a "lock" object is always returned. Once the lock has been granted (or
+ * denied), the callback will be invoked to indicate the result.
+ *
+ * <p/>
+ * Notes:
+ * <dl>
+ * <li>The callback may be invoked <i>before</i> this method returns</li>
+ * <li>The implementation need not honor waitForLock={@code true}</li>
+ * </dl>
+ *
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held once
+ * it has been granted, after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or subsequently
+ * lost; must not be {@code null}
+ * @param waitForLock {@code true} to wait for the lock, if it is currently locked,
+ * {@code false} otherwise
+ * @return a new lock
+ */
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock);
+
+ /**
* Invoked when the host goes into the active state.
*/
void activate();
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
index 766848c6..c924fd69 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
@@ -24,7 +24,6 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT;
-import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson;
@@ -32,11 +31,16 @@ import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.NonNull;
+import org.apache.commons.lang.StringUtils;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
@@ -54,6 +58,9 @@ import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.core.PolicyContainer;
import org.onap.policy.drools.core.jmx.PdpJmxListener;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.features.PolicyControllerFeatureApi;
import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
@@ -67,6 +74,7 @@ import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
import org.onap.policy.drools.server.restful.RestManager;
import org.onap.policy.drools.server.restful.aaf.AafTelemetryAuthFilter;
+import org.onap.policy.drools.system.internal.SimpleLockManager;
import org.onap.policy.drools.utils.PropertyUtil;
import org.onap.policy.drools.utils.logging.LoggerUtil;
import org.onap.policy.drools.utils.logging.MdcTransaction;
@@ -78,7 +86,6 @@ import org.slf4j.LoggerFactory;
* Policy Engine Manager Implementation.
*/
class PolicyEngineManager implements PolicyEngine {
-
/**
* String literals.
*/
@@ -88,6 +95,9 @@ class PolicyEngineManager implements PolicyEngine {
private static final String ENGINE_STOPPED_MSG = "Policy Engine is stopped";
private static final String ENGINE_LOCKED_MSG = "Policy Engine is locked";
+ public static final String EXECUTOR_THREAD_PROP = "executor.threads";
+ protected static final int DEFAULT_EXECUTOR_THREADS = 5;
+
/**
* logger.
*/
@@ -134,6 +144,17 @@ class PolicyEngineManager implements PolicyEngine {
private List<HttpServletServer> httpServers = new ArrayList<>();
/**
+ * Thread pool used to execute background tasks.
+ */
+ private ScheduledExecutorService executorService = null;
+
+ /**
+ * Lock manager used to create locks.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ private PolicyResourceLockManager lockManager = null;
+
+ /**
* gson parser to decode configuration requests.
*/
private final Gson decoder = new GsonBuilder().disableHtmlEscaping().create();
@@ -214,6 +235,53 @@ class PolicyEngineManager implements PolicyEngine {
}
@Override
+ @JsonIgnore
+ @GsonJsonIgnore
+ public ScheduledExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ private ScheduledExecutorService makeExecutorService(Properties properties) {
+ int nthreads = DEFAULT_EXECUTOR_THREADS;
+ try {
+ nthreads = Integer.valueOf(
+ properties.getProperty(EXECUTOR_THREAD_PROP, String.valueOf(DEFAULT_EXECUTOR_THREADS)));
+
+ } catch (NumberFormatException e) {
+ logger.error("invalid number for " + EXECUTOR_THREAD_PROP + " property", e);
+ }
+
+ return makeScheduledExecutor(nthreads);
+ }
+
+ private void createLockManager(Properties properties) {
+ for (PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ this.lockManager = feature.beforeCreateLockManager(this, properties);
+ if (this.lockManager != null) {
+ return;
+ }
+ } catch (RuntimeException e) {
+ logger.error("{}: feature {} before-create-lock-manager failure because of {}", this,
+ feature.getClass().getName(), e.getMessage(), e);
+ }
+ }
+
+ try {
+ this.lockManager = new SimpleLockManager(this, properties);
+ } catch (RuntimeException e) {
+ logger.error("{}: cannot create simple lock manager because of {}", this, e.getMessage(), e);
+ this.lockManager = new SimpleLockManager(this, new Properties());
+ }
+
+ /* policy-engine dispatch post operation hook */
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterCreateLockManager(this, properties, this.lockManager),
+ (feature, ex) -> logger.error("{}: feature {} after-create-lock-manager failure because of {}",
+ this, feature.getClass().getName(), ex.getMessage(), ex));
+ }
+
+ @Override
public synchronized void configure(Properties properties) {
if (properties == null) {
@@ -257,6 +325,10 @@ class PolicyEngineManager implements PolicyEngine {
logger.error("{}: add-http-servers failed", this, e);
}
+ executorService = makeExecutorService(properties);
+
+ createLockManager(properties);
+
/* policy-engine dispatch post configure hook */
FeatureApiUtils.apply(getEngineProviders(),
feature -> feature.afterConfigure(this),
@@ -499,6 +571,13 @@ class PolicyEngineManager implements PolicyEngine {
AtomicReference<Boolean> success = new AtomicReference<>(true);
+ try {
+ success.compareAndSet(true, this.lockManager.start());
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot start lock manager because of {}", this, e.getMessage(), e);
+ success.set(false);
+ }
+
/* Start Policy Engine exclusively-owned (unmanaged) http servers */
attempt(success, this.httpServers,
@@ -639,9 +718,16 @@ class PolicyEngineManager implements PolicyEngine {
(item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item,
ex.getMessage(), ex));
+ try {
+ success.compareAndSet(true, this.lockManager.stop());
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot stop lock manager because of {}", this, e.getMessage(), e);
+ success.set(false);
+ }
+
// stop JMX?
- /* policy-engine dispatch pre stop hook */
+ /* policy-engine dispatch post stop hook */
FeatureApiUtils.apply(getEngineProviders(),
feature -> feature.afterStop(this),
(feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
@@ -688,6 +774,14 @@ class PolicyEngineManager implements PolicyEngine {
getTopicEndpointManager().shutdown();
getServletFactory().destroy();
+ try {
+ this.lockManager.shutdown();
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot shutdown lock manager because of {}", this, e.getMessage(), e);
+ }
+
+ executorService.shutdownNow();
+
// Stop the JMX listener
stopPdpJmxListener();
@@ -806,6 +900,13 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().lock() && success;
+ try {
+ success = (this.lockManager == null || this.lockManager.lock()) && success;
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot lock() lock manager because of {}", this, e.getMessage(), e);
+ success = false;
+ }
+
/* policy-engine dispatch post lock hook */
FeatureApiUtils.apply(getEngineProviders(),
feature -> feature.afterLock(this),
@@ -833,6 +934,14 @@ class PolicyEngineManager implements PolicyEngine {
this.locked = false;
boolean success = true;
+
+ try {
+ success = (this.lockManager == null || this.lockManager.unlock()) && success;
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot unlock() lock manager because of {}", this, e.getMessage(), e);
+ success = false;
+ }
+
final List<PolicyController> controllers = getControllerFactory().inventory();
for (final PolicyController controller : controllers) {
try {
@@ -1167,6 +1276,21 @@ class PolicyEngineManager implements PolicyEngine {
feature.getClass().getName(), ex.getMessage(), ex));
}
+ @Override
+ public Lock createLock(@NonNull String resourceId, @NonNull String ownerKey, int holdSec,
+ @NonNull LockCallback callback, boolean waitForLock) {
+
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ if (lockManager == null) {
+ throw new IllegalStateException("lock manager has not been initialized");
+ }
+
+ return lockManager.createLock(resourceId, ownerKey, holdSec, callback, waitForLock);
+ }
+
private boolean controllerConfig(PdpdConfiguration config) {
/* only this one supported for now */
final List<ControllerConfiguration> configControllers = config.getControllers();
@@ -1234,4 +1358,13 @@ class PolicyEngineManager implements PolicyEngine {
protected PolicyEngine getPolicyEngine() {
return PolicyEngineConstants.getManager();
}
+
+ protected ScheduledExecutorService makeScheduledExecutor(int nthreads) {
+ ScheduledThreadPoolExecutor exsvc = new ScheduledThreadPoolExecutor(nthreads);
+ exsvc.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ exsvc.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ exsvc.setRemoveOnCancelPolicy(true);
+
+ return exsvc;
+ }
}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java
new file mode 100644
index 00000000..f5163e9b
--- /dev/null
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java
@@ -0,0 +1,426 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system.internal;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+import org.onap.policy.common.utils.time.CurrentTime;
+import org.onap.policy.drools.core.lock.AlwaysFailLock;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simple implementation of the Lock Feature. Locks do not span across instances of this
+ * object (i.e., locks do not span across servers).
+ *
+ * <p/>
+ * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
+ * parameter.
+ *
+ * <p/>
+ * When a lock is deserialized, it will not initially appear in this feature's map; it
+ * will be added to the map once free() or extend() is invoked, provided there isn't
+ * already an entry.
+ */
+public class SimpleLockManager implements PolicyResourceLockManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(SimpleLockManager.class);
+
+ private static final String NOT_LOCKED_MSG = "not locked";
+ private static final String LOCK_LOST_MSG = "lock lost";
+
+ /**
+ * Provider of current time. May be overridden by junit tests.
+ */
+ private static CurrentTime currentTime = new CurrentTime();
+
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private static SimpleLockManager latestInstance = null;
+
+
+ /**
+ * Engine with which this manager is associated.
+ */
+ private final PolicyEngine engine;
+
+ /**
+ * Feature properties.
+ */
+ private final SimpleLockProperties featProps;
+
+ /**
+ * Maps a resource to the lock that owns it.
+ */
+ private final Map<String, SimpleLock> resource2lock = new ConcurrentHashMap<>();
+
+ /**
+ * Thread pool used to check for lock expiration and to notify owners when locks are
+ * lost.
+ */
+ private ScheduledExecutorService exsvc = null;
+
+ /**
+ * Used to cancel the expiration checker on shutdown.
+ */
+ private ScheduledFuture<?> checker = null;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param engine engine with which this manager is associated
+ * @param properties properties used to configure the manager
+ */
+ public SimpleLockManager(PolicyEngine engine, Properties properties) {
+ try {
+ this.engine = engine;
+ this.featProps = new SimpleLockProperties(properties);
+
+ } catch (PropertyException e) {
+ throw new SimpleLockManagerException(e);
+ }
+ }
+
+ @Override
+ public boolean isAlive() {
+ return (checker != null);
+ }
+
+ @Override
+ public boolean start() {
+ if (checker != null) {
+ return false;
+ }
+
+ exsvc = getThreadPool();
+
+ checker = exsvc.scheduleWithFixedDelay(this::checkExpired, featProps.getExpireCheckSec(),
+ featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ setLatestInstance(this);
+
+ return true;
+ }
+
+ /**
+ * Stops the expiration checker. Does <i>not</i> invoke any lock call-backs.
+ */
+ @Override
+ public synchronized boolean stop() {
+ exsvc = null;
+
+ if (checker == null) {
+ return false;
+ }
+
+ ScheduledFuture<?> checker2 = checker;
+ checker = null;
+
+ checker2.cancel(true);
+
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ stop();
+ }
+
+ @Override
+ public boolean isLocked() {
+ return false;
+ }
+
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public boolean unlock() {
+ return true;
+ }
+
+ @Override
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+
+ if (latestInstance != this) {
+ AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
+ lock.notifyUnavailable();
+ return lock;
+ }
+
+ SimpleLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
+
+ SimpleLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
+
+ if (existingLock == null) {
+ lock.grant();
+ } else {
+ lock.deny("resource is busy");
+ }
+
+ return lock;
+ }
+
+ /**
+ * Checks for expired locks.
+ */
+ private void checkExpired() {
+ long currentMs = currentTime.getMillis();
+ logger.info("checking for expired locks at {}", currentMs);
+
+ /*
+ * Could do this via an iterator, but using compute() guarantees that the lock
+ * doesn't get extended while it's being removed from the map.
+ */
+ for (Entry<String, SimpleLock> ent : resource2lock.entrySet()) {
+ if (!ent.getValue().expired(currentMs)) {
+ continue;
+ }
+
+ AtomicReference<SimpleLock> lockref = new AtomicReference<>(null);
+
+ resource2lock.computeIfPresent(ent.getKey(), (resourceId, lock) -> {
+ if (lock.expired(currentMs)) {
+ lockref.set(lock);
+ return null;
+ }
+
+ return lock;
+ });
+
+ SimpleLock lock = lockref.get();
+ if (lock != null) {
+ lock.deny("lock expired");
+ }
+ }
+ }
+
+ /**
+ * Simple Lock implementation.
+ */
+ public static class SimpleLock extends LockImpl {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Time, in milliseconds, when the lock expires.
+ */
+ @Getter
+ private long holdUntilMs;
+
+ /**
+ * Feature containing this lock. May be {@code null} until the feature is
+ * identified. Note: this can only be null if the lock has been de-serialized.
+ */
+ private transient SimpleLockManager feature;
+
+ /**
+ * Constructs the object.
+ */
+ public SimpleLock() {
+ this.holdUntilMs = 0;
+ this.feature = null;
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param state initial state of the lock
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held,
+ * after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or
+ * subsequently lost; must not be {@code null}
+ * @param feature feature containing this lock
+ */
+ public SimpleLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ SimpleLockManager feature) {
+ super(state, resourceId, ownerKey, holdSec, callback);
+ this.feature = feature;
+ }
+
+ /**
+ * Determines if the owner's lock has expired.
+ *
+ * @param currentMs current time, in milliseconds
+ * @return {@code true} if the owner's lock has expired, {@code false} otherwise
+ */
+ public boolean expired(long currentMs) {
+ return (holdUntilMs <= currentMs);
+ }
+
+ /**
+ * Grants this lock. The notification is <i>always</i> invoked via a background
+ * thread.
+ */
+ protected synchronized void grant() {
+ if (isUnavailable()) {
+ return;
+ }
+
+ setState(LockState.ACTIVE);
+ holdUntilMs = currentTime.getMillis() + TimeUnit.SECONDS.toMillis(getHoldSec());
+
+ logger.info("lock granted: {}", this);
+
+ feature.exsvc.execute(this::notifyAvailable);
+ }
+
+ /**
+ * Permanently denies this lock. The notification is invoked via a background
+ * thread, if a feature instance is attached, otherwise it uses the foreground
+ * thread.
+ *
+ * @param reason the reason the lock was denied
+ */
+ protected void deny(String reason) {
+ synchronized (this) {
+ setState(LockState.UNAVAILABLE);
+ }
+
+ logger.info("{}: {}", reason, this);
+
+ if (feature == null) {
+ notifyUnavailable();
+
+ } else {
+ feature.exsvc.execute(this::notifyUnavailable);
+ }
+ }
+
+ @Override
+ public boolean free() {
+ // do a quick check of the state
+ if (isUnavailable()) {
+ return false;
+ }
+
+ logger.info("releasing lock: {}", this);
+
+ if (!attachFeature()) {
+ setState(LockState.UNAVAILABLE);
+ return false;
+ }
+
+ AtomicBoolean result = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+
+ if (curlock == this) {
+ // this lock was the owner - resource is now available
+ result.set(true);
+ setState(LockState.UNAVAILABLE);
+ return null;
+
+ } else {
+ return curlock;
+ }
+ });
+
+ return result.get();
+ }
+
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ setHoldSec(holdSec);
+ setCallback(callback);
+
+ // do a quick check of the state
+ if (isUnavailable() || !attachFeature()) {
+ deny(LOCK_LOST_MSG);
+ return;
+ }
+
+ if (feature.resource2lock.get(getResourceId()) == this) {
+ grant();
+ } else {
+ deny(NOT_LOCKED_MSG);
+ }
+ }
+
+ /**
+ * Attaches to the feature instance, if not already attached.
+ *
+ * @return {@code true} if the lock is now attached to a feature, {@code false}
+ * otherwise
+ */
+ private synchronized boolean attachFeature() {
+ if (feature != null) {
+ // already attached
+ return true;
+ }
+
+ feature = latestInstance;
+ if (feature == null) {
+ logger.warn("no feature yet for {}", this);
+ return false;
+ }
+
+ // put this lock into the map
+ feature.resource2lock.putIfAbsent(getResourceId(), this);
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "SimpleLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey=" + getOwnerKey()
+ + ", holdSec=" + getHoldSec() + ", holdUntilMs=" + holdUntilMs + "]";
+ }
+ }
+
+ // these may be overridden by junit tests
+
+ protected ScheduledExecutorService getThreadPool() {
+ return engine.getExecutorService();
+ }
+
+ protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, this);
+ }
+}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java
new file mode 100644
index 00000000..ff02f39e
--- /dev/null
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system.internal;
+
+public class SimpleLockManagerException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructor.
+ *
+ * @param ex exception to be wrapped
+ */
+ public SimpleLockManagerException(Exception ex) {
+ super(ex);
+ }
+}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java
new file mode 100644
index 00000000..0d1ca89b
--- /dev/null
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system.internal;
+
+import java.util.Properties;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.properties.BeanConfigurator;
+import org.onap.policy.common.utils.properties.Property;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+
+@Getter
+@Setter
+public class SimpleLockProperties {
+ public static final String PREFIX = "simple.locking.";
+ public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds";
+
+ /**
+ * Time, in seconds, to wait between checks for expired locks.
+ */
+ @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900")
+ private int expireCheckSec;
+
+ /**
+ * Constructs the object, populating fields from the properties.
+ *
+ * @param props properties from which to configure this
+ * @throws PropertyException if an error occurs
+ */
+ public SimpleLockProperties(Properties props) throws PropertyException {
+ new BeanConfigurator().configureFromProperties(this, props);
+ }
+}
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java
index 5e0ead9d..fe1a2345 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java
@@ -27,12 +27,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -41,6 +43,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.junit.Before;
@@ -53,6 +56,10 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServer;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactory;
import org.onap.policy.common.utils.gson.GsonTestUtils;
import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.features.PolicyControllerFeatureApi;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
import org.onap.policy.drools.persistence.SystemPersistence;
@@ -61,9 +68,10 @@ import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
+import org.onap.policy.drools.system.internal.SimpleLockManager;
+import org.onap.policy.drools.system.internal.SimpleLockProperties;
public class PolicyEngineManagerTest {
-
private static final String EXPECTED = "expected exception";
private static final String NOOP_STR = CommInfrastructure.NOOP.name();
@@ -76,6 +84,8 @@ public class PolicyEngineManagerTest {
private static final String FEATURE2 = "feature-b";
private static final String MY_TOPIC = "my-topic";
private static final String MESSAGE = "my-message";
+ private static final String MY_OWNER = "my-owner";
+ private static final String MY_RESOURCE = "my-resource";
private static final Object MY_EVENT = new Object();
@@ -125,6 +135,8 @@ public class PolicyEngineManagerTest {
private PdpdConfiguration pdpConfig;
private String pdpConfigJson;
private PolicyEngineManager mgr;
+ private ScheduledExecutorService exsvc;
+ private PolicyResourceLockManager lockmgr;
/**
* Initializes the object to be tested.
@@ -176,6 +188,15 @@ public class PolicyEngineManagerTest {
config3 = new ControllerConfiguration();
config4 = new ControllerConfiguration();
pdpConfig = new PdpdConfiguration();
+ exsvc = mock(ScheduledExecutorService.class);
+ lockmgr = mock(PolicyResourceLockManager.class);
+
+ when(lockmgr.start()).thenReturn(true);
+ when(lockmgr.stop()).thenReturn(true);
+ when(lockmgr.lock()).thenReturn(true);
+ when(lockmgr.unlock()).thenReturn(true);
+
+ when(prov2.beforeCreateLockManager(any(), any())).thenReturn(lockmgr);
when(prov1.getName()).thenReturn(FEATURE1);
when(prov2.getName()).thenReturn(FEATURE2);
@@ -387,6 +408,86 @@ public class PolicyEngineManagerTest {
assertFalse(config.isEmpty());
}
+ /**
+ * Tests that makeExecutorService() uses the value from the thread
+ * property.
+ */
+ @Test
+ public void testMakeExecutorServicePropertyProvided() {
+ PolicyEngineManager mgrspy = spy(mgr);
+
+ properties.setProperty(PolicyEngineManager.EXECUTOR_THREAD_PROP, "3");
+ mgrspy.configure(properties);
+ assertSame(exsvc, mgrspy.getExecutorService());
+ verify(mgrspy).makeScheduledExecutor(3);
+ }
+
+ /**
+ * Tests that makeExecutorService() uses the default thread count when no thread
+ * property is provided.
+ */
+ @Test
+ public void testMakeExecutorServiceNoProperty() {
+ PolicyEngineManager mgrspy = spy(mgr);
+
+ mgrspy.configure(properties);
+ assertSame(exsvc, mgrspy.getExecutorService());
+ verify(mgrspy).makeScheduledExecutor(PolicyEngineManager.DEFAULT_EXECUTOR_THREADS);
+ }
+
+ /**
+ * Tests that makeExecutorService() uses the default thread count when the thread
+ * property is invalid.
+ */
+ @Test
+ public void testMakeExecutorServiceInvalidProperty() {
+ PolicyEngineManager mgrspy = spy(mgr);
+
+ properties.setProperty(PolicyEngineManager.EXECUTOR_THREAD_PROP, "abc");
+ mgrspy.configure(properties);
+ assertSame(exsvc, mgrspy.getExecutorService());
+ verify(mgrspy).makeScheduledExecutor(PolicyEngineManager.DEFAULT_EXECUTOR_THREADS);
+ }
+
+ /**
+ * Tests createLockManager() when beforeCreateLock throws an exception and returns a
+ * manager.
+ */
+ @Test
+ public void testCreateLockManagerHaveProvider() {
+ // first provider throws an exception
+ when(prov1.beforeCreateLockManager(any(), any())).thenThrow(new RuntimeException(EXPECTED));
+
+ mgr.configure(properties);
+ assertSame(lockmgr, mgr.getLockManager());
+ }
+
+ /**
+ * Tests createLockManager() when SimpleLockManager throws an exception.
+ */
+ @Test
+ public void testCreateLockManagerSimpleEx() {
+ when(prov2.beforeCreateLockManager(any(), any())).thenReturn(null);
+
+ // invalid property for SimpleLockManager
+ properties.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc");
+ mgr.configure(properties);
+
+ // should create a manager using default properties
+ assertTrue(mgr.getLockManager() instanceof SimpleLockManager);
+ }
+
+ /**
+ * Tests createLockManager() when SimpleLockManager is returned.
+ */
+ @Test
+ public void testCreateLockManagerSimple() {
+ when(prov2.beforeCreateLockManager(any(), any())).thenReturn(null);
+
+ mgr.configure(properties);
+ assertTrue(mgr.getLockManager() instanceof SimpleLockManager);
+ }
+
@Test
public void testConfigureProperties() throws Exception {
// arrange for first provider to throw exceptions
@@ -667,6 +768,12 @@ public class PolicyEngineManagerTest {
when(sink1.start()).thenThrow(new RuntimeException(EXPECTED));
});
+ // lock manager fails to start - still does everything
+ testStart(false, () -> when(lockmgr.start()).thenReturn(false));
+
+ // lock manager throws an exception - still does everything
+ testStart(false, () -> when(lockmgr.start()).thenThrow(new RuntimeException(EXPECTED)));
+
// servlet wait fails - still does everything
testStart(false, () -> when(server1.waitedStart(anyLong())).thenReturn(false));
@@ -796,6 +903,12 @@ public class PolicyEngineManagerTest {
// servlet fails to stop - still does everything
testStop(false, () -> when(server1.stop()).thenReturn(false));
+ // lock manager fails to stop - still does everything
+ testStop(false, () -> when(lockmgr.stop()).thenReturn(false));
+
+ // lock manager throws an exception - still does everything
+ testStop(false, () -> when(lockmgr.stop()).thenThrow(new RuntimeException(EXPECTED)));
+
// other tests
checkBeforeAfter(
(prov, flag) -> when(prov.beforeStop(mgr)).thenReturn(flag),
@@ -861,6 +974,10 @@ public class PolicyEngineManagerTest {
assertTrue(threadStarted);
assertTrue(threadInterrupted);
+
+ // lock manager throws an exception - still does everything
+ testShutdown(() -> doThrow(new RuntimeException(EXPECTED)).when(lockmgr).shutdown());
+
// other tests
checkBeforeAfter(
(prov, flag) -> when(prov.beforeShutdown(mgr)).thenReturn(flag),
@@ -906,6 +1023,8 @@ public class PolicyEngineManagerTest {
verify(prov1).afterShutdown(mgr);
verify(prov2).afterShutdown(mgr);
+
+ verify(exsvc).shutdownNow();
}
@Test
@@ -985,6 +1104,12 @@ public class PolicyEngineManagerTest {
// endpoint manager fails to lock - still does everything
testLock(false, () -> when(endpoint.lock()).thenReturn(false));
+ // lock manager fails to lock - still does everything
+ testLock(false, () -> when(lockmgr.lock()).thenReturn(false));
+
+ // lock manager throws an exception - still does everything
+ testLock(false, () -> when(lockmgr.lock()).thenThrow(new RuntimeException(EXPECTED)));
+
// other tests
checkBeforeAfter(
(prov, flag) -> when(prov.beforeLock(mgr)).thenReturn(flag),
@@ -1055,6 +1180,12 @@ public class PolicyEngineManagerTest {
// endpoint manager fails to unlock - still does everything
testUnlock(false, () -> when(endpoint.unlock()).thenReturn(false));
+ // lock manager fails to lock - still does everything
+ testUnlock(false, () -> when(lockmgr.unlock()).thenReturn(false));
+
+ // lock manager throws an exception - still does everything
+ testUnlock(false, () -> when(lockmgr.unlock()).thenThrow(new RuntimeException(EXPECTED)));
+
// other tests
checkBeforeAfter(
(prov, flag) -> when(prov.beforeUnlock(mgr)).thenReturn(flag),
@@ -1484,6 +1615,32 @@ public class PolicyEngineManagerTest {
}
@Test
+ public void testCreateLock() {
+ Lock lock = mock(Lock.class);
+ LockCallback callback = mock(LockCallback.class);
+ when(lockmgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)).thenReturn(lock);
+
+ // not configured yet, thus no lock manager
+ assertThatIllegalStateException()
+ .isThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false));
+
+ // now configure it and try again
+ mgr.configure(properties);
+ assertSame(lock, mgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false));
+
+ // test illegal args
+ assertThatThrownBy(() -> mgr.createLock(null, MY_OWNER, 10, callback, false))
+ .hasMessageContaining("resourceId");
+ assertThatThrownBy(() -> mgr.createLock(MY_RESOURCE, null, 10, callback, false))
+ .hasMessageContaining("ownerKey");
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, -1, callback, false))
+ .withMessageContaining("holdSec");
+ assertThatThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, 10, null, false))
+ .hasMessageContaining("callback");
+ }
+
+ @Test
public void testOpen() throws Throwable {
when(prov1.beforeOpen(mgr)).thenThrow(new RuntimeException(EXPECTED));
when(prov1.afterOpen(mgr)).thenThrow(new RuntimeException(EXPECTED));
@@ -1789,6 +1946,11 @@ public class PolicyEngineManagerTest {
return engine;
}
+ @Override
+ protected ScheduledExecutorService makeScheduledExecutor(int nthreads) {
+ return exsvc;
+ }
+
/**
* Shutdown thread with overrides.
*/
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java
new file mode 100644
index 00000000..7ffc72ff
--- /dev/null
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system.internal;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+import org.onap.policy.drools.system.internal.SimpleLockManagerException;
+
+public class SimpleLockManagerExceptionTest extends ExceptionsTester {
+
+ @Test
+ public void test() {
+ assertEquals(1, test(SimpleLockManagerException.class));
+ }
+}
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java
new file mode 100644
index 00000000..66406898
--- /dev/null
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java
@@ -0,0 +1,781 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.time.CurrentTime;
+import org.onap.policy.common.utils.time.TestTime;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.onap.policy.drools.system.internal.SimpleLockManager.SimpleLock;
+import org.powermock.reflect.Whitebox;
+
+public class SimpleLockManagerTest {
+ private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
+ private static final String TIME_FIELD = "currentTime";
+ private static final String OWNER_KEY = "my key";
+ private static final String RESOURCE = "my resource";
+ private static final String RESOURCE2 = "my resource #2";
+ private static final String RESOURCE3 = "my resource #3";
+ private static final int HOLD_SEC = 100;
+ private static final int HOLD_SEC2 = 120;
+ private static final int HOLD_MS = HOLD_SEC * 1000;
+ private static final int HOLD_MS2 = HOLD_SEC2 * 1000;
+ private static final int MAX_THREADS = 10;
+ private static final int MAX_LOOPS = 50;
+
+ private static CurrentTime saveTime;
+ private static ScheduledExecutorService saveExec;
+ private static ScheduledExecutorService realExec;
+
+ private TestTime testTime;
+ private AtomicInteger nactive;
+ private AtomicInteger nsuccesses;
+ private SimpleLockManager feature;
+
+ @Mock
+ private ScheduledExecutorService exsvc;
+
+ @Mock
+ private PolicyEngine engine;
+
+ @Mock
+ private ScheduledFuture<?> future;
+
+ @Mock
+ private LockCallback callback;
+
+
+ /**
+ * Saves static fields and configures the location of the property files.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveTime = Whitebox.getInternalState(SimpleLockManager.class, TIME_FIELD);
+ saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
+
+ realExec = Executors.newScheduledThreadPool(3);
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
+ }
+
+ /**
+ * Restores static fields.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() {
+ Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, saveTime);
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
+
+ realExec.shutdown();
+ }
+
+ /**
+ * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
+ * tasks.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ testTime = new TestTime();
+ nactive = new AtomicInteger(0);
+ nsuccesses = new AtomicInteger(0);
+
+ Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime);
+
+ when(engine.getExecutorService()).thenReturn(exsvc);
+
+ feature = new MyLockingFeature();
+ feature.start();
+ }
+
+ /**
+ * Tests constructor() when properties are invalid.
+ */
+ @Test
+ public void testSimpleLockManagerInvalidProperties() {
+ // use properties containing an invalid value
+ Properties props = new Properties();
+ props.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc");
+
+ assertThatThrownBy(() -> new MyLockingFeature(engine, props)).isInstanceOf(SimpleLockManagerException.class);
+ }
+
+ @Test
+ public void testIsAlive() {
+ assertTrue(feature.isAlive());
+
+ feature.stop();
+ assertFalse(feature.isAlive());
+ }
+
+ @Test
+ public void testStart() {
+ assertFalse(feature.start());
+
+ feature.stop();
+ assertTrue(feature.start());
+ }
+
+ @Test
+ public void testStop() {
+ assertTrue(feature.stop());
+ verify(future).cancel(true);
+
+ assertFalse(feature.stop());
+
+ // no more invocations
+ verify(future).cancel(anyBoolean());
+ }
+
+ @Test
+ public void testShutdown() {
+ feature.shutdown();
+
+ verify(future).cancel(true);
+ }
+
+ @Test
+ public void testLockApi() {
+ assertFalse(feature.isLocked());
+ assertTrue(feature.lock());
+ assertTrue(feature.unlock());
+ }
+
+ @Test
+ public void testCreateLock() {
+ // this lock should be granted immediately
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock.isActive());
+ assertEquals(testTime.getMillis() + HOLD_MS, lock.getHoldUntilMs());
+
+ invokeCallback(1);
+
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+
+ // this time it should be busy
+ Lock lock2 = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertFalse(lock2.isActive());
+ assertTrue(lock2.isUnavailable());
+
+ invokeCallback(2);
+
+ verify(callback, never()).lockAvailable(lock2);
+ verify(callback).lockUnavailable(lock2);
+
+ // should have been no change to the original lock
+ assertTrue(lock.isActive());
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // should work with "true" value also
+ Lock lock3 = feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, true);
+ assertTrue(lock3.isActive());
+ invokeCallback(3);
+ verify(callback).lockAvailable(lock3);
+ verify(callback, never()).lockUnavailable(lock3);
+ }
+
+ /**
+ * Tests lock() when the feature is not the latest instance.
+ */
+ @Test
+ public void testCreateLockNotLatestInstance() {
+ SimpleLockManager.setLatestInstance(null);
+
+ Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockAvailable(any());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testCheckExpired() throws InterruptedException {
+ final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ final SimpleLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
+ final SimpleLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC2, callback, false);
+
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any());
+
+ Runnable checker = captor.getValue();
+
+ // time unchanged - checker should have no impact
+ checker.run();
+ assertTrue(lock.isActive());
+ assertTrue(lock2.isActive());
+ assertTrue(lock3.isActive());
+
+ // expire the first two locks
+ testTime.sleep(HOLD_MS);
+ checker.run();
+ assertFalse(lock.isActive());
+ assertFalse(lock2.isActive());
+ assertTrue(lock3.isActive());
+
+ // run the callbacks
+ captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(5)).execute(captor.capture());
+ captor.getAllValues().forEach(Runnable::run);
+ verify(callback).lockUnavailable(lock);
+ verify(callback).lockUnavailable(lock2);
+ verify(callback, never()).lockUnavailable(lock3);
+
+ // should be able to get a lock on the first two resources
+ assertTrue(feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive());
+ assertTrue(feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive());
+
+ // lock is still busy on the last resource
+ assertFalse(feature.createLock(RESOURCE3, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive());
+
+ // expire the last lock
+ testTime.sleep(HOLD_MS2);
+ checker.run();
+ assertFalse(lock3.isActive());
+
+ // run the callback
+ captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(9)).execute(captor.capture());
+ captor.getValue().run();
+ verify(callback).lockUnavailable(lock3);
+ }
+
+ /**
+ * Tests checkExpired(), where the lock is removed from the map between invoking
+ * expired() and compute(). Should cause "null" to be returned by compute().
+ *
+ * @throws InterruptedException if the test is interrupted
+ */
+ @Test
+ public void testCheckExpiredLockDeleted() throws InterruptedException {
+ feature = new MyLockingFeature() {
+ @Override
+ protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean expired(long currentMs) {
+ // remove the lock from the map
+ free();
+ return true;
+ }
+ };
+ }
+ };
+
+ feature.start();
+
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(1);
+
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any());
+
+ Runnable checker = captor.getValue();
+
+ checker.run();
+
+ // lock should now be gone and we should be able to get another
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(2);
+
+ // should have succeeded twice
+ verify(callback, times(2)).lockAvailable(any());
+
+ // lock should not be available now
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(3);
+ verify(callback).lockUnavailable(any());
+ }
+
+ /**
+ * Tests checkExpired(), where the lock is removed from the map and replaced with a
+ * new lock, between invoking expired() and compute(). Should cause the new lock to be
+ * returned.
+ *
+ * @throws InterruptedException if the test is interrupted
+ */
+ @Test
+ public void testCheckExpiredLockReplaced() throws InterruptedException {
+ feature = new MyLockingFeature() {
+ private boolean madeLock = false;
+
+ @Override
+ protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ if (madeLock) {
+ return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature);
+ }
+
+ madeLock = true;
+
+ return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean expired(long currentMs) {
+ // remove the lock from the map and add a new lock
+ free();
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ return true;
+ }
+ };
+ }
+ };
+
+ feature.start();
+
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(1);
+
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any());
+
+ Runnable checker = captor.getValue();
+
+ checker.run();
+
+ // lock should not be available now
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(3);
+ verify(callback).lockUnavailable(any());
+ }
+
+ @Test
+ public void testGetThreadPool() {
+ // use a real feature
+ feature = new SimpleLockManager(engine, new Properties());
+
+ // load properties
+ feature.start();
+
+ // should create thread pool
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // should shut down thread pool
+ feature.stop();
+ }
+
+ @Test
+ public void testSimpleLockNoArgs() {
+ SimpleLock lock = new SimpleLock();
+ assertNull(lock.getResourceId());
+ assertNull(lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(0, lock.getHoldSec());
+
+ assertEquals(0, lock.getHoldUntilMs());
+ }
+
+ @Test
+ public void testSimpleLockSimpleLock() {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertSame(callback, lock.getCallback());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
+ .withMessageContaining("holdSec");
+ }
+
+ @Test
+ public void testSimpleLockSerializable() throws Exception {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock = roundTrip(lock);
+
+ assertTrue(lock.isActive());
+
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+ }
+
+ @Test
+ public void testSimpleLockExpired() {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock.grant();
+
+ assertFalse(lock.expired(testTime.getMillis()));
+ assertFalse(lock.expired(testTime.getMillis() + HOLD_MS - 1));
+ assertTrue(lock.expired(testTime.getMillis() + HOLD_MS));
+ }
+
+ /**
+ * Tests grant() when the lock is already unavailable.
+ */
+ @Test
+ public void testSimpleLockGrantUnavailable() {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock.setState(LockState.UNAVAILABLE);
+ lock.grant();
+
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testSimpleLockFree() {
+ final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // lock2 should be denied
+ SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(2);
+ verify(callback, never()).lockAvailable(lock2);
+ verify(callback).lockUnavailable(lock2);
+
+ // lock2 was denied, so nothing new should happen when freed
+ assertFalse(lock2.free());
+ invokeCallback(2);
+
+ // force lock2 to be active - still nothing should happen
+ Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
+ assertFalse(lock2.free());
+ invokeCallback(2);
+
+ // now free the first lock
+ assertTrue(lock.free());
+ assertEquals(LockState.UNAVAILABLE, lock.getState());
+
+ // should be able to get the lock now
+ SimpleLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock3.isActive());
+ }
+
+ /**
+ * Tests that free() works on a serialized lock with a new feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testSimpleLockFreeSerialized() throws Exception {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ feature = new MyLockingFeature();
+ feature.start();
+
+ lock = roundTrip(lock);
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests free() on a serialized lock without a feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testSimpleLockFreeNoFeature() throws Exception {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ SimpleLockManager.setLatestInstance(null);
+
+ lock = roundTrip(lock);
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ @Test
+ public void testSimpleLockExtend() {
+ final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // lock2 should be denied
+ SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(2);
+ verify(callback, never()).lockAvailable(lock2);
+ verify(callback).lockUnavailable(lock2);
+
+ // lock2 will still be denied
+ lock2.extend(HOLD_SEC, callback);
+ invokeCallback(3);
+ verify(callback, times(2)).lockUnavailable(lock2);
+
+ // force lock2 to be active - should still be denied
+ Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
+ lock2.extend(HOLD_SEC, callback);
+ invokeCallback(4);
+ verify(callback, times(3)).lockUnavailable(lock2);
+
+ assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
+ .withMessageContaining("holdSec");
+
+ // now extend the first lock
+ lock.extend(HOLD_SEC2, callback);
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ assertEquals(testTime.getMillis() + HOLD_MS2, lock.getHoldUntilMs());
+ invokeCallback(5);
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests that extend() works on a serialized lock with a new feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testSimpleLockExtendSerialized() throws Exception {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ feature = new MyLockingFeature();
+ feature.start();
+
+ lock = roundTrip(lock);
+ LockCallback scallback = mock(LockCallback.class);
+
+ lock.extend(HOLD_SEC, scallback);
+ assertTrue(lock.isActive());
+
+ invokeCallback(1);
+ verify(scallback).lockAvailable(lock);
+ verify(scallback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests extend() on a serialized lock without a feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testSimpleLockExtendNoFeature() throws Exception {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ SimpleLockManager.setLatestInstance(null);
+
+ lock = roundTrip(lock);
+ LockCallback scallback = mock(LockCallback.class);
+
+ lock.extend(HOLD_SEC, scallback);
+ assertTrue(lock.isUnavailable());
+
+ invokeCallback(1);
+ verify(scallback, never()).lockAvailable(lock);
+ verify(scallback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testSimpleLockToString() {
+ String text = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
+ assertNotNull(text);
+ assertThat(text).contains("holdUntil").doesNotContain("ownerInfo").doesNotContain("callback");
+ }
+
+ /**
+ * Performs a multi-threaded test of the locking facility.
+ *
+ * @throws InterruptedException if the current thread is interrupted while waiting for
+ * the background threads to complete
+ */
+ @Test
+ public void testMultiThreaded() throws InterruptedException {
+ Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime);
+ feature = new SimpleLockManager(PolicyEngineConstants.getManager(), new Properties());
+ feature.start();
+
+ List<MyThread> threads = new ArrayList<>(MAX_THREADS);
+ for (int x = 0; x < MAX_THREADS; ++x) {
+ threads.add(new MyThread());
+ }
+
+ threads.forEach(Thread::start);
+
+ for (MyThread thread : threads) {
+ thread.join(6000);
+ assertFalse(thread.isAlive());
+ }
+
+ for (MyThread thread : threads) {
+ if (thread.err != null) {
+ throw thread.err;
+ }
+ }
+
+ assertTrue(nsuccesses.get() > 0);
+ }
+
+ private SimpleLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+ return (SimpleLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
+ }
+
+ private SimpleLock roundTrip(SimpleLock lock) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(lock);
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ return (SimpleLock) ois.readObject();
+ }
+ }
+
+ /**
+ * Invokes the last call-back in the work queue.
+ *
+ * @param nexpected number of call-backs expected in the work queue
+ */
+ private void invokeCallback(int nexpected) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(nexpected)).execute(captor.capture());
+
+ if (nexpected > 0) {
+ captor.getAllValues().get(nexpected - 1).run();
+ }
+ }
+
+ /**
+ * Feature that uses <i>exsvc</i> to execute requests.
+ */
+ private class MyLockingFeature extends SimpleLockManager {
+
+ public MyLockingFeature() {
+ this(engine, new Properties());
+ }
+
+ public MyLockingFeature(PolicyEngine engine, Properties props) {
+ super(engine, props);
+
+ exsvc = mock(ScheduledExecutorService.class);
+ when(engine.getExecutorService()).thenReturn(exsvc);
+
+ when(exsvc.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(answer -> {
+ return future;
+ });
+ }
+ }
+
+ /**
+ * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
+ * extend it, and then unlock it.
+ */
+ private class MyThread extends Thread {
+ AssertionError err = null;
+
+ public MyThread() {
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int x = 0; x < MAX_LOOPS; ++x) {
+ makeAttempt();
+ }
+
+ } catch (AssertionError e) {
+ err = e;
+ }
+ }
+
+ private void makeAttempt() {
+ try {
+ Semaphore sem = new Semaphore(0);
+
+ LockCallback cb = new LockCallback() {
+ @Override
+ public void lockAvailable(Lock lock) {
+ sem.release();
+ }
+
+ @Override
+ public void lockUnavailable(Lock lock) {
+ sem.release();
+ }
+ };
+
+ Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
+
+ // wait for callback, whether available or unavailable
+ assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
+ if (!lock.isActive()) {
+ return;
+ }
+
+ nsuccesses.incrementAndGet();
+
+ assertEquals(1, nactive.incrementAndGet());
+
+ lock.extend(HOLD_SEC2, cb);
+ assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
+ assertTrue(lock.isActive());
+
+ // decrement BEFORE free()
+ nactive.decrementAndGet();
+
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AssertionError("interrupted", e);
+ }
+ }
+ }
+}