summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--feature-distributed-locking/pom.xml7
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java936
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java (renamed from feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java)12
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java136
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java179
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java127
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java282
-rw-r--r--feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi1
-rw-r--r--feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi2
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java (renamed from feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java)12
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java1818
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java81
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java107
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java345
-rw-r--r--feature-distributed-locking/src/test/resources/feature-distributed-locking.properties13
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java71
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java164
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/LockCallback.java (renamed from policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiConstants.java)31
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java158
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java43
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java168
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java227
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java384
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java106
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java261
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java134
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java92
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java595
-rw-r--r--policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java527
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java23
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java33
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java139
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java426
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java34
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java52
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java164
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java35
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java781
38 files changed, 5406 insertions, 3300 deletions
diff --git a/feature-distributed-locking/pom.xml b/feature-distributed-locking/pom.xml
index af777efa..1b6063b0 100644
--- a/feature-distributed-locking/pom.xml
+++ b/feature-distributed-locking/pom.xml
@@ -2,7 +2,7 @@
============LICENSE_START=======================================================
ONAP Policy Engine - Drools PDP
================================================================================
- Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -117,6 +117,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<scope>test</scope>
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
new file mode 100644
index 00000000..523c0d93
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
@@ -0,0 +1,936 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSourceFactory;
+import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.drools.core.lock.AlwaysFailLock;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.persistence.SystemPersistenceConstants;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed implementation of the Lock Feature. Maintains locks across servers using a
+ * shared DB.
+ *
+ * <p/>
+ * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
+ * parameter.
+ *
+ * <p/>
+ * Additional Notes:
+ * <dl>
+ * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
+ * instead populated with the {@link #uuidString}.</li>
+ * <li>A periodic check of the DB is made to determine if any of the locks have
+ * expired.</li>
+ * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
+ * will be added to the map once free() or extend() is invoked, provided there isn't
+ * already an entry. In addition, it initially has the host and UUID of the feature
+ * instance that created it. However, as soon as doExtend() completes successfully, the
+ * host and UUID of the lock will be updated to reflect the values within this feature
+ * instance.</li>
+ * </dl>
+ */
+public class DistributedLockManager
+ implements PolicyResourceLockManager, PolicyEngineFeatureApi {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
+
+ private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
+ private static final String LOCK_LOST_MSG = "lock lost";
+ private static final String NOT_LOCKED_MSG = "not locked";
+
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private static DistributedLockManager latestInstance = null;
+
+
+ /**
+ * Name of the host on which this JVM is running.
+ */
+ @Getter
+ private final String hostName;
+
+ /**
+ * UUID of this object.
+ */
+ @Getter
+ private final String uuidString = UUID.randomUUID().toString();
+
+ /**
+ * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
+ * lock is added to the map, it remains in the map until the lock is lost or until the
+ * unlock request completes.
+ */
+ private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
+
+ /**
+ * Engine with which this manager is associated.
+ */
+ private PolicyEngine engine;
+
+ /**
+ * Feature properties.
+ */
+ private DistributedLockProperties featProps;
+
+ /**
+ * Thread pool used to check for lock expiration and to notify owners when locks are
+ * granted or lost.
+ */
+ private ScheduledExecutorService exsvc = null;
+
+ /**
+ * Data source used to connect to the DB.
+ */
+ private BasicDataSource dataSource = null;
+
+
+ /**
+ * Constructs the object.
+ */
+ public DistributedLockManager() {
+ this.hostName = NetworkUtil.getHostname();
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return 1000;
+ }
+
+ @Override
+ public boolean isAlive() {
+ return (exsvc != null);
+ }
+
+ @Override
+ public boolean start() {
+ // handled via engine API
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ // handled via engine API
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ // handled via engine API
+ }
+
+ @Override
+ public boolean isLocked() {
+ return false;
+ }
+
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public boolean unlock() {
+ return true;
+ }
+
+ @Override
+ public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
+
+ try {
+ this.engine = engine;
+ this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
+ this.exsvc = getThreadPool();
+ this.dataSource = makeDataSource();
+
+ return this;
+
+ } catch (Exception e) {
+ throw new DistributedLockManagerException(e);
+ }
+ }
+
+ @Override
+ public boolean afterStart(PolicyEngine engine) {
+
+ try {
+ exsvc.execute(this::deleteExpiredDbLocks);
+ exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ setLatestInstance(this);
+
+ } catch (Exception e) {
+ throw new DistributedLockManagerException(e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Make data source.
+ *
+ * @return a new, pooled data source
+ * @throws Exception exception
+ */
+ protected BasicDataSource makeDataSource() throws Exception {
+ Properties props = new Properties();
+ props.put("driverClassName", featProps.getDbDriver());
+ props.put("url", featProps.getDbUrl());
+ props.put("username", featProps.getDbUser());
+ props.put("password", featProps.getDbPwd());
+ props.put("testOnBorrow", "true");
+ props.put("poolPreparedStatements", "true");
+
+ // additional properties are listed in the GenericObjectPool API
+
+ return BasicDataSourceFactory.createDataSource(props);
+ }
+
+ /**
+ * Deletes expired locks from the DB.
+ */
+ private void deleteExpiredDbLocks() {
+ logger.info("deleting all expired locks from the DB");
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn
+ .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
+
+ int ndel = stmt.executeUpdate();
+ logger.info("deleted {} expired locks from the DB", ndel);
+
+ } catch (SQLException e) {
+ logger.warn("failed to delete expired locks from the DB", e);
+ }
+ }
+
+ /**
+ * Closes the data source. Does <i>not</i> invoke any lock call-backs.
+ */
+ @Override
+ public boolean afterStop(PolicyEngine engine) {
+ exsvc = null;
+ closeDataSource();
+ return false;
+ }
+
+ /**
+ * Closes {@link #dataSource} and sets it to {@code null}.
+ */
+ private void closeDataSource() {
+ try {
+ if (dataSource != null) {
+ dataSource.close();
+ }
+
+ } catch (SQLException e) {
+ logger.error("cannot close the distributed locking DB", e);
+ }
+
+ dataSource = null;
+ }
+
+ @Override
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+
+ if (latestInstance != this) {
+ AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
+ lock.notifyUnavailable();
+ return lock;
+ }
+
+ DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
+
+ DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
+
+ // do these outside of compute() to avoid blocking other map operations
+ if (existingLock == null) {
+ logger.debug("added lock to map {}", lock);
+ lock.scheduleRequest(lock::doLock);
+ } else {
+ lock.deny("resource is busy", true);
+ }
+
+ return lock;
+ }
+
+ /**
+ * Checks for expired locks.
+ */
+ private void checkExpired() {
+
+ try {
+ logger.info("checking for expired locks");
+ Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
+ identifyDbLocks(expiredIds);
+ expireLocks(expiredIds);
+
+ exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is no longer accepting requests", e);
+
+ } catch (SQLException | RuntimeException e) {
+ logger.error("error checking expired locks", e);
+ exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
+ }
+
+ logger.info("done checking for expired locks");
+ }
+
+ /**
+ * Identifies this feature instance's locks that the DB indicates are still active.
+ *
+ * @param expiredIds IDs of resources that have expired locks. If a resource is still
+ * locked, it's ID is removed from this set
+ * @throws SQLException if a DB error occurs
+ */
+ private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
+ /*
+ * We could query for host and UUIDs that actually appear within the locks, but
+ * those might change while the query is running so no real value in doing that.
+ * On the other hand, there's only a brief instance between the time a
+ * deserialized lock is added to this feature instance and its doExtend() method
+ * updates its host and UUID to match this feature instance. If this happens to
+ * run during that brief instance, then the lock will be lost and the callback
+ * invoked. It isn't worth complicating this code further to handle those highly
+ * unlikely cases.
+ */
+
+ // @formatter:off
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(
+ "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
+ // @formatter:on
+
+ stmt.setString(1, hostName);
+ stmt.setString(2, uuidString);
+
+ try (ResultSet resultSet = stmt.executeQuery()) {
+ while (resultSet.next()) {
+ String resourceId = resultSet.getString(1);
+
+ // we have now seen this resource id
+ expiredIds.remove(resourceId);
+ }
+ }
+ }
+ }
+
+ /**
+ * Expires locks for the resources that no longer appear within the DB.
+ *
+ * @param expiredIds IDs of resources that have expired locks
+ */
+ private void expireLocks(Set<String> expiredIds) {
+ for (String resourceId : expiredIds) {
+ AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
+
+ resource2lock.computeIfPresent(resourceId, (key, lock) -> {
+ if (lock.isActive()) {
+ // it thinks it's active, but it isn't - remove from the map
+ lockref.set(lock);
+ return null;
+ }
+
+ return lock;
+ });
+
+ DistributedLock lock = lockref.get();
+ if (lock != null) {
+ logger.debug("removed lock from map {}", lock);
+ lock.deny(LOCK_LOST_MSG, false);
+ }
+ }
+ }
+
+ /**
+ * Distributed Lock implementation.
+ */
+ public static class DistributedLock extends LockImpl {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Feature containing this lock. May be {@code null} until the feature is
+ * identified. Note: this can only be null if the lock has been de-serialized.
+ */
+ private transient DistributedLockManager feature;
+
+ /**
+ * Host name from the feature instance that created this object. Replaced with the
+ * host name from the current feature instance whenever the lock is successfully
+ * extended.
+ */
+ private String hostName;
+
+ /**
+ * UUID string from the feature instance that created this object. Replaced with
+ * the UUID string from the current feature instance whenever the lock is
+ * successfully extended.
+ */
+ private String uuidString;
+
+ /**
+ * {@code True} if the lock is busy making a request, {@code false} otherwise.
+ */
+ private transient boolean busy = false;
+
+ /**
+ * Request to be performed.
+ */
+ private transient RunnableWithEx request = null;
+
+ /**
+ * Number of times we've retried a request.
+ */
+ private transient int nretries = 0;
+
+ /**
+ * Constructs the object.
+ */
+ public DistributedLock() {
+ this.hostName = "";
+ this.uuidString = "";
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param state initial state of the lock
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held,
+ * after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or
+ * subsequently lost; must not be {@code null}
+ * @param feature feature containing this lock
+ */
+ public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ DistributedLockManager feature) {
+ super(state, resourceId, ownerKey, holdSec, callback);
+
+ this.feature = feature;
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+ }
+
+ /**
+ * Grants this lock. The notification is <i>always</i> invoked via the
+ * <i>foreground</i> thread.
+ */
+ protected void grant() {
+ synchronized (this) {
+ if (isUnavailable()) {
+ return;
+ }
+
+ setState(LockState.ACTIVE);
+ }
+
+ logger.info("lock granted: {}", this);
+
+ notifyAvailable();
+ }
+
+ /**
+ * Permanently denies this lock.
+ *
+ * @param reason the reason the lock was denied
+ * @param foreground {@code true} if the callback can be invoked in the current
+ * (i.e., foreground) thread, {@code false} if it should be invoked via the
+ * executor
+ */
+ protected void deny(String reason, boolean foreground) {
+ synchronized (this) {
+ setState(LockState.UNAVAILABLE);
+ }
+
+ logger.info("{}: {}", reason, this);
+
+ if (feature == null || foreground) {
+ notifyUnavailable();
+
+ } else {
+ feature.exsvc.execute(this::notifyUnavailable);
+ }
+ }
+
+ @Override
+ public boolean free() {
+ // do a quick check of the state
+ if (isUnavailable()) {
+ return false;
+ }
+
+ logger.info("releasing lock: {}", this);
+
+ if (!attachFeature()) {
+ setState(LockState.UNAVAILABLE);
+ return false;
+ }
+
+ AtomicBoolean result = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+ if (curlock == this && !isUnavailable()) {
+ // this lock was the owner
+ result.set(true);
+ setState(LockState.UNAVAILABLE);
+
+ /*
+ * NOTE: do NOT return null; curlock must remain until doUnlock
+ * completes.
+ */
+ }
+
+ return curlock;
+ });
+
+ if (result.get()) {
+ scheduleRequest(this::doUnlock);
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ setHoldSec(holdSec);
+ setCallback(callback);
+
+ // do a quick check of the state
+ if (isUnavailable() || !attachFeature()) {
+ deny(LOCK_LOST_MSG, true);
+ return;
+ }
+
+ AtomicBoolean success = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+ if (curlock == this && !isUnavailable()) {
+ success.set(true);
+ setState(LockState.WAITING);
+ }
+
+ // note: leave it in the map until doUnlock() removes it
+
+ return curlock;
+ });
+
+ if (success.get()) {
+ scheduleRequest(this::doExtend);
+
+ } else {
+ deny(NOT_LOCKED_MSG, true);
+ }
+ }
+
+ /**
+ * Attaches to the feature instance, if not already attached.
+ *
+ * @return {@code true} if the lock is now attached to a feature, {@code false}
+ * otherwise
+ */
+ private synchronized boolean attachFeature() {
+ if (feature != null) {
+ // already attached
+ return true;
+ }
+
+ feature = latestInstance;
+ if (feature == null) {
+ logger.warn("no feature yet for {}", this);
+ return false;
+ }
+
+ // put this lock into the map
+ feature.resource2lock.putIfAbsent(getResourceId(), this);
+
+ return true;
+ }
+
+ /**
+ * Schedules a request for execution.
+ *
+ * @param schedreq the request that should be scheduled
+ */
+ private synchronized void scheduleRequest(RunnableWithEx schedreq) {
+ logger.debug("schedule lock action {}", this);
+ nretries = 0;
+ request = schedreq;
+ feature.exsvc.execute(this::doRequest);
+ }
+
+ /**
+ * Reschedules a request for execution, if there is not already a request in the
+ * queue, and if the retry count has not been exhausted.
+ *
+ * @param req request to be rescheduled
+ */
+ private void rescheduleRequest(RunnableWithEx req) {
+ synchronized (this) {
+ if (request != null) {
+ // a new request has already been scheduled - it supersedes "req"
+ logger.debug("not rescheduling lock action {}", this);
+ return;
+ }
+
+ if (nretries++ < feature.featProps.getMaxRetries()) {
+ logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
+ request = req;
+ feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
+ return;
+ }
+ }
+
+ logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
+ removeFromMap();
+ }
+
+ /**
+ * Gets, and removes, the next request from the queue. Clears {@link #busy} if
+ * there are no more requests in the queue.
+ *
+ * @param prevReq the previous request that was just run
+ *
+ * @return the next request, or {@code null} if the queue is empty
+ */
+ private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
+ if (request == null || request == prevReq) {
+ logger.debug("no more requests for {}", this);
+ busy = false;
+ return null;
+ }
+
+ RunnableWithEx req = request;
+ request = null;
+
+ return req;
+ }
+
+ /**
+ * Executes the current request, if none are currently executing.
+ */
+ private void doRequest() {
+ synchronized (this) {
+ if (busy) {
+ // another thread is already processing the request(s)
+ return;
+ }
+ busy = true;
+ }
+
+ /*
+ * There is a race condition wherein this thread could invoke run() while the
+ * next scheduled thread checks the busy flag and finds that work is being
+ * done and returns, leaving the next work item in "request". In that case,
+ * the next work item may never be executed, thus we use a loop here, instead
+ * of just executing a single request.
+ */
+ RunnableWithEx req = null;
+ while ((req = getNextRequest(req)) != null) {
+ if (feature.resource2lock.get(getResourceId()) != this) {
+ /*
+ * no longer in the map - don't apply the action, as it may interfere
+ * with any newly added Lock object
+ */
+ logger.debug("discard lock action {}", this);
+ synchronized (this) {
+ busy = false;
+ }
+ return;
+ }
+
+ try {
+ /*
+ * Run the request. If it throws an exception, then it will be
+ * rescheduled for execution a little later.
+ */
+ req.run();
+
+ } catch (SQLException e) {
+ logger.warn("request failed for lock: {}", this, e);
+
+ if (feature.featProps.isTransient(e.getErrorCode())) {
+ // retry the request a little later
+ rescheduleRequest(req);
+ } else {
+ removeFromMap();
+ }
+
+ } catch (RuntimeException e) {
+ logger.warn("request failed for lock: {}", this, e);
+ removeFromMap();
+ }
+ }
+ }
+
+ /**
+ * Attempts to add a lock to the DB. Generates a callback, indicating success or
+ * failure.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doLock() throws SQLException {
+ if (!isWaiting()) {
+ logger.debug("discard doLock {}", this);
+ return;
+ }
+
+ /*
+ * There is a small window in which a client could invoke free() before the DB
+ * is updated. In that case, doUnlock will be added to the queue to run after
+ * this, which will delete the record, as desired. In addition, grant() will
+ * not do anything, because the lock state will have been set to UNAVAILABLE
+ * by free().
+ */
+
+ logger.debug("doLock {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ boolean success = false;
+ try {
+ success = doDbInsert(conn);
+
+ } catch (SQLException e) {
+ logger.info("failed to insert lock record - attempting update: {}", this, e);
+ success = doDbUpdate(conn);
+ }
+
+ if (success) {
+ grant();
+ return;
+ }
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
+ * it fails, as this should only be executed in response to a call to
+ * {@link #free()}.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doUnlock() throws SQLException {
+ logger.debug("unlock {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ doDbDelete(conn);
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Attempts to extend a lock in the DB. Generates a callback, indicating success
+ * or failure.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doExtend() throws SQLException {
+ if (!isWaiting()) {
+ logger.debug("discard doExtend {}", this);
+ return;
+ }
+
+ /*
+ * There is a small window in which a client could invoke free() before the DB
+ * is updated. In that case, doUnlock will be added to the queue to run after
+ * this, which will delete the record, as desired. In addition, grant() will
+ * not do anything, because the lock state will have been set to UNAVAILABLE
+ * by free().
+ */
+
+ logger.debug("doExtend {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ /*
+ * invoker may have called extend() before free() had a chance to insert
+ * the record, thus we have to try to insert, if the update fails
+ */
+ if (doDbUpdate(conn) || doDbInsert(conn)) {
+ grant();
+ return;
+ }
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Inserts the lock into the DB.
+ *
+ * @param conn DB connection
+ * @return {@code true} if a record was successfully inserted, {@code false}
+ * otherwise
+ * @throws SQLException if a DB error occurs
+ */
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ logger.debug("insert lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
+ + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, feature.hostName);
+ stmt.setString(3, feature.uuidString);
+ stmt.setInt(4, getHoldSec());
+
+ stmt.executeUpdate();
+
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+
+ return true;
+ }
+ }
+
+ /**
+ * Updates the lock in the DB.
+ *
+ * @param conn DB connection
+ * @return {@code true} if a record was successfully updated, {@code false}
+ * otherwise
+ * @throws SQLException if a DB error occurs
+ */
+ protected boolean doDbUpdate(Connection conn) throws SQLException {
+ logger.debug("update lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
+ + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
+ + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, feature.hostName);
+ stmt.setString(3, feature.uuidString);
+ stmt.setInt(4, getHoldSec());
+
+ stmt.setString(5, getResourceId());
+ stmt.setString(6, this.hostName);
+ stmt.setString(7, this.uuidString);
+
+ if (stmt.executeUpdate() != 1) {
+ return false;
+ }
+
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+
+ return true;
+ }
+ }
+
+ /**
+ * Deletes the lock from the DB.
+ *
+ * @param conn DB connection
+ * @throws SQLException if a DB error occurs
+ */
+ protected void doDbDelete(Connection conn) throws SQLException {
+ logger.debug("delete lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, this.hostName);
+ stmt.setString(3, this.uuidString);
+
+ stmt.executeUpdate();
+ }
+ }
+
+ /**
+ * Removes the lock from the map, and sends a notification using the current
+ * thread.
+ */
+ private void removeFromMap() {
+ logger.debug("remove lock from map {}", this);
+ feature.resource2lock.remove(getResourceId(), this);
+
+ synchronized (this) {
+ if (!isUnavailable()) {
+ deny(LOCK_LOST_MSG, true);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
+ + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
+ + uuidString + "]";
+ }
+ }
+
+ @FunctionalInterface
+ private static interface RunnableWithEx {
+ void run() throws SQLException;
+ }
+
+ // these may be overridden by junit tests
+
+ protected Properties getProperties(String fileName) {
+ return SystemPersistenceConstants.getManager().getProperties(fileName);
+ }
+
+ protected ScheduledExecutorService getThreadPool() {
+ return engine.getExecutorService();
+ }
+
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java
index 55fc4fab..e720f9a1 100644
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* feature-distributed-locking
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,15 +20,15 @@
package org.onap.policy.distributed.locking;
-public class DistributedLockingFeatureException extends RuntimeException {
+public class DistributedLockManagerException extends RuntimeException {
private static final long serialVersionUID = 1L;
/**
* Constructor.
- *
+ *
* @param ex exception to be wrapped
*/
- public DistributedLockingFeatureException(Exception ex) {
+ public DistributedLockManagerException(Exception ex) {
super(ex);
}
}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java
new file mode 100644
index 00000000..f470c8e2
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java
@@ -0,0 +1,136 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.properties.BeanConfigurator;
+import org.onap.policy.common.utils.properties.Property;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+
+@Getter
+@Setter
+public class DistributedLockProperties {
+ public static final String PREFIX = "distributed.locking.";
+
+ public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
+ public static final String DB_URL = "javax.persistence.jdbc.url";
+ public static final String DB_USER = "javax.persistence.jdbc.user";
+ public static final String DB_PASS = "javax.persistence.jdbc.password";
+ public static final String TRANSIENT_ERROR_CODES = PREFIX + "transient.error.codes";
+ public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds";
+ public static final String RETRY_SEC = PREFIX + "retry.seconds";
+ public static final String MAX_RETRIES = PREFIX + "max.retries";
+
+ /**
+ * Database driver.
+ */
+ @Property(name = DB_DRIVER)
+ private String dbDriver;
+
+ /**
+ * Database url.
+ */
+ @Property(name = DB_URL)
+ private String dbUrl;
+
+ /**
+ * Database user.
+ */
+ @Property(name = DB_USER)
+ private String dbUser;
+
+ /**
+ * Database password.
+ */
+ @Property(name = DB_PASS)
+ private String dbPwd;
+
+ /**
+ * Vendor-specific error codes that are "transient", meaning they may go away if the
+ * command is repeated (e.g., connection issue), as opposed to something like a syntax
+ * error or a duplicate key.
+ */
+ @Property(name = TRANSIENT_ERROR_CODES)
+ private String errorCodeStrings;
+
+ private final Set<Integer> transientErrorCodes;
+
+ /**
+ * Time, in seconds, to wait between checks for expired locks.
+ */
+ @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900")
+ private int expireCheckSec;
+
+ /**
+ * Number of seconds to wait before retrying, after a DB error.
+ */
+ @Property(name = RETRY_SEC, defaultValue = "60")
+ private int retrySec;
+
+ /**
+ * Maximum number of times to retry a DB operation.
+ */
+ @Property(name = MAX_RETRIES, defaultValue = "2")
+ private int maxRetries;
+
+ /**
+ * Constructs the object, populating fields from the properties.
+ *
+ * @param props properties from which to configure this
+ * @throws PropertyException if an error occurs
+ */
+ public DistributedLockProperties(Properties props) throws PropertyException {
+ new BeanConfigurator().configureFromProperties(this, props);
+
+ Set<Integer> set = new HashSet<>();
+ for (String text : errorCodeStrings.split(",")) {
+ text = text.trim();
+ if (text.isEmpty()) {
+ continue;
+ }
+
+ try {
+ set.add(Integer.valueOf(text));
+
+ } catch (NumberFormatException e) {
+ throw new PropertyException(TRANSIENT_ERROR_CODES, "errorCodeStrings", e);
+ }
+ }
+
+ transientErrorCodes = Collections.unmodifiableSet(set);
+ }
+
+ /**
+ * Determines if an error is transient.
+ *
+ * @param errorCode error code to check
+ * @return {@code true} if the error is transient, {@code false} otherwise
+ */
+ public boolean isTransient(int errorCode) {
+ return transientErrorCodes.contains(errorCode);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
deleted file mode 100644
index d5e07a30..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.apache.commons.dbcp2.BasicDataSourceFactory;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi;
-import org.onap.policy.drools.features.PolicyEngineFeatureApi;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-import org.onap.policy.drools.system.PolicyEngine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DistributedLockingFeature implements PolicyEngineFeatureApi, PolicyResourceLockFeatureApi {
-
- /**
- * Logger instance.
- */
- private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class);
-
- /**
- * Properties Configuration Name.
- */
- public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
-
- /**
- * Properties for locking feature.
- */
- private DistributedLockingProperties lockProps;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID.
- */
- private static final UUID uuid = UUID.randomUUID();
-
- @Override
- public int getSequenceNumber() {
- return 1000;
- }
-
- @Override
- public OperResult beforeLock(String resourceId, String owner, int holdSec) {
-
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.lock(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeRefresh(String resourceId, String owner, int holdSec) {
-
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.refresh(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeUnlock(String resourceId, String owner) {
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeIsLockedBy(String resourceId, String owner) {
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeIsLocked(String resourceId) {
- TargetLock lock = new TargetLock(resourceId, uuid, "dummyOwner", dataSource);
-
- return (lock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public boolean afterStart(PolicyEngine engine) {
-
- try {
- this.lockProps = new DistributedLockingProperties(SystemPersistenceConstants.getManager()
- .getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME));
- this.dataSource = makeDataSource();
- } catch (PropertyException e) {
- logger.error("DistributedLockingFeature feature properies have not been loaded", e);
- throw new DistributedLockingFeatureException(e);
- } catch (InterruptedException e) {
- logger.error("DistributedLockingFeature failed to create data source", e);
- Thread.currentThread().interrupt();
- throw new DistributedLockingFeatureException(e);
- } catch (Exception e) {
- logger.error("DistributedLockingFeature failed to create data source", e);
- throw new DistributedLockingFeatureException(e);
- }
-
- cleanLockTable();
-
- return false;
- }
-
- /**
- * Make data source.
- *
- * @return a new, pooled data source
- * @throws Exception exception
- */
- protected BasicDataSource makeDataSource() throws Exception {
- Properties props = new Properties();
- props.put("driverClassName", lockProps.getDbDriver());
- props.put("url", lockProps.getDbUrl());
- props.put("username", lockProps.getDbUser());
- props.put("password", lockProps.getDbPwd());
- props.put("testOnBorrow", "true");
- props.put("poolPreparedStatements", "true");
-
- // additional properties are listed in the GenericObjectPool API
-
- return BasicDataSourceFactory.createDataSource(props);
- }
-
- /**
- * This method kills the heartbeat thread and calls refreshLockTable which removes
- * any records from the db where the current host is the owner.
- */
- @Override
- public boolean beforeShutdown(PolicyEngine engine) {
- cleanLockTable();
- return false;
- }
-
- /**
- * This method removes all records owned by the current host from the db.
- */
- private void cleanLockTable() {
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement statement = conn.prepareStatement(
- "DELETE FROM pooling.locks WHERE host = ? OR expirationTime < now()")
- ) {
-
- statement.setString(1, uuid.toString());
- statement.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("error in refreshLockTable()", e);
- }
-
- }
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
deleted file mode 100644
index 0ed5930d..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.util.Properties;
-import org.onap.policy.common.utils.properties.BeanConfigurator;
-import org.onap.policy.common.utils.properties.Property;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-
-
-public class DistributedLockingProperties {
-
- /**
- * Feature properties all begin with this prefix.
- */
- public static final String PREFIX = "distributed.locking.";
-
- public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
- public static final String DB_URL = "javax.persistence.jdbc.url";
- public static final String DB_USER = "javax.persistence.jdbc.user";
- public static final String DB_PWD = "javax.persistence.jdbc.password";
-
- /**
- * Properties from which this was constructed.
- */
- private final Properties source;
-
- /**
- * Database driver.
- */
- @Property(name = DB_DRIVER)
- private String dbDriver;
-
- /**
- * Database url.
- */
- @Property(name = DB_URL)
- private String dbUrl;
-
- /**
- * Database user.
- */
- @Property(name = DB_USER)
- private String dbUser;
-
- /**
- * Database password.
- */
- @Property(name = DB_PWD)
- private String dbPwd;
-
- /**
- * Constructs the object, populating fields from the properties.
- *
- * @param props properties from which to configure this
- * @throws PropertyException if an error occurs
- */
- public DistributedLockingProperties(Properties props) throws PropertyException {
- source = props;
-
- new BeanConfigurator().configureFromProperties(this, props);
- }
-
-
- public Properties getSource() {
- return source;
- }
-
-
- public String getDbDriver() {
- return dbDriver;
- }
-
-
- public String getDbUrl() {
- return dbUrl;
- }
-
-
- public String getDbUser() {
- return dbUser;
- }
-
-
- public String getDbPwd() {
- return dbPwd;
- }
-
-
- public void setDbDriver(String dbDriver) {
- this.dbDriver = dbDriver;
- }
-
-
- public void setDbUrl(String dbUrl) {
- this.dbUrl = dbUrl;
- }
-
-
- public void setDbUser(String dbUser) {
- this.dbUser = dbUser;
- }
-
-
- public void setDbPwd(String dbPwd) {
- this.dbPwd = dbPwd;
- }
-
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
deleted file mode 100644
index 42e1f92f..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.UUID;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TargetLock {
-
- private static final Logger logger = LoggerFactory.getLogger(TargetLock.class);
-
- /**
- * The Target resource we want to lock.
- */
- private String resourceId;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID .
- */
- private UUID uuid;
-
- /**
- * Owner.
- */
- private String owner;
-
- /**
- * Constructs a TargetLock object.
- *
- * @param resourceId ID of the entity we want to lock
- * @param dataSource used to connect to the DB containing locks
- */
- public TargetLock(String resourceId, UUID uuid, String owner, BasicDataSource dataSource) {
- this.resourceId = resourceId;
- this.uuid = uuid;
- this.owner = owner;
- this.dataSource = dataSource;
- }
-
- /**
- * Obtain a lock.
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- public boolean lock(int holdSec) {
-
- return grabLock(holdSec);
- }
-
- /**
- * Refresh a lock.
- *
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if the lock was refreshed, {@code false} if the resource is
- * not currently locked by the given owner
- */
- public boolean refresh(int holdSec) {
- return updateLock(holdSec);
- }
-
- /**
- * Unlock a resource by deleting it's associated record in the db.
- */
- public boolean unlock() {
- return deleteLock();
- }
-
- /**
- * "Grabs" lock by attempting to insert a new record in the db.
- * If the insert fails due to duplicate key error resource is already locked
- * so we call secondGrab.
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- private boolean grabLock(int holdSec) {
-
- // try to insert a record into the table(thereby grabbing the lock)
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement statement = conn.prepareStatement(
- "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
- + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
-
- int index = 1;
- statement.setString(index++, this.resourceId);
- statement.setString(index++, this.uuid.toString());
- statement.setString(index++, this.owner);
- statement.setInt(index++, holdSec);
- statement.executeUpdate();
- }
-
- catch (SQLException e) {
- logger.error("error in TargetLock.grabLock()", e);
- return secondGrab(holdSec);
- }
-
- return true;
- }
-
- /**
- * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired.
- * If that fails, it attempts to insert a new record again
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- private boolean secondGrab(int holdSec) {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, "
- + "expirationTime = timestampadd(second, ?, now()) "
- + "WHERE resourceId = ? AND expirationTime < now()");
-
- PreparedStatement insertStatement = conn.prepareStatement(
- "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
- + "values (?, ?, ?, timestampadd(second, ?, now()))");) {
-
- int index = 1;
- updateStatement.setString(index++, this.uuid.toString());
- updateStatement.setString(index++, this.owner);
- updateStatement.setInt(index++, holdSec);
- updateStatement.setString(index++, this.resourceId);
-
- // The lock was expired and we grabbed it.
- // return true
- if (updateStatement.executeUpdate() == 1) {
- return true;
- }
-
- // If our update does not return 1 row, the lock either has not expired
- // or it was removed. Try one last grab
- else {
- index = 1;
- insertStatement.setString(index++, this.resourceId);
- insertStatement.setString(index++, this.uuid.toString());
- insertStatement.setString(index++, this.owner);
- insertStatement.setInt(index++, holdSec);
-
- // If our insert returns 1 we successfully grabbed the lock
- return (insertStatement.executeUpdate() == 1);
- }
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.secondGrab()", e);
- return false;
- }
-
- }
-
- /**
- * Updates the DB record associated with the lock.
- *
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if the record was updated, {@code false} otherwise
- */
- private boolean updateLock(int holdSec) {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, "
- + "expirationTime = timestampadd(second, ?, now()) "
- + "WHERE resourceId = ? AND owner = ? AND expirationTime >= now()")) {
-
- int index = 1;
- updateStatement.setString(index++, this.uuid.toString());
- updateStatement.setString(index++, this.owner);
- updateStatement.setInt(index++, holdSec);
- updateStatement.setString(index++, this.resourceId);
- updateStatement.setString(index++, this.owner);
-
- // refresh succeeded iff a record was updated
- return (updateStatement.executeUpdate() == 1);
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.refreshLock()", e);
- return false;
- }
-
- }
-
- /**
- *To remove a lock we simply delete the record from the db .
- */
- private boolean deleteLock() {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement deleteStatement = conn.prepareStatement(
- "DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?")) {
-
- deleteStatement.setString(1, this.resourceId);
- deleteStatement.setString(2, this.owner);
- deleteStatement.setString(3, this.uuid.toString());
-
- return (deleteStatement.executeUpdate() == 1);
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.deleteLock()", e);
- return false;
- }
-
- }
-
- /**
- * Is the lock active.
- */
- public boolean isActive() {
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement selectStatement = conn.prepareStatement(
- "SELECT * FROM pooling.locks "
- + "WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= now()")) {
-
- selectStatement.setString(1, this.resourceId);
- selectStatement.setString(2, this.uuid.toString());
- selectStatement.setString(3, this.owner);
- try (ResultSet result = selectStatement.executeQuery()) {
-
- // This will return true if the
- // query returned at least one row
- return result.first();
- }
-
- }
-
- catch (SQLException e) {
- logger.error("error in TargetLock.isActive()", e);
- return false;
- }
-
- }
-
- /**
- * Is the resource locked.
- */
- public boolean isLocked() {
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement selectStatement = conn
- .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= now()")) {
-
- selectStatement.setString(1, this.resourceId);
- try (ResultSet result = selectStatement.executeQuery()) {
- // This will return true if the
- // query returned at least one row
- return result.first();
- }
- } catch (SQLException e) {
- logger.error("error in TargetLock.isActive()", e);
- return false;
- }
- }
-
-}
diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi
deleted file mode 100644
index 19bdf505..00000000
--- a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi
+++ /dev/null
@@ -1 +0,0 @@
-org.onap.policy.distributed.locking.DistributedLockingFeature \ No newline at end of file
diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
index 19bdf505..2a7c0547 100644
--- a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
+++ b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
@@ -1 +1 @@
-org.onap.policy.distributed.locking.DistributedLockingFeature \ No newline at end of file
+org.onap.policy.distributed.locking.DistributedLockManager \ No newline at end of file
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java
index 7ba2384a..cfd6a151 100644
--- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureExceptionTest.java
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerExceptionTest.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* feature-distributed-locking
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,13 +24,13 @@ import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.onap.policy.common.utils.test.ExceptionsTester;
-import org.onap.policy.distributed.locking.DistributedLockingFeatureException;
+import org.onap.policy.distributed.locking.DistributedLockManagerException;
-public class DistributedLockingFeatureExceptionTest extends ExceptionsTester {
+public class DistributedLockManagerExceptionTest extends ExceptionsTester {
@Test
public void test() {
- assertEquals(1, test(DistributedLockingFeatureException.class));
+ assertEquals(1, test(DistributedLockManagerException.class));
}
}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java
new file mode 100644
index 00000000..59b56224
--- /dev/null
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java
@@ -0,0 +1,1818 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.services.OrderedServiceImpl;
+import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.persistence.SystemPersistenceConstants;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.powermock.reflect.Whitebox;
+
+public class DistributedLockManagerTest {
+ private static final long EXPIRE_SEC = 900L;
+ private static final long RETRY_SEC = 60L;
+ private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
+ private static final String OTHER_HOST = "other-host";
+ private static final String OTHER_OWNER = "other-owner";
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final String DB_CONNECTION =
+ "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
+ private static final String DB_USER = "user";
+ private static final String DB_PASSWORD = "password";
+ private static final String OWNER_KEY = "my key";
+ private static final String RESOURCE = "my resource";
+ private static final String RESOURCE2 = "my resource #2";
+ private static final String RESOURCE3 = "my resource #3";
+ private static final String RESOURCE4 = "my resource #4";
+ private static final String RESOURCE5 = "my resource #5";
+ private static final int HOLD_SEC = 100;
+ private static final int HOLD_SEC2 = 120;
+ private static final int MAX_THREADS = 5;
+ private static final int MAX_LOOPS = 100;
+ private static final int TRANSIENT = 500;
+ private static final int PERMANENT = 600;
+
+ // number of execute() calls before the first lock attempt
+ private static final int PRE_LOCK_EXECS = 1;
+
+ // number of execute() calls before the first schedule attempt
+ private static final int PRE_SCHED_EXECS = 1;
+
+ private static Connection conn = null;
+ private static ScheduledExecutorService saveExec;
+ private static ScheduledExecutorService realExec;
+
+ @Mock
+ private ScheduledExecutorService exsvc;
+
+ @Mock
+ private LockCallback callback;
+
+ @Mock
+ private BasicDataSource datasrc;
+
+ @Mock
+ private PolicyEngine engine;
+
+ private DistributedLock lock;
+
+ private AtomicInteger nactive;
+ private AtomicInteger nsuccesses;
+ private DistributedLockManager feature;
+
+
+ /**
+ * Configures the location of the property files and creates the DB.
+ *
+ * @throws SQLException if the DB cannot be created
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws SQLException {
+ SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
+
+ conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+
+ try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
+ + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
+ + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
+ createStmt.executeUpdate();
+ }
+
+ saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
+
+ realExec = Executors.newScheduledThreadPool(3);
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
+ }
+
+ /**
+ * Restores static fields.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws SQLException {
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
+ realExec.shutdown();
+ conn.close();
+ }
+
+ /**
+ * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
+ * tasks.
+ *
+ * @throws SQLException if the lock records cannot be deleted from the DB
+ */
+ @Before
+ public void setUp() throws SQLException {
+ MockitoAnnotations.initMocks(this);
+
+ nactive = new AtomicInteger(0);
+ nsuccesses = new AtomicInteger(0);
+
+ cleanDb();
+
+ when(engine.getExecutorService()).thenReturn(exsvc);
+
+ feature = new MyLockingFeature(true);
+ }
+
+ @After
+ public void tearDown() throws SQLException {
+ shutdownFeature();
+ cleanDb();
+ }
+
+ private void cleanDb() throws SQLException {
+ try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
+ stmt.executeUpdate();
+ }
+ }
+
+ private void shutdownFeature() {
+ if (feature != null) {
+ feature.afterStop(engine);
+ feature = null;
+ }
+ }
+
+ /**
+ * Tests that the feature is found in the expected service sets.
+ */
+ @Test
+ public void testServiceApis() {
+ assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
+ .anyMatch(obj -> obj instanceof DistributedLockManager));
+ }
+
+ /**
+ * Tests constructor() when properties are invalid.
+ */
+ @Test
+ public void testDistributedLockManagerInvalidProperties() {
+ // use properties containing an invalid value
+ Properties props = new Properties();
+ props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc");
+
+ feature = new MyLockingFeature(false) {
+ @Override
+ protected Properties getProperties(String fileName) {
+ return props;
+ }
+ };
+
+ assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
+ }
+
+ @Test
+ public void testGetSequenceNumber() {
+ assertEquals(1000, feature.getSequenceNumber());
+ }
+
+ @Test
+ public void testStartableApi() {
+ assertTrue(feature.isAlive());
+ assertTrue(feature.start());
+ assertTrue(feature.stop());
+ feature.shutdown();
+
+ // above should have had no effect
+ assertTrue(feature.isAlive());
+
+ feature.afterStop(engine);
+ assertFalse(feature.isAlive());
+ }
+
+ @Test
+ public void testLockApi() {
+ assertFalse(feature.isLocked());
+ assertTrue(feature.lock());
+ assertTrue(feature.unlock());
+ }
+
+ @Test
+ public void testBeforeCreateLockManager() {
+ assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
+ }
+
+ /**
+ * Tests beforeCreate(), when getProperties() throws a runtime exception.
+ */
+ @Test
+ public void testBeforeCreateLockManagerEx() {
+ shutdownFeature();
+
+ feature = new MyLockingFeature(false) {
+ @Override
+ protected Properties getProperties(String fileName) {
+ throw new IllegalArgumentException(EXPECTED_EXCEPTION);
+ }
+ };
+
+ assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
+ .isInstanceOf(DistributedLockManagerException.class);
+ }
+
+ @Test
+ public void testAfterStart() {
+ // verify that cleanup & expire check are both added to the queue
+ verify(exsvc).execute(any());
+ verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ /**
+ * Tests afterStart(), when thread pool throws a runtime exception.
+ */
+ @Test
+ public void testAfterStartExInThreadPool() {
+ shutdownFeature();
+
+ feature = new MyLockingFeature(false);
+
+ when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
+ .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION));
+
+ assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
+ }
+
+ @Test
+ public void testDeleteExpiredDbLocks() throws SQLException {
+ // add records: two expired, one not
+ insertRecord(RESOURCE, feature.getUuidString(), -1);
+ insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
+ insertRecord(RESOURCE3, OTHER_OWNER, 0);
+ insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
+
+ // get the clean-up function and execute it
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).execute(captor.capture());
+
+ long tbegin = System.currentTimeMillis();
+ Runnable action = captor.getValue();
+ action.run();
+
+ assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
+ assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
+ assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
+ assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
+
+ assertEquals(2, getRecordCount());
+ }
+
+ /**
+ * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDeleteExpiredDbLocksEx() throws SQLException {
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ // get the clean-up function and execute it
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).execute(captor.capture());
+
+ Runnable action = captor.getValue();
+
+ // should not throw an exception
+ action.run();
+ }
+
+ @Test
+ public void testAfterStop() {
+ shutdownFeature();
+
+ feature = new DistributedLockManager();
+
+ // shutdown without calling afterStart()
+
+ shutdownFeature();
+ }
+
+ /**
+ * Tests afterStop(), when the data source throws an exception when close() is called.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testAfterStopEx() throws SQLException {
+ shutdownFeature();
+
+ // use a data source that throws an exception when closed
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ shutdownFeature();
+ }
+
+ @Test
+ public void testCreateLock() throws SQLException {
+ verify(exsvc).execute(any());
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock.isWaiting());
+
+ verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
+
+ // this lock should fail
+ LockCallback callback2 = mock(LockCallback.class);
+ DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
+ assertTrue(lock2.isUnavailable());
+ verify(callback2, never()).lockAvailable(lock2);
+ verify(callback2).lockUnavailable(lock2);
+
+ // this should fail, too
+ LockCallback callback3 = mock(LockCallback.class);
+ DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
+ assertTrue(lock3.isUnavailable());
+ verify(callback3, never()).lockAvailable(lock3);
+ verify(callback3).lockUnavailable(lock3);
+
+ // no change to first
+ assertTrue(lock.isWaiting());
+
+ // no callbacks to the first lock
+ verify(callback, never()).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ assertTrue(lock.isWaiting());
+ assertEquals(0, getRecordCount());
+
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+ assertEquals(1, getRecordCount());
+
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // this should succeed
+ DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock4.isWaiting());
+
+ // after running checker, original records should still remain
+ runChecker(0, 0, EXPIRE_SEC);
+ assertEquals(1, getRecordCount());
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests lock() when the feature is not the latest instance.
+ */
+ @Test
+ public void testCreateLockNotLatestInstance() {
+ DistributedLockManager.setLatestInstance(null);
+
+ Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockAvailable(any());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testCheckExpired() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
+ runLock(1, 0);
+
+ LockCallback callback3 = mock(LockCallback.class);
+ final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
+ runLock(2, 0);
+
+ LockCallback callback4 = mock(LockCallback.class);
+ final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
+ runLock(3, 0);
+
+ LockCallback callback5 = mock(LockCallback.class);
+ final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
+ runLock(4, 0);
+
+ assertEquals(5, getRecordCount());
+
+ // expire one record
+ updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
+
+ // change host of another record
+ updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
+
+ // change uuid of another record
+ updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
+
+
+ // run the checker
+ runChecker(0, 0, EXPIRE_SEC);
+
+
+ // check lock states
+ assertTrue(lock.isUnavailable());
+ assertTrue(lock2.isActive());
+ assertTrue(lock3.isUnavailable());
+ assertTrue(lock4.isActive());
+ assertTrue(lock5.isUnavailable());
+
+ // allow callbacks
+ runLock(5, 2);
+ runLock(6, 1);
+ runLock(7, 0);
+ verify(callback).lockUnavailable(lock);
+ verify(callback3).lockUnavailable(lock3);
+ verify(callback5).lockUnavailable(lock5);
+
+ verify(callback2, never()).lockUnavailable(lock2);
+ verify(callback4, never()).lockUnavailable(lock4);
+
+
+ // another check should have been scheduled, with the normal interval
+ runChecker(1, 0, EXPIRE_SEC);
+ }
+
+ /**
+ * Tests checkExpired(), when schedule() throws an exception.
+ */
+ @Test
+ public void testCheckExpiredExecRejected() {
+ // arrange for execution to be rejected
+ when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
+ .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
+
+ runChecker(0, 0, EXPIRE_SEC);
+ }
+
+ /**
+ * Tests checkExpired(), when getConnection() throws an exception.
+ */
+ @Test
+ public void testCheckExpiredSqlEx() {
+ // use a data source that throws an exception when getConnection() is called
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ runChecker(0, 0, EXPIRE_SEC);
+
+ // it should have scheduled another check, sooner
+ runChecker(0, 0, RETRY_SEC);
+ }
+
+ @Test
+ public void testExpireLocks() throws SQLException {
+ AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
+
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ // get the real data source
+ BasicDataSource src2 = super.makeDataSource();
+
+ when(datasrc.getConnection()).thenAnswer(answer -> {
+ DistributedLock lck = freeLock.getAndSet(null);
+ if (lck != null) {
+ // free it
+ lck.free();
+
+ // run its doUnlock
+ runLock(4, 0);
+ }
+
+ return src2.getConnection();
+ });
+
+ return datasrc;
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
+ runLock(1, 0);
+
+ LockCallback callback3 = mock(LockCallback.class);
+ final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
+ // don't run doLock for lock3 - leave it in the waiting state
+
+ LockCallback callback4 = mock(LockCallback.class);
+ final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
+ runLock(3, 0);
+
+ assertEquals(3, getRecordCount());
+
+ // expire one record
+ updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
+
+ // arrange to free lock4 while the checker is running
+ freeLock.set(lock4);
+
+ // run the checker
+ runChecker(0, 0, EXPIRE_SEC);
+
+
+ // check lock states
+ assertTrue(lock.isUnavailable());
+ assertTrue(lock2.isActive());
+ assertTrue(lock3.isWaiting());
+ assertTrue(lock4.isUnavailable());
+
+ runLock(5, 0);
+ verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any());
+
+ verify(callback).lockUnavailable(lock);
+ verify(callback2, never()).lockUnavailable(lock2);
+ verify(callback3, never()).lockUnavailable(lock3);
+ verify(callback4, never()).lockUnavailable(lock4);
+ }
+
+ @Test
+ public void testDistributedLockNoArgs() {
+ DistributedLock lock = new DistributedLock();
+ assertNull(lock.getResourceId());
+ assertNull(lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(0, lock.getHoldSec());
+ }
+
+ @Test
+ public void testDistributedLock() {
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
+ .withMessageContaining("holdSec");
+
+ // should generate no exception
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ }
+
+ @Test
+ public void testDistributedLockSerializable() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock = roundTrip(lock);
+
+ assertTrue(lock.isWaiting());
+
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+ }
+
+ @Test
+ public void testGrant() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertFalse(lock.isActive());
+
+ // execute the doLock() call
+ runLock(0, 0);
+
+ assertTrue(lock.isActive());
+
+ // the callback for the lock should have been run in the foreground thread
+ verify(callback).lockAvailable(lock);
+ }
+
+ /**
+ * Tests grant() when the lock is already unavailable.
+ */
+ @Test
+ public void testDistributedLockGrantUnavailable() {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock.setState(LockState.UNAVAILABLE);
+ lock.grant();
+
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testDistributedLockDeny() {
+ // get a lock
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // get another lock - should fail
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertTrue(lock.isUnavailable());
+
+ // the callback for the second lock should have been run in the foreground thread
+ verify(callback).lockUnavailable(lock);
+
+ // should only have a request for the first lock
+ verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
+ }
+
+ @Test
+ public void testDistributedLockFree() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ // run both requests associated with the lock
+ runLock(0, 1);
+ runLock(1, 0);
+
+ // should not have changed state
+ assertTrue(lock.isUnavailable());
+
+ // attempt to free it again
+ assertFalse(lock.free());
+
+ // should not have queued anything else
+ verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
+
+ // new lock should succeed
+ DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock2 != lock);
+ assertTrue(lock2.isWaiting());
+ }
+
+ /**
+ * Tests that free() works on a serialized lock with a new feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockFreeSerialized() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ feature = new MyLockingFeature(true);
+
+ lock = roundTrip(lock);
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests free() on a serialized lock without a feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockFreeNoFeature() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ DistributedLockManager.setLatestInstance(null);
+
+ lock = roundTrip(lock);
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests the case where the lock is freed and doUnlock called between the call to
+ * isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockFreeUnlocked() {
+ feature = new FreeWithFreeLockingFeature(true);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests the case where the lock is freed, but doUnlock is not completed, between the
+ * call to isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockFreeLockFreed() {
+ feature = new FreeWithFreeLockingFeature(false);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ @Test
+ public void testDistributedLockExtend() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // lock2 should be denied - called back by this thread
+ DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ verify(callback, never()).lockAvailable(lock2);
+ verify(callback).lockUnavailable(lock2);
+
+ // lock2 will still be denied - called back by this thread
+ lock2.extend(HOLD_SEC, callback);
+ verify(callback, times(2)).lockUnavailable(lock2);
+
+ // force lock2 to be active - should still be denied
+ Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
+ lock2.extend(HOLD_SEC, callback);
+ verify(callback, times(3)).lockUnavailable(lock2);
+
+ assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
+ .withMessageContaining("holdSec");
+
+ // execute doLock()
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ // now extend the first lock
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+ assertTrue(lock.isWaiting());
+
+ // execute doExtend()
+ runLock(1, 0);
+ lock.extend(HOLD_SEC2, callback2);
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests that extend() works on a serialized lock with a new feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockExtendSerialized() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // run doLock
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ feature = new MyLockingFeature(true);
+
+ lock = roundTrip(lock);
+ assertTrue(lock.isActive());
+
+ LockCallback scallback = mock(LockCallback.class);
+
+ lock.extend(HOLD_SEC, scallback);
+ assertTrue(lock.isWaiting());
+
+ // run doExtend (in new feature)
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ verify(scallback).lockAvailable(lock);
+ verify(scallback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests extend() on a serialized lock without a feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testDistributedLockExtendNoFeature() throws Exception {
+ DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // run doLock
+ runLock(0, 0);
+ assertTrue(lock.isActive());
+
+ DistributedLockManager.setLatestInstance(null);
+
+ lock = roundTrip(lock);
+ assertTrue(lock.isActive());
+
+ LockCallback scallback = mock(LockCallback.class);
+
+ lock.extend(HOLD_SEC, scallback);
+ assertTrue(lock.isUnavailable());
+
+ verify(scallback, never()).lockAvailable(lock);
+ verify(scallback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests the case where the lock is freed and doUnlock called between the call to
+ * isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockExtendUnlocked() {
+ feature = new FreeWithFreeLockingFeature(true);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ lock.extend(HOLD_SEC2, callback);
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests the case where the lock is freed, but doUnlock is not completed, between the
+ * call to isUnavailable() and the call to compute().
+ */
+ @Test
+ public void testDistributedLockExtendLockFreed() {
+ feature = new FreeWithFreeLockingFeature(false);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ lock.extend(HOLD_SEC2, callback);
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockScheduleRequest() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ verify(callback).lockAvailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockRescheduleRequest() throws SQLException {
+ // use a data source that throws an exception when getConnection() is called
+ InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
+ feature = invfeat;
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should fail and reschedule
+ runLock(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isWaiting());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // free the lock while doLock is executing
+ invfeat.freeLock = true;
+
+ // try scheduled request - should just invoke doUnlock
+ runSchedule(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // should have scheduled a retry of doUnlock
+ verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ @Test
+ public void testDistributedLockGetNextRequest() {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ /*
+ * run doLock. This should cause getNextRequest() to be called twice, once with a
+ * request in the queue, and the second time with request=null.
+ */
+ runLock(0, 0);
+ }
+
+ /**
+ * Tests getNextRequest(), where the same request is still in the queue the second
+ * time it's called.
+ */
+ @Test
+ public void testDistributedLockGetNextRequestSameRequest() {
+ // force reschedule to be invoked
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ /*
+ * run doLock. This should cause getNextRequest() to be called twice, once with a
+ * request in the queue, and the second time with the same request again.
+ */
+ runLock(0, 0);
+
+ verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ @Test
+ public void testDistributedLockDoRequest() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ assertTrue(lock.isWaiting());
+
+ // run doLock via doRequest
+ runLock(0, 0);
+
+ assertTrue(lock.isActive());
+ }
+
+ /**
+ * Tests doRequest(), when doRequest() is already running within another thread.
+ */
+ @Test
+ public void testDistributedLockDoRequestBusy() {
+ /*
+ * this feature will invoke a request in a background thread while it's being run
+ * in a foreground thread.
+ */
+ AtomicBoolean running = new AtomicBoolean(false);
+ AtomicBoolean returned = new AtomicBoolean(false);
+
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ if (running.get()) {
+ // already inside the thread - don't recurse any further
+ return super.doDbInsert(conn);
+ }
+
+ running.set(true);
+
+ Thread thread = new Thread(() -> {
+ // run doLock from within the new thread
+ runLock(0, 0);
+ });
+ thread.setDaemon(true);
+ thread.start();
+
+ // wait for the background thread to complete before continuing
+ try {
+ thread.join(5000);
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ }
+
+ returned.set(!thread.isAlive());
+
+ return super.doDbInsert(conn);
+ }
+ };
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // run doLock
+ runLock(0, 0);
+
+ assertTrue(returned.get());
+ }
+
+ /**
+ * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
+ // throw run-time exception
+ when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
+
+ // use a data source that throws an exception when getConnection() is called
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ return datasrc;
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should NOT reschedule
+ runLock(0, 0);
+
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+
+ verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ /**
+ * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
+ * state.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
+ // throw run-time exception
+ when(datasrc.getConnection()).thenAnswer(answer -> {
+ lock.free();
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ });
+
+ // use a data source that throws an exception when getConnection() is called
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ return datasrc;
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should NOT reschedule
+ runLock(0, 0);
+
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockUnavailable(lock);
+
+ verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ /**
+ * Tests doRequest() when the retry count gets exhausted.
+ */
+ @Test
+ public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
+ // use a data source that throws an exception when getConnection() is called
+ feature = new InvalidDbLockingFeature(TRANSIENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should fail and reschedule
+ runLock(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isWaiting());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // try again, via SCHEDULER - first retry fails
+ runSchedule(0, 0);
+
+ // should still be waiting
+ assertTrue(lock.isWaiting());
+ verify(callback, never()).lockUnavailable(lock);
+
+ // try again, via SCHEDULER - final retry fails
+ runSchedule(1, 0);
+ assertTrue(lock.isUnavailable());
+
+ // now callback should have been called
+ verify(callback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doRequest() when a non-transient DB exception is thrown.
+ */
+ @Test
+ public void testDistributedLockDoRequestNotTransient() {
+ /*
+ * use a data source that throws a PERMANENT exception when getConnection() is
+ * called
+ */
+ feature = new InvalidDbLockingFeature(PERMANENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should fail
+ runLock(0, 0);
+
+ assertTrue(lock.isUnavailable());
+ verify(callback).lockUnavailable(lock);
+
+ // should not have scheduled anything new
+ verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
+ verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
+ }
+
+ @Test
+ public void testDistributedLockDoLock() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should simply do an insert
+ long tbegin = System.currentTimeMillis();
+ runLock(0, 0);
+
+ assertEquals(1, getRecordCount());
+ assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
+ verify(callback).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doLock() when the lock is freed before doLock runs.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoLockFreed() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ lock.setState(LockState.UNAVAILABLE);
+
+ // invoke doLock - should do nothing
+ runLock(0, 0);
+
+ assertEquals(0, getRecordCount());
+
+ verify(callback, never()).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doLock() when a DB exception is thrown.
+ */
+ @Test
+ public void testDistributedLockDoLockEx() {
+ // use a data source that throws an exception when getConnection() is called
+ feature = new InvalidDbLockingFeature(PERMANENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should simply do an insert
+ runLock(0, 0);
+
+ // lock should have failed due to exception
+ verify(callback).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
+ * to be called.
+ */
+ @Test
+ public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
+ // insert an expired record
+ insertRecord(RESOURCE, feature.getUuidString(), 0);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock - should simply do an update
+ runLock(0, 0);
+ verify(callback).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doLock() when a locked record already exists.
+ */
+ @Test
+ public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
+ // insert an expired record
+ insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock
+ runLock(0, 0);
+
+ // lock should have failed because it's already locked
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockDoUnlock() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // invoke doLock()
+ runLock(0, 0);
+
+ lock.free();
+
+ // invoke doUnlock()
+ long tbegin = System.currentTimeMillis();
+ runLock(1, 0);
+
+ assertEquals(0, getRecordCount());
+ assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback, times(1)).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doUnlock() when a DB exception is thrown.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoUnlockEx() throws SQLException {
+ feature = new InvalidDbLockingFeature(PERMANENT);
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // do NOT invoke doLock() - it will fail without a DB connection
+
+ lock.free();
+
+ // invoke doUnlock()
+ runLock(1, 0);
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback, never()).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockDoExtend() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // call doExtend()
+ long tbegin = System.currentTimeMillis();
+ runLock(1, 0);
+
+ assertEquals(1, getRecordCount());
+ assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
+
+ assertTrue(lock.isActive());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have succeeded
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when the lock is freed before doExtend runs.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendFreed() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock.extend(HOLD_SEC2, callback);
+
+ lock.setState(LockState.UNAVAILABLE);
+
+ // invoke doExtend - should do nothing
+ runLock(1, 0);
+
+ assertEquals(0, getRecordCount());
+
+ verify(callback, never()).lockAvailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when the lock record is missing from the DB, thus requiring an
+ * insert.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // delete the record so it's forced to re-insert it
+ cleanDb();
+
+ // call doExtend()
+ long tbegin = System.currentTimeMillis();
+ runLock(1, 0);
+
+ assertEquals(1, getRecordCount());
+ assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
+
+ assertTrue(lock.isActive());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have succeeded
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when both update and insert fail.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
+ /*
+ * this feature will create a lock that returns false when doDbUpdate() is
+ * invoked, or when doDbInsert() is invoked a second time
+ */
+ feature = new MyLockingFeature(true) {
+ @Override
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+ private int ntimes = 0;
+
+ @Override
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ if (ntimes++ > 0) {
+ return false;
+ }
+
+ return super.doDbInsert(conn);
+ }
+
+ @Override
+ protected boolean doDbUpdate(Connection conn) {
+ return false;
+ }
+ };
+ }
+ };
+
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // call doExtend()
+ runLock(1, 0);
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have failed
+ verify(callback2, never()).lockAvailable(lock);
+ verify(callback2).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests doExtend() when an exception occurs.
+ *
+ * @throws SQLException if an error occurs
+ */
+ @Test
+ public void testDistributedLockDoExtendEx() throws SQLException {
+ lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ runLock(0, 0);
+
+ /*
+ * delete the record and insert one with a different owner, which will cause
+ * doDbInsert() to throw an exception
+ */
+ cleanDb();
+ insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ // call doExtend()
+ runLock(1, 0);
+
+ assertTrue(lock.isUnavailable());
+
+ // no more callbacks should have occurred
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // extension should have failed
+ verify(callback2, never()).lockAvailable(lock);
+ verify(callback2).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testDistributedLockToString() {
+ String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
+ assertNotNull(text);
+ assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
+ }
+
+ @Test
+ public void testMakeThreadPool() {
+ // use a REAL feature to test this
+ feature = new DistributedLockManager();
+
+ // this should create a thread pool
+ feature.beforeCreateLockManager(engine, new Properties());
+ feature.afterStart(engine);
+
+ shutdownFeature();
+ }
+
+ /**
+ * Performs a multi-threaded test of the locking facility.
+ *
+ * @throws InterruptedException if the current thread is interrupted while waiting for
+ * the background threads to complete
+ */
+ @Test
+ public void testMultiThreaded() throws InterruptedException {
+ feature = new DistributedLockManager();
+ feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
+ feature.afterStart(PolicyEngineConstants.getManager());
+
+ List<MyThread> threads = new ArrayList<>(MAX_THREADS);
+ for (int x = 0; x < MAX_THREADS; ++x) {
+ threads.add(new MyThread());
+ }
+
+ threads.forEach(Thread::start);
+
+ for (MyThread thread : threads) {
+ thread.join(6000);
+ assertFalse(thread.isAlive());
+ }
+
+ for (MyThread thread : threads) {
+ if (thread.err != null) {
+ throw thread.err;
+ }
+ }
+
+ assertTrue(nsuccesses.get() > 0);
+ }
+
+ private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+ return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
+ }
+
+ private DistributedLock roundTrip(DistributedLock lock) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(lock);
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ return (DistributedLock) ois.readObject();
+ }
+ }
+
+ /**
+ * Runs the checkExpired() action.
+ *
+ * @param nskip number of actions in the work queue to skip
+ * @param nadditional number of additional actions that appear in the work queue
+ * <i>after</i> the checkExpired action
+ * @param schedSec number of seconds for which the checker should have been scheduled
+ */
+ private void runChecker(int nskip, int nadditional, long schedSec) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
+ Runnable action = captor.getAllValues().get(nskip);
+ action.run();
+ }
+
+ /**
+ * Runs a lock action (e.g., doLock, doUnlock).
+ *
+ * @param nskip number of actions in the work queue to skip
+ * @param nadditional number of additional actions that appear in the work queue
+ * <i>after</i> the desired action
+ */
+ void runLock(int nskip, int nadditional) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
+
+ Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
+ action.run();
+ }
+
+ /**
+ * Runs a scheduled action (e.g., "retry" action).
+ *
+ * @param nskip number of actions in the work queue to skip
+ * @param nadditional number of additional actions that appear in the work queue
+ * <i>after</i> the desired action
+ */
+ void runSchedule(int nskip, int nadditional) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
+
+ Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
+ action.run();
+ }
+
+ /**
+ * Gets a count of the number of lock records in the DB.
+ *
+ * @return the number of lock records in the DB
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private int getRecordCount() throws SQLException {
+ try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
+ ResultSet result = stmt.executeQuery()) {
+
+ if (result.next()) {
+ return result.getInt(1);
+
+ } else {
+ return 0;
+ }
+ }
+ }
+
+ /**
+ * Determines if there is a record for the given resource whose expiration time is in
+ * the expected range.
+ *
+ * @param resourceId ID of the resource of interest
+ * @param uuidString UUID string of the owner
+ * @param holdSec seconds for which the lock was to be held
+ * @param tbegin earliest time, in milliseconds, at which the record could have been
+ * inserted into the DB
+ * @return {@code true} if a record is found, {@code false} otherwise
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
+ try (PreparedStatement stmt =
+ conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
+ + " WHERE resourceId=? AND host=? AND owner=?")) {
+
+ stmt.setString(1, resourceId);
+ stmt.setString(2, feature.getHostName());
+ stmt.setString(3, uuidString);
+
+ try (ResultSet result = stmt.executeQuery()) {
+ if (result.next()) {
+ int remaining = result.getInt(1);
+ long maxDiff = System.currentTimeMillis() - tbegin;
+ return (remaining >= 0 && holdSec - remaining <= maxDiff);
+
+ } else {
+ return false;
+ }
+ }
+ }
+ }
+
+ /**
+ * Inserts a record into the DB.
+ *
+ * @param resourceId ID of the resource of interest
+ * @param uuidString UUID string of the owner
+ * @param expireOffset offset, in seconds, from "now", at which the lock should expire
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
+ this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
+ }
+
+ private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
+ throws SQLException {
+ try (PreparedStatement stmt =
+ conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
+ + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
+
+ stmt.setString(1, resourceId);
+ stmt.setString(2, hostName);
+ stmt.setString(3, uuidString);
+ stmt.setInt(4, expireOffset);
+
+ assertEquals(1, stmt.executeUpdate());
+ }
+ }
+
+ /**
+ * Updates a record in the DB.
+ *
+ * @param resourceId ID of the resource of interest
+ * @param newUuid UUID string of the <i>new</i> owner
+ * @param expireOffset offset, in seconds, from "now", at which the lock should expire
+ * @throws SQLException if an error occurs accessing the DB
+ */
+ private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
+ try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
+ + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
+
+ stmt.setString(1, newHost);
+ stmt.setString(2, newUuid);
+ stmt.setInt(3, expireOffset);
+ stmt.setString(4, resourceId);
+
+ assertEquals(1, stmt.executeUpdate());
+ }
+ }
+
+ /**
+ * Feature that uses <i>exsvc</i> to execute requests.
+ */
+ private class MyLockingFeature extends DistributedLockManager {
+
+ public MyLockingFeature(boolean init) {
+ shutdownFeature();
+
+ exsvc = mock(ScheduledExecutorService.class);
+ when(engine.getExecutorService()).thenReturn(exsvc);
+
+ if (init) {
+ beforeCreateLockManager(engine, new Properties());
+ afterStart(engine);
+ }
+ }
+ }
+
+ /**
+ * Feature whose data source all throws exceptions.
+ */
+ private class InvalidDbLockingFeature extends MyLockingFeature {
+ private int errcode;
+ private boolean freeLock = false;
+
+ public InvalidDbLockingFeature(int errcode) {
+ // pass "false" because we have to set the error code BEFORE calling
+ // afterStart()
+ super(false);
+
+ this.errcode = errcode;
+
+ this.beforeCreateLockManager(engine, new Properties());
+ this.afterStart(engine);
+ }
+
+ @Override
+ protected BasicDataSource makeDataSource() throws Exception {
+ when(datasrc.getConnection()).thenAnswer(answer -> {
+ if (freeLock) {
+ freeLock = false;
+ lock.free();
+ }
+
+ throw new SQLException(EXPECTED_EXCEPTION, "", errcode);
+ });
+
+ doThrow(new SQLException(EXPECTED_EXCEPTION, "", errcode)).when(datasrc).close();
+
+ return datasrc;
+ }
+ }
+
+ /**
+ * Feature whose locks free themselves while free() is already running.
+ */
+ private class FreeWithFreeLockingFeature extends MyLockingFeature {
+ private boolean relock;
+
+ public FreeWithFreeLockingFeature(boolean relock) {
+ super(true);
+ this.relock = relock;
+ }
+
+ @Override
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+ private boolean checked = false;
+
+ @Override
+ public boolean isUnavailable() {
+ if (checked) {
+ return super.isUnavailable();
+ }
+
+ checked = true;
+
+ // release and relock
+ free();
+
+ if (relock) {
+ // run doUnlock
+ runLock(1, 0);
+
+ // relock it
+ createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
+ }
+
+ return false;
+ }
+ };
+ }
+ }
+
+ /**
+ * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
+ * extend it, and then unlock it.
+ */
+ private class MyThread extends Thread {
+ AssertionError err = null;
+
+ public MyThread() {
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int x = 0; x < MAX_LOOPS; ++x) {
+ makeAttempt();
+ }
+
+ } catch (AssertionError e) {
+ err = e;
+ }
+ }
+
+ private void makeAttempt() {
+ try {
+ Semaphore sem = new Semaphore(0);
+
+ LockCallback cb = new LockCallback() {
+ @Override
+ public void lockAvailable(Lock lock) {
+ sem.release();
+ }
+
+ @Override
+ public void lockUnavailable(Lock lock) {
+ sem.release();
+ }
+ };
+
+ Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
+
+ // wait for callback, whether available or unavailable
+ assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
+ if (!lock.isActive()) {
+ return;
+ }
+
+ nsuccesses.incrementAndGet();
+
+ assertEquals(1, nactive.incrementAndGet());
+
+ lock.extend(HOLD_SEC2, cb);
+ assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
+ assertTrue(lock.isActive());
+
+ // decrement BEFORE free()
+ nactive.decrementAndGet();
+
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AssertionError("interrupted", e);
+ }
+ }
+ }
+}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java
new file mode 100644
index 00000000..5f76d657
--- /dev/null
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockPropertiesTest.java
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Properties;
+import java.util.TreeSet;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+public class DistributedLockPropertiesTest {
+
+ private Properties props;
+
+ /**
+ * Populates {@link #props}.
+ */
+ @Before
+ public void setUp() {
+ props = new Properties();
+
+ props.setProperty(DistributedLockProperties.DB_DRIVER, "my driver");
+ props.setProperty(DistributedLockProperties.DB_URL, "my url");
+ props.setProperty(DistributedLockProperties.DB_USER, "my user");
+ props.setProperty(DistributedLockProperties.DB_PASS, "my pass");
+ props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,-20,,,30");
+ props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "100");
+ props.setProperty(DistributedLockProperties.RETRY_SEC, "200");
+ props.setProperty(DistributedLockProperties.MAX_RETRIES, "300");
+ }
+
+ @Test
+ public void test() throws PropertyException {
+ DistributedLockProperties dlp = new DistributedLockProperties(props);
+
+ assertEquals("my driver", dlp.getDbDriver());
+ assertEquals("my url", dlp.getDbUrl());
+ assertEquals("my user", dlp.getDbUser());
+ assertEquals("my pass", dlp.getDbPwd());
+ assertEquals("10,-20,,,30", dlp.getErrorCodeStrings());
+ assertEquals("[-20, 10, 30]", new TreeSet<>(dlp.getTransientErrorCodes()).toString());
+ assertEquals(100, dlp.getExpireCheckSec());
+ assertEquals(200, dlp.getRetrySec());
+ assertEquals(300, dlp.getMaxRetries());
+
+ assertTrue(dlp.isTransient(10));
+ assertTrue(dlp.isTransient(-20));
+ assertTrue(dlp.isTransient(30));
+
+ assertFalse(dlp.isTransient(-10));
+
+ // invalid value
+ props.setProperty(DistributedLockProperties.TRANSIENT_ERROR_CODES, "10,abc,30");
+
+ assertThatThrownBy(() -> new DistributedLockProperties(props)).isInstanceOf(PropertyException.class)
+ .hasMessageContaining(DistributedLockProperties.TRANSIENT_ERROR_CODES);
+ }
+}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java
deleted file mode 100644
index 68a5a31b..00000000
--- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockingFeatureTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.sql.SQLException;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-
-/**
- * Partially tests DistributedLockingFeature; most of the methods are tested via
- * {@link TargetLockTest}.
- */
-public class DistributedLockingFeatureTest {
- private static final String EXPECTED = "expected exception";
-
- private BasicDataSource dataSrc;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
- }
-
- @Before
- public void setUp() throws Exception {
- dataSrc = mock(BasicDataSource.class);
- }
-
- @Test
- public void testGetSequenceNumber() {
- assertEquals(1000, new DistributedLockingFeature().getSequenceNumber());
- }
-
- @Test(expected = DistributedLockingFeatureException.class)
- public void testAfterStart_PropEx() {
- new DistributedLockingFeatureImpl(new PropertyException("prop", "val")).afterStart(null);
- }
-
- @Test(expected = DistributedLockingFeatureException.class)
- public void testAfterStart_InterruptEx() {
- new DistributedLockingFeatureImpl(new InterruptedException(EXPECTED)).afterStart(null);
- }
-
- @Test(expected = DistributedLockingFeatureException.class)
- public void testAfterStart_OtherEx() {
- new DistributedLockingFeatureImpl(new RuntimeException(EXPECTED)).afterStart(null);
- }
-
- @Test
- public void testCleanLockTable() throws Exception {
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
-
- new DistributedLockingFeatureImpl().afterStart(null);
- }
-
- /**
- * Feature that overrides {@link #makeDataSource()}.
- */
- private class DistributedLockingFeatureImpl extends DistributedLockingFeature {
- /**
- * Exception to throw when {@link #makeDataSource()} is invoked.
- */
- private final Exception makeEx;
-
- public DistributedLockingFeatureImpl() {
- makeEx = null;
- }
-
- public DistributedLockingFeatureImpl(Exception ex) {
- this.makeEx = ex;
- }
-
- @Override
- protected BasicDataSource makeDataSource() throws Exception {
- if (makeEx != null) {
- throw makeEx;
- }
-
- return dataSrc;
- }
- }
-}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java
deleted file mode 100644
index 84eba6b6..00000000
--- a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/TargetLockTest.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TargetLockTest {
- private static final Logger logger = LoggerFactory.getLogger(TargetLockTest.class);
- private static final int MAX_AGE_SEC = 4 * 60;
- private static final String DB_CONNECTION =
- "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
- private static final String DB_USER = "user";
- private static final String DB_PASSWORD = "password";
- private static final String EXPECTED = "expected exception";
- private static final String MY_RESOURCE = "my-resource-id";
- private static final String MY_OWNER = "my-owner";
- private static final UUID MY_UUID = UUID.randomUUID();
- private static Connection conn = null;
- private static DistributedLockingFeature distLockFeat;
-
- /**
- * Setup the database.
- */
- @BeforeClass
- public static void setup() {
- getDbConnection();
- createTable();
- SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
- distLockFeat = new DistributedLockingFeature();
- distLockFeat.afterStart(null);
- }
-
- /**
- * Cleanup the lock.
- */
- @AfterClass
- public static void cleanUp() {
- distLockFeat.beforeShutdown(null);
- try {
- conn.close();
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.cleanUp()", e);
- }
- }
-
- /**
- * Wipe the database.
- */
- @Before
- public void wipeDb() {
- try (PreparedStatement lockDelete = conn.prepareStatement("DELETE FROM pooling.locks"); ) {
- lockDelete.executeUpdate();
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.wipeDb()", e);
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testGrabLockSuccess() throws InterruptedException, ExecutionException {
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- // attempt to grab expiredLock
- try (PreparedStatement updateStatement =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) {
- updateStatement.setString(1, "resource1");
- updateStatement.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.testGrabLockSuccess()", e);
- throw new RuntimeException(e);
- }
-
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- // cannot re-lock
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
- }
-
- @Test
- public void testExpiredLocks() throws Exception {
-
- // grab lock
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
-
- // force lock to expire
- try (PreparedStatement lockExpire =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -?, now())"); ) {
- lockExpire.setInt(1, MAX_AGE_SEC + 1);
- lockExpire.executeUpdate();
- }
-
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC));
- }
-
- @Test
- public void testGrabLockFail() throws InterruptedException, ExecutionException {
-
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
-
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC));
- }
-
- @Test
- public void testSecondGrab_UpdateOk() throws Exception {
- PreparedStatement grabLockInsert = mock(PreparedStatement.class);
- when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED));
-
- PreparedStatement secondGrabUpdate = mock(PreparedStatement.class);
- when(secondGrabUpdate.executeUpdate()).thenReturn(1);
-
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate);
-
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
-
- assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC));
- }
-
- @Test
- public void testSecondGrab_UpdateFail_InsertOk() throws Exception {
- PreparedStatement grabLockInsert = mock(PreparedStatement.class);
- when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED));
-
- PreparedStatement secondGrabUpdate = mock(PreparedStatement.class);
- when(secondGrabUpdate.executeUpdate()).thenReturn(0);
-
- PreparedStatement secondGrabInsert = mock(PreparedStatement.class);
- when(secondGrabInsert.executeUpdate()).thenReturn(1);
-
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert);
-
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
-
- assertTrue(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC));
- }
-
- @Test
- public void testSecondGrab_UpdateFail_InsertFail() throws Exception {
- PreparedStatement grabLockInsert = mock(PreparedStatement.class);
- when(grabLockInsert.executeUpdate()).thenThrow(new SQLException(EXPECTED));
-
- PreparedStatement secondGrabUpdate = mock(PreparedStatement.class);
- when(secondGrabUpdate.executeUpdate()).thenReturn(0);
-
- PreparedStatement secondGrabInsert = mock(PreparedStatement.class);
- when(secondGrabInsert.executeUpdate()).thenReturn(0);
-
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(grabLockInsert, secondGrabUpdate, secondGrabInsert);
-
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
-
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).lock(MAX_AGE_SEC));
- }
-
- @Test
- public void testUpdateLock() throws Exception {
- // not locked yet - refresh should fail
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC));
-
- // now lock it
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC));
-
- // refresh should work now
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC));
-
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- // expire the lock
- try (PreparedStatement updateStatement =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -1, now()) WHERE resourceId = ?"); ) {
- updateStatement.setString(1, "resource1");
- updateStatement.executeUpdate();
- }
-
- // refresh should fail now
- assertEquals(
- OperResult.OPER_DENIED, distLockFeat.beforeRefresh("resource1", "owner1", MAX_AGE_SEC));
-
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- // test exception case
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).refresh(MAX_AGE_SEC));
- }
-
- @Test
- public void testUnlock() throws Exception {
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
-
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1"));
- assertEquals(
- OperResult.OPER_ACCEPTED, distLockFeat.beforeLock("resource1", "owner2", MAX_AGE_SEC));
-
- // test exception case
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).unlock());
- }
-
- @Test
- public void testIsActive() throws Exception {
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner2"));
-
- // isActive on expiredLock
- try (PreparedStatement updateStatement =
- conn.prepareStatement(
- "UPDATE pooling.locks SET expirationTime = timestampadd(second, -5, now()) WHERE resourceId = ?"); ) {
- updateStatement.setString(1, "resource1");
- updateStatement.executeUpdate();
- }
-
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- // Unlock record, next isActive attempt should fail
- distLockFeat.beforeUnlock("resource1", "owner1");
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLockedBy("resource1", "owner1"));
-
- // test exception case for outer "try"
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive());
-
- // test exception case for inner "try"
- PreparedStatement stmt = mock(PreparedStatement.class);
- when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED));
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(stmt);
- dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isActive());
- }
-
- @Test
- public void unlockBeforeLock() {
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1"));
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeUnlock("resource1", "owner1"));
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeUnlock("resource1", "owner1"));
- }
-
- @Test
- public void testIsLocked() throws Exception {
- assertEquals(OperResult.OPER_DENIED, distLockFeat.beforeIsLocked("resource1"));
- distLockFeat.beforeLock("resource1", "owner1", MAX_AGE_SEC);
- assertEquals(OperResult.OPER_ACCEPTED, distLockFeat.beforeIsLocked("resource1"));
-
- // test exception case for outer "try"
- BasicDataSource dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenThrow(new SQLException(EXPECTED));
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked());
-
- // test exception case for inner "try"
- PreparedStatement stmt = mock(PreparedStatement.class);
- when(stmt.executeQuery()).thenThrow(new SQLException(EXPECTED));
- Connection connMock = mock(Connection.class);
- when(connMock.prepareStatement(anyString())).thenReturn(stmt);
- dataSrc = mock(BasicDataSource.class);
- when(dataSrc.getConnection()).thenReturn(connMock);
- assertFalse(new TargetLock(MY_RESOURCE, MY_UUID, MY_OWNER, dataSrc).isLocked());
- }
-
- private static void getDbConnection() {
- try {
- conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.getDBConnection()", e);
- }
- }
-
- private static void createTable() {
- String createString =
- "create table if not exists pooling.locks "
- + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
- + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))";
- try (PreparedStatement createStmt = conn.prepareStatement(createString); ) {
- createStmt.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("Error in TargetLockTest.createTable()", e);
- }
- }
-}
diff --git a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
index d1a07e82..061cc608 100644
--- a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
+++ b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
@@ -2,14 +2,14 @@
# ============LICENSE_START=======================================================
# feature-distributed-locking
# ================================================================================
-# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,5 +22,8 @@ javax.persistence.jdbc.driver=org.h2.Driver
javax.persistence.jdbc.url=jdbc:h2:mem:pooling
javax.persistence.jdbc.user=user
javax.persistence.jdbc.password=password
-distributed.locking.lock.aging=150
-distributed.locking.heartbeat.interval=500 \ No newline at end of file
+
+distributed.locking.transient.error.codes=500
+distributed.locking.expire.check.seconds=900
+distributed.locking.retry.seconds=60
+distributed.locking.max.retries=2
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java
new file mode 100644
index 00000000..0a4d327b
--- /dev/null
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/AlwaysFailLock.java
@@ -0,0 +1,71 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+
+/**
+ * Lock implementation whose operations always fail.
+ */
+public class AlwaysFailLock extends LockImpl {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs the object.
+ */
+ public AlwaysFailLock() {
+ super();
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held once
+ * it has been granted, after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or subsequently
+ * lost; must not be {@code null}
+ */
+ public AlwaysFailLock(String resourceId, String ownerKey, int holdSec, LockCallback callback) {
+ super(LockState.UNAVAILABLE, resourceId, ownerKey, holdSec, callback);
+ }
+
+ /**
+ * Always returns false.
+ */
+ @Override
+ public boolean free() {
+ return false;
+ }
+
+ /**
+ * Always fails and invokes {@link LockCallback#lockUnavailable(Lock)}.
+ */
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ synchronized (this) {
+ setHoldSec(holdSec);
+ setCallback(callback);
+ }
+
+ notifyUnavailable();
+ }
+}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java
index de62b24a..b2ed9c7f 100644
--- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/Lock.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,145 +20,67 @@
package org.onap.policy.drools.core.lock;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map.Entry;
-import org.onap.policy.drools.utils.Pair;
-
/**
- * Lock that is held for a resource. This not only identifies the current owner of the
- * lock, but it also includes a queue of requesters. An item is associated with each
- * requester that is waiting in the queue. Note: this class is <b>not</b> thread-safe.
- *
- * @param <T> type of item to be associated with a request
+ * Lock held on a resource.
*/
-public class Lock<T> {
+public interface Lock {
/**
- * Result returned by <i>removeRequester()</i>.
+ * Frees/release the lock.
+ *
+ * <p/>
+ * Note: client code may choose to invoke this method <i>before</i> the lock has been
+ * granted.
+ *
+ * @return {@code true} if the request was accepted, {@code false} if the lock is
+ * unavailable
*/
- public enum RemoveResult {
- /**
- * The requester was the owner of the lock, and the lock is no longer needed,
- * because there were no other requesters waiting to get the lock.
- */
- UNLOCKED,
-
- /**
- * The requester was the owner of the lock, and has been replaced with the next
- * requester waiting in the queue.
- */
- RELOCKED,
-
- /**
- * The requester had been waiting in the queue, and has now been removed.
- */
- REMOVED,
-
- /**
- * The requester was not the owner, nor was it waiting in the queue.
- */
- NOT_FOUND
- }
+ boolean free();
/**
- * The last owner to grab the lock, never {@code null}.
+ * Determines if the lock is active.
+ *
+ * @return {@code true} if the lock is <b>ACTIVE</b>, {@code false} otherwise
*/
- private String owner;
+ boolean isActive();
/**
- * Requesters waiting to get the lock. Maps the requester (i.e., owner for which the
- * request is being made) to its associated item. Uses a Linked map so that the order
- * of the requesters is maintained. We don't expect many requesters for any given
- * lock, thus we'll start with a small hash size.
+ * Determines if the lock is unavailable. Once a lock object becomes unavailable, it
+ * will never become active again.
+ *
+ * @return {@code true} if the lock is <b>UNAVAILABLE</b>, {@code false} otherwise
*/
- private LinkedHashMap<String, T> requester2item = new LinkedHashMap<>(5);
+ boolean isUnavailable();
/**
- * Constructor.
- *
- * @param owner the current owner of this lock
+ * Determines if this object is waiting for a lock to be granted or denied. This
+ * applies when the lock is first created, or after {@link #extend(int, LockCallback)}
+ * has been invoked.
+ *
+ * @return {@code true} if the lock is <b>WAITING</b>, {@code false} otherwise
*/
- public Lock(String owner) {
- this.owner = owner;
- }
+ boolean isWaiting();
/**
- * Get owner.
- *
- * @return the current owner of the lock, or the last owner of the lock, if the lock
- * is not currently owned. (This will never be {@code null}.)
+ * Gets the ID of the resource to which the lock applies.
+ *
+ * @return the ID of the resource to which the lock applies
*/
- public String getOwner() {
- return owner;
- }
+ String getResourceId();
/**
- * Adds a new requester to the queue of requesters.
- *
- * @param requester the requester
- * @param item to be associated with the requester, must not be {@code null}
- * @return {@code true} if the requester was added, {@code false} if it already owns
- * the lock or is already in the queue
- * @throws IllegalArgumentException if the item is null
+ * Gets the lock's owner key.
+ *
+ * @return the lock's owner key
*/
- public boolean add(String requester, T item) {
- if (item == null) {
- throw SimpleLockManager.makeNullArgException("lock requester item is null");
- }
-
- if (requester.equals(owner)) {
- // requester already owns the lock
- return false;
- }
-
- T prev = requester2item.putIfAbsent(requester, item);
-
- // if there's a previous value, then that means this requester is already
- // waiting for a lock on this resource. In that case, we return false
- return (prev == null);
- }
+ String getOwnerKey();
/**
- * Removes a requester from the lock. The requester may currently own the lock, or it
- * may be in the queue waiting for the lock. Note: as this is agnostic to the type of
- * item associated with the requester, it is unable to notify the new owner that it's
- * the new owner; that is left up to the code that invokes this method.
- *
- * @param requester the requester
- * @param newOwner the new owner info is placed here, if the result is <i>RELOCKED</i>
- * @return the result
+ * Extends a lock an additional amount of time from now. The callback will always be
+ * invoked, and may be invoked <i>before</i> this method returns.
+ *
+ * @param holdSec the additional amount of time to hold the lock, in seconds
+ * @param callback callback to be invoked when the extension completes
*/
- public RemoveResult removeRequester(String requester, Pair<String, T> newOwner) {
-
- if (!requester.equals(owner)) {
- // requester does not currently own the lock - remove it from the
- // queue
- T ent = requester2item.remove(requester);
-
- // if there was an entry in the queue, then return true to indicate
- // that it was removed. Otherwise, return false
- return (ent != null ? RemoveResult.REMOVED : RemoveResult.NOT_FOUND);
- }
-
- /*
- * requester was the owner - find something to take over
- */
- Iterator<Entry<String, T>> it = requester2item.entrySet().iterator();
- if (!it.hasNext()) {
- // no one to take over the lock - it's now unlocked
- return RemoveResult.UNLOCKED;
- }
-
- // there's another requester to take over
- Entry<String, T> ent = it.next();
- it.remove();
-
- owner = ent.getKey();
-
- newOwner.first(owner);
- newOwner.second(ent.getValue());
-
- return RemoveResult.RELOCKED;
- }
+ void extend(int holdSec, LockCallback callback);
}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiConstants.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockCallback.java
index 8510e3d9..fae1cb43 100644
--- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiConstants.java
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockCallback.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * api-resource-locks
+ * ONAP
* ================================================================================
* Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
@@ -20,20 +20,25 @@
package org.onap.policy.drools.core.lock;
-import lombok.Getter;
-import org.onap.policy.common.utils.services.OrderedServiceImpl;
-
-public class PolicyResourceLockFeatureApiConstants {
+/**
+ * Callback invoked when a lock is granted or lost.
+ *
+ * <p/>
+ * Note: these methods may or may not be invoked by the thread that requested the lock.
+ */
+public interface LockCallback {
/**
- * 'FeatureAPI.impl.getList()' returns an ordered list of objects implementing the
- * 'FeatureAPI' interface.
+ * Called to indicate that a lock has been granted.
+ *
+ * @param lock lock that has been granted
*/
- @Getter
- private static final OrderedServiceImpl<PolicyResourceLockFeatureApi> impl =
- new OrderedServiceImpl<>(PolicyResourceLockFeatureApi.class);
+ void lockAvailable(Lock lock);
- private PolicyResourceLockFeatureApiConstants() {
- // do nothing
- }
+ /**
+ * Called to indicate that a lock is permanently unavailable (e.g., lost, expired).
+ *
+ * @param lock lock that has been lost
+ */
+ void lockUnavailable(Lock lock);
}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java
new file mode 100644
index 00000000..9596dbe8
--- /dev/null
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockImpl.java
@@ -0,0 +1,158 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+import java.io.Serializable;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import lombok.ToString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lock implementation.
+ */
+@Getter
+@Setter
+@ToString(exclude = {"callback"})
+public class LockImpl implements Lock, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger logger = LoggerFactory.getLogger(LockImpl.class);
+
+ private LockState state;
+ private final String resourceId;
+ private final String ownerKey;
+ private transient LockCallback callback;
+ private int holdSec;
+
+ /**
+ * Constructs the object.
+ */
+ public LockImpl() {
+ this.state = LockState.UNAVAILABLE;
+ this.resourceId = null;
+ this.ownerKey = null;
+ this.callback = null;
+ this.holdSec = 0;
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param state the initial lock state
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held once
+ * it has been granted, after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or subsequently
+ * lost; must not be {@code null}
+ */
+ public LockImpl(@NonNull LockState state, @NonNull String resourceId, @NonNull String ownerKey, int holdSec,
+ @NonNull LockCallback callback) {
+
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ this.state = state;
+ this.resourceId = resourceId;
+ this.ownerKey = ownerKey;
+ this.callback = callback;
+ this.holdSec = holdSec;
+ }
+
+ @Override
+ public boolean isActive() {
+ return (getState() == LockState.ACTIVE);
+ }
+
+ @Override
+ public boolean isUnavailable() {
+ return (getState() == LockState.UNAVAILABLE);
+ }
+
+ @Override
+ public boolean isWaiting() {
+ return (getState() == LockState.WAITING);
+ }
+
+ /**
+ * This method always succeeds, unless the lock is already unavailable.
+ */
+ @Override
+ public synchronized boolean free() {
+ if (isUnavailable()) {
+ return false;
+ }
+
+ logger.info("releasing lock: {}", this);
+ setState(LockState.UNAVAILABLE);
+
+ return true;
+ }
+
+ /**
+ * This method always succeeds, unless the lock is already unavailable.
+ */
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ synchronized (this) {
+ if (isUnavailable()) {
+ return;
+ }
+
+ logger.info("lock granted: {}", this);
+ setState(LockState.ACTIVE);
+ setHoldSec(holdSec);
+ setCallback(callback);
+ }
+
+ notifyAvailable();
+ }
+
+ /**
+ * Invokes the {@link LockCallback#lockAvailable(Lock)}, <i>from the current
+ * thread</i>. Note: subclasses may choose to invoke the callback from other threads.
+ */
+ public void notifyAvailable() {
+ try {
+ callback.lockAvailable(this);
+
+ } catch (RuntimeException e) {
+ logger.warn("lock callback threw an exception", e);
+ }
+ }
+
+ /**
+ * Invokes the {@link LockCallback#lockUnavailable(Lock)}, <i>from the current
+ * thread</i>. Note: subclasses may choose to invoke the callback from other threads.
+ */
+ public void notifyUnavailable() {
+ try {
+ callback.lockUnavailable(this);
+
+ } catch (RuntimeException e) {
+ logger.warn("lock callback threw an exception", e);
+ }
+ }
+}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java
new file mode 100644
index 00000000..41699ce6
--- /dev/null
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockState.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+/**
+ * States of a Lock.
+ */
+
+public enum LockState {
+
+ /**
+ * Waiting for the lock request to complete.
+ */
+ WAITING,
+
+ /**
+ * This lock currently holds the resource.
+ */
+ ACTIVE,
+
+ /**
+ * The resource is no longer available to the lock.
+ */
+ UNAVAILABLE
+}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java
deleted file mode 100644
index b7968486..00000000
--- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApi.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * api-resource-locks
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import org.onap.policy.common.utils.services.OrderedService;
-
-/**
- * Resource locks. Each lock has an "owner", which is intended to be unique across a
- * single instance of a running PolicyEngine.
- *
- * <p>This interface provides a way to invoke optional features at various points in the
- * code. At appropriate points in the application, the code iterates through this list,
- * invoking these optional methods.
- *
- * <p>Implementers may choose to implement a level of locking appropriate to the application.
- * For instance, they may choose to implement an engine-wide locking scheme, or they may
- * choose to implement a global locking scheme (e.g., through a shared DB).
- */
-public interface PolicyResourceLockFeatureApi extends OrderedService {
-
- /**
- * Result of a requested operation.
- */
- public enum OperResult {
-
- /**
- * The implementer accepted the request; no additional locking logic should be
- * performed.
- */
- OPER_ACCEPTED,
-
- /**
- * The implementer denied the request; no additional locking logic should be
- * performed.
- */
- OPER_DENIED,
-
-
- /**
- * The implementer did not handle the request; additional locking logic <i>should
- * be</i> performed.
- */
- OPER_UNHANDLED
- }
-
- /**
- * This method is called before a lock is acquired on a resource.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return the result, where <b>OPER_DENIED</b> indicates that the lock is currently
- * held by another owner
- */
- public default OperResult beforeLock(String resourceId, String owner, int holdSec) {
- return OperResult.OPER_UNHANDLED;
- }
-
- /**
- * This method is called after a lock for a resource has been acquired or denied.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param locked {@code true} if the lock was acquired, {@code false} if it was denied
- * @return {@code true} if the implementer handled the request, {@code false}
- * otherwise
- */
- public default boolean afterLock(String resourceId, String owner, boolean locked) {
- return false;
- }
-
- /**
- * This method is called before a lock is refreshed on a resource. It may be invoked
- * repeatedly to extend the time that a lock is held.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return the result, where <b>OPER_DENIED</b> indicates that the resource is not
- * currently locked by the given owner
- */
- public default OperResult beforeRefresh(String resourceId, String owner, int holdSec) {
- return OperResult.OPER_UNHANDLED;
- }
-
- /**
- * This method is called after a lock for a resource has been refreshed (or after the
- * refresh has been denied).
- *
- * @param resourceId resource id
- * @param owner owner
- * @param locked {@code true} if the lock was acquired, {@code false} if it was denied
- * @return {@code true} if the implementer handled the request, {@code false}
- * otherwise
- */
- public default boolean afterRefresh(String resourceId, String owner, boolean locked) {
- return false;
- }
-
- /**
- * This method is called before a lock on a resource is released.
- *
- * @param resourceId resource id
- * @param owner owner
- * @return the result, where <b>OPER_DENIED</b> indicates that the lock is not
- * currently held by the given owner
- */
- public default OperResult beforeUnlock(String resourceId, String owner) {
- return OperResult.OPER_UNHANDLED;
- }
-
- /**
- * This method is called after a lock on a resource is released.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param unlocked {@code true} if the lock was released, {@code false} if the owner
- * did not have a lock on the resource
- * @return {@code true} if the implementer handled the request, {@code false}
- * otherwise
- */
- public default boolean afterUnlock(String resourceId, String owner, boolean unlocked) {
- return false;
- }
-
- /**
- * This method is called before a check is made to determine if a resource is locked.
- *
- * @param resourceId resource id
- * @return the result, where <b>OPER_ACCEPTED</b> indicates that the resource is
- * locked, while <b>OPER_DENIED</b> indicates that it is not
- */
- public default OperResult beforeIsLocked(String resourceId) {
- return OperResult.OPER_UNHANDLED;
- }
-
- /**
- * This method is called before a check is made to determine if a particular owner
- * holds the lock on a resource.
- *
- * @param resourceId resource id
- * @param owner owner
- * @return the result, where <b>OPER_ACCEPTED</b> indicates that the resource is
- * locked by the given owner, while <b>OPER_DENIED</b> indicates that it is
- * not
- */
- public default OperResult beforeIsLockedBy(String resourceId, String owner) {
- return OperResult.OPER_UNHANDLED;
- }
-}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java
index 0e73eac1..bbf7d229 100644
--- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockManager.java
@@ -20,213 +20,36 @@
package org.onap.policy.drools.core.lock;
-import java.util.List;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.onap.policy.common.capabilities.Lockable;
+import org.onap.policy.common.capabilities.Startable;
/**
- * Manager of resource locks. Checks for API implementers.
+ * Manager of resource locks.
*/
-public class PolicyResourceLockManager extends SimpleLockManager {
-
- private static Logger logger = LoggerFactory.getLogger(PolicyResourceLockManager.class);
-
- /**
- * Used by junit tests.
- */
- protected PolicyResourceLockManager() {
- super();
- }
-
- /**
- * Get instance.
- *
- * @return the manager singleton
- */
- public static PolicyResourceLockManager getInstance() {
- return Singleton.instance;
- }
-
- @Override
- public boolean lock(String resourceId, String owner, int holdSec) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
-
- return doBoolIntercept(impl -> impl.beforeLock(resourceId, owner, holdSec), () -> {
-
- // implementer didn't do the work - defer to the superclass
- boolean locked = super.lock(resourceId, owner, holdSec);
-
- doIntercept(false, impl -> impl.afterLock(resourceId, owner, locked));
-
- return locked;
- });
- }
-
- @Override
- public boolean refresh(String resourceId, String owner, int holdSec) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
-
- return doBoolIntercept(impl -> impl.beforeRefresh(resourceId, owner, holdSec), () -> {
-
- // implementer didn't do the work - defer to the superclass
- boolean refreshed = super.refresh(resourceId, owner, holdSec);
-
- doIntercept(false, impl -> impl.afterRefresh(resourceId, owner, refreshed));
-
- return refreshed;
- });
- }
-
- @Override
- public boolean unlock(String resourceId, String owner) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
-
- return doBoolIntercept(impl -> impl.beforeUnlock(resourceId, owner), () -> {
-
- // implementer didn't do the work - defer to the superclass
- boolean unlocked = super.unlock(resourceId, owner);
-
- doIntercept(false, impl -> impl.afterUnlock(resourceId, owner, unlocked));
-
- return unlocked;
- });
- }
-
- /**
- * Is locked.
- *
- * @throws IllegalArgumentException if the resourceId is {@code null}
- */
- @Override
- public boolean isLocked(String resourceId) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
-
- return doBoolIntercept(impl -> impl.beforeIsLocked(resourceId), () ->
-
- // implementer didn't do the work - defer to the superclass
- super.isLocked(resourceId)
- );
- }
+public interface PolicyResourceLockManager extends Startable, Lockable {
/**
- * Is locked by.
+ * Requests a lock on a resource. Typically, the lock is not immediately granted,
+ * though a "lock" object is always returned. Once the lock has been granted (or
+ * denied), the callback will be invoked to indicate the result.
*
- * @throws IllegalArgumentException if the resourceId or owner is {@code null}
- */
- @Override
- public boolean isLockedBy(String resourceId, String owner) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
- return doBoolIntercept(impl -> impl.beforeIsLockedBy(resourceId, owner), () ->
-
- // implementer didn't do the work - defer to the superclass
- super.isLockedBy(resourceId, owner)
- );
- }
-
- /**
- * Applies a function to each implementer of the lock feature. Returns as soon as one
- * of them returns a result other than <b>OPER_UNHANDLED</b>. If they all return
- * <b>OPER_UNHANDLED</b>, then it returns the result of applying the default function.
+ * <p/>
+ * Notes:
+ * <dl>
+ * <li>The callback may be invoked <i>before</i> this method returns</li>
+ * <li>The implementation need not honor waitForLock={@code true}</li>
+ * </dl>
*
- * @param interceptFunc intercept function
- * @param defaultFunc default function
- * @return {@code true} if success, {@code false} otherwise
- */
- private boolean doBoolIntercept(Function<PolicyResourceLockFeatureApi, OperResult> interceptFunc,
- Supplier<Boolean> defaultFunc) {
-
- OperResult result = doIntercept(OperResult.OPER_UNHANDLED, interceptFunc);
- if (result != OperResult.OPER_UNHANDLED) {
- return (result == OperResult.OPER_ACCEPTED);
- }
-
- return defaultFunc.get();
- }
-
- /**
- * Applies a function to each implementer of the lock feature. Returns as soon as one
- * of them returns a non-null value.
- *
- * @param continueValue if the implementer returns this value, then it continues to
- * check addition implementers
- * @param func function to be applied to the implementers
- * @return first non-null value returned by an implementer, <i>continueValue</i> if
- * they all returned <i>continueValue</i>
- */
- private <T> T doIntercept(T continueValue, Function<PolicyResourceLockFeatureApi, T> func) {
-
- for (PolicyResourceLockFeatureApi impl : getImplementers()) {
- try {
- T result = func.apply(impl);
- if (result != continueValue) {
- return result;
- }
-
- } catch (RuntimeException e) {
- logger.warn("lock feature {} threw an exception", impl, e);
- }
- }
-
- return continueValue;
- }
-
- // these may be overridden by junit tests
-
- /**
- * Get implementers.
- *
- * @return the list of feature implementers
- */
- protected List<PolicyResourceLockFeatureApi> getImplementers() {
- return PolicyResourceLockFeatureApiConstants.getImpl().getList();
- }
-
- /**
- * Initialization-on-demand holder idiom.
- */
- private static class Singleton {
-
- private static final PolicyResourceLockManager instance = new PolicyResourceLockManager();
-
- /**
- * Not invoked.
- */
- private Singleton() {
- super();
- }
- }
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held once
+ * it has been granted, after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or subsequently
+ * lost; must not be {@code null}
+ * @param waitForLock {@code true} to wait for the lock, if it is currently locked,
+ * {@code false} otherwise
+ * @return a new lock
+ */
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock);
}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java
deleted file mode 100644
index 427fbbc6..00000000
--- a/policy-core/src/main/java/org/onap/policy/drools/core/lock/SimpleLockManager.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import org.onap.policy.common.utils.time.CurrentTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple lock manager. Callbacks are ignored. Does not redirect to lock feature
- * implementers.
- */
-public class SimpleLockManager {
-
- protected static Logger logger = LoggerFactory.getLogger(SimpleLockManager.class);
-
- // messages used in exceptions
- public static final String MSG_NULL_RESOURCE_ID = "null resourceId";
- public static final String MSG_NULL_OWNER = "null owner";
-
- /**
- * Used to access the current time. May be overridden by junit tests.
- */
- private static CurrentTime currentTime = new CurrentTime();
-
- /**
- * Used to synchronize updates to {@link #resource2data} and {@link #locks}.
- */
- private final Object locker = new Object();
-
- /**
- * Maps a resource to its lock data. Lock data is stored in both this and in
- * {@link #locks}.
- */
- private final Map<String, Data> resource2data = new HashMap<>();
-
- /**
- * Lock data, sorted by expiration time. Lock data is stored in both this and in
- * {@link #resource2data}. Whenever a lock operation is performed, this structure is
- * examined and any expired locks are removed; thus no timer threads are needed to
- * remove expired locks.
- */
- private final SortedSet<Data> locks = new TreeSet<>();
-
- /**
- * Constructor.
- */
- public SimpleLockManager() {
- super();
- }
-
- /**
- * Attempts to lock a resource, rejecting the lock if it is already owned, even if
- * it's the same owner; the same owner can use {@link #refresh(String, String, int)
- * refresh()}, instead, to extend a lock on a resource.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if locked, {@code false} if the resource is already locked by
- * a different owner
- * @throws IllegalArgumentException if the resourceId or owner is {@code null}
- */
- public boolean lock(String resourceId, String owner, int holdSec) {
-
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
- boolean locked = false;
-
- synchronized (locker) {
- cleanUpLocks();
-
- if (!resource2data.containsKey(resourceId)) {
- Data data = new Data(owner, resourceId, currentTime.getMillis() + TimeUnit.SECONDS.toMillis(holdSec));
- resource2data.put(resourceId, data);
- locks.add(data);
- locked = true;
- }
- }
-
- logger.info("lock {} for resource {} owner {}", locked, resourceId, owner);
-
- return locked;
- }
-
- /**
- * Attempts to refresh a lock on a resource.
- *
- * @param resourceId resource id
- * @param owner owner
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if locked, {@code false} if the resource is not currently
- * locked by the given owner
- * @throws IllegalArgumentException if the resourceId or owner is {@code null}
- */
- public boolean refresh(String resourceId, String owner, int holdSec) {
-
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
- boolean refreshed = false;
-
- synchronized (locker) {
- cleanUpLocks();
-
- Data existingLock = resource2data.get(resourceId);
- if (existingLock != null && existingLock.getOwner().equals(owner)) {
- // MUST remove the existing lock from the set
- locks.remove(existingLock);
-
- refreshed = true;
-
- Data data = new Data(owner, resourceId, currentTime.getMillis() + TimeUnit.SECONDS.toMillis(holdSec));
- resource2data.put(resourceId, data);
- locks.add(data);
- }
- }
-
- logger.info("refresh lock {} for resource {} owner {}", refreshed, resourceId, owner);
-
- return refreshed;
- }
-
- /**
- * Unlocks a resource.
- *
- * @param resourceId resource id
- * @param owner owner
- * @return {@code true} if unlocked, {@code false} if the given owner does not
- * currently hold a lock on the resource
- * @throws IllegalArgumentException if the resourceId or owner is {@code null}
- */
- public boolean unlock(String resourceId, String owner) {
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
- Data data;
-
- synchronized (locker) {
- cleanUpLocks();
-
- if ((data = resource2data.get(resourceId)) != null) {
- if (owner.equals(data.getOwner())) {
- resource2data.remove(resourceId);
- locks.remove(data);
-
- } else {
- data = null;
- }
- }
- }
-
- boolean unlocked = (data != null);
- logger.info("unlock resource {} owner {} = {}", resourceId, owner, unlocked);
-
- return unlocked;
- }
-
- /**
- * Determines if a resource is locked by anyone.
- *
- * @param resourceId resource id
- * @return {@code true} if the resource is locked, {@code false} otherwise
- * @throws IllegalArgumentException if the resourceId is {@code null}
- */
- public boolean isLocked(String resourceId) {
-
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- boolean locked;
-
- synchronized (locker) {
- cleanUpLocks();
-
- locked = resource2data.containsKey(resourceId);
- }
-
- logger.debug("resource {} isLocked = {}", resourceId, locked);
-
- return locked;
- }
-
- /**
- * Determines if a resource is locked by a particular owner.
- *
- * @param resourceId resource id
- * @param owner owner
- * @return {@code true} if the resource is locked, {@code false} otherwise
- * @throws IllegalArgumentException if the resourceId or owner is {@code null}
- */
- public boolean isLockedBy(String resourceId, String owner) {
-
- if (resourceId == null) {
- throw makeNullArgException(MSG_NULL_RESOURCE_ID);
- }
-
- if (owner == null) {
- throw makeNullArgException(MSG_NULL_OWNER);
- }
-
- Data data;
-
- synchronized (locker) {
- cleanUpLocks();
-
- data = resource2data.get(resourceId);
- }
-
- boolean locked = (data != null && owner.equals(data.getOwner()));
- logger.debug("resource {} isLockedBy {} = {}", resourceId, owner, locked);
-
- return locked;
- }
-
- /**
- * Releases expired locks.
- */
- private void cleanUpLocks() {
- long tcur = currentTime.getMillis();
-
- synchronized (locker) {
- Iterator<Data> it = locks.iterator();
- while (it.hasNext()) {
- Data data = it.next();
- if (data.getExpirationMs() <= tcur) {
- it.remove();
- resource2data.remove(data.getResource());
- } else {
- break;
- }
- }
- }
- }
-
- /**
- * Makes an exception for when an argument is {@code null}.
- *
- * @param msg exception message
- * @return a new Exception
- */
- public static IllegalArgumentException makeNullArgException(String msg) {
- return new IllegalArgumentException(msg);
- }
-
- /**
- * Data for a single Lock. Sorts by expiration time, then resource, and
- * then owner.
- */
- protected static class Data implements Comparable<Data> {
-
- /**
- * Owner of the lock.
- */
- private final String owner;
-
- /**
- * Resource that is locked.
- */
- private final String resource;
-
- /**
- * Time when the lock will expire, in milliseconds.
- */
- private final long texpireMs;
-
- /**
- * Constructor.
- *
- * @param resource resource
- * @param owner owner
- * @param texpireMs time expire in milliseconds
- */
- public Data(String owner, String resource, long texpireMs) {
- this.owner = owner;
- this.resource = resource;
- this.texpireMs = texpireMs;
- }
-
- public String getOwner() {
- return owner;
- }
-
- public String getResource() {
- return resource;
- }
-
- public long getExpirationMs() {
- return texpireMs;
- }
-
- @Override
- public int compareTo(Data data) {
- int diff = Long.compare(texpireMs, data.texpireMs);
- if (diff == 0) {
- diff = resource.compareTo(data.resource);
- }
- if (diff == 0) {
- diff = owner.compareTo(data.owner);
- }
- return diff;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((owner == null) ? 0 : owner.hashCode());
- result = prime * result + ((resource == null) ? 0 : resource.hashCode());
- result = prime * result + (int) (texpireMs ^ (texpireMs >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- Data other = (Data) obj;
- if (owner == null) {
- if (other.owner != null) {
- return false;
- }
- } else if (!owner.equals(other.owner)) {
- return false;
- }
- if (resource == null) {
- if (other.resource != null) {
- return false;
- }
- } else if (!resource.equals(other.resource)) {
- return false;
- }
- return (texpireMs == other.texpireMs);
- }
- }
-}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java
new file mode 100644
index 00000000..ce4ca5fd
--- /dev/null
+++ b/policy-core/src/test/java/org/onap/policy/drools/core/lock/AlwaysFailLockTest.java
@@ -0,0 +1,106 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AlwaysFailLockTest {
+ private static final String RESOURCE = "hello";
+ private static final String OWNER_KEY = "world";
+ private static final int HOLD_SEC = 10;
+ private static final int HOLD_SEC2 = 10;
+
+ private LockCallback callback;
+ private AlwaysFailLock lock;
+
+ /**
+ * Populates {@link #lock}.
+ */
+ @Before
+ public void setUp() {
+ callback = mock(LockCallback.class);
+
+ lock = new AlwaysFailLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback);
+ }
+
+ @Test
+ public void testSerializable() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(lock);
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ lock = (AlwaysFailLock) ois.readObject();
+ }
+
+ assertEquals(LockState.UNAVAILABLE, lock.getState());
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+
+ // these fields are transient
+ assertNull(lock.getCallback());
+ }
+
+ @Test
+ public void testAlwaysFailLockNoArgs() {
+ // verify that no-arg constructor doesn't throw an exception
+ new AlwaysFailLock();
+ }
+
+ @Test
+ public void testAlwaysFailLock() {
+ assertTrue(lock.isUnavailable());
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+ assertSame(callback, lock.getCallback());
+ }
+
+ @Test
+ public void testFree() {
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ @Test
+ public void testExtend() {
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ assertSame(callback2, lock.getCallback());
+ }
+}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java
new file mode 100644
index 00000000..aab04dc4
--- /dev/null
+++ b/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockImplTest.java
@@ -0,0 +1,261 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LockImplTest {
+ private static final LockState STATE = LockState.WAITING;
+ private static final String RESOURCE = "hello";
+ private static final String OWNER_KEY = "world";
+ private static final int HOLD_SEC = 10;
+ private static final int HOLD_SEC2 = 20;
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+
+ private LockCallback callback;
+ private LockImpl lock;
+
+ /**
+ * Populates {@link #lock}.
+ */
+ @Before
+ public void setUp() {
+ callback = mock(LockCallback.class);
+
+ lock = new LockImpl(STATE, RESOURCE, OWNER_KEY, HOLD_SEC, callback);
+ }
+
+ @Test
+ public void testSerializable() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(lock);
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ lock = (LockImpl) ois.readObject();
+ }
+
+ assertEquals(STATE, lock.getState());
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+
+ // these fields are transient
+ assertNull(lock.getCallback());
+ }
+
+ @Test
+ public void testLockImplNoArgs() {
+ // use no-arg constructor
+ lock = new LockImpl();
+ assertEquals(LockState.UNAVAILABLE, lock.getState());
+ assertNull(lock.getResourceId());
+ assertNull(lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(0, lock.getHoldSec());
+ }
+
+ @Test
+ public void testLockImpl_testGetters() {
+ assertEquals(STATE, lock.getState());
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertSame(callback, lock.getCallback());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+
+ // test illegal args
+ assertThatThrownBy(() -> new LockImpl(null, RESOURCE, OWNER_KEY, HOLD_SEC, callback))
+ .hasMessageContaining("state");
+ assertThatThrownBy(() -> new LockImpl(STATE, null, OWNER_KEY, HOLD_SEC, callback))
+ .hasMessageContaining("resourceId");
+ assertThatThrownBy(() -> new LockImpl(STATE, RESOURCE, null, HOLD_SEC, callback))
+ .hasMessageContaining("ownerKey");
+ assertThatIllegalArgumentException().isThrownBy(() -> new LockImpl(STATE, RESOURCE, OWNER_KEY, -1, callback))
+ .withMessageContaining("holdSec");
+ assertThatThrownBy(() -> new LockImpl(STATE, RESOURCE, OWNER_KEY, HOLD_SEC, null))
+ .hasMessageContaining("callback");
+ }
+
+ @Test
+ public void testFree() {
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ // should fail this time
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ // no call-backs should have been invoked
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testExtend() {
+ lock.setState(LockState.WAITING);
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC2, callback2);
+ assertTrue(lock.isActive());
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ assertSame(callback2, lock.getCallback());
+ verify(callback2).lockAvailable(lock);
+ verify(callback2, never()).lockUnavailable(any());
+
+ // first call-back should never have been invoked
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+
+ // extend again
+ LockCallback callback3 = mock(LockCallback.class);
+ lock.extend(HOLD_SEC, callback3);
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+ assertSame(callback3, lock.getCallback());
+ assertTrue(lock.isActive());
+ verify(callback3).lockAvailable(lock);
+ verify(callback3, never()).lockUnavailable(any());
+
+ // other call-backs should not have been invoked again
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+
+ verify(callback2).lockAvailable(any());
+ verify(callback2, never()).lockUnavailable(any());
+
+ assertTrue(lock.free());
+
+ // extend after free - should fail
+ lock.extend(HOLD_SEC2, callback);
+ assertTrue(lock.isUnavailable());
+
+ // call-backs should not have been invoked again
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+
+ verify(callback2).lockAvailable(any());
+ verify(callback2, never()).lockUnavailable(any());
+
+ verify(callback3).lockAvailable(lock);
+ verify(callback3, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testNotifyAvailable() {
+ lock.notifyAvailable();
+
+ verify(callback).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testNotifyAvailable_Ex() {
+ doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockAvailable(any());
+ doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockUnavailable(any());
+
+ // should not throw an exception
+ lock.notifyAvailable();
+ }
+
+ @Test
+ public void testNotifyUnavailable() {
+ lock.notifyUnavailable();
+
+ verify(callback, never()).lockAvailable(any());
+ verify(callback).lockUnavailable(any());
+ }
+
+ @Test
+ public void testNotifyUnavailable_Ex() {
+ doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockAvailable(any());
+ doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(callback).lockUnavailable(any());
+
+ // should not throw an exception
+ lock.notifyUnavailable();
+ }
+
+ @Test
+ public void testSetState_testIsActive_testIsWaiting_testIsUnavailable() {
+ lock.setState(LockState.WAITING);
+ assertEquals(LockState.WAITING, lock.getState());
+ assertFalse(lock.isActive());
+ assertFalse(lock.isUnavailable());
+ assertTrue(lock.isWaiting());
+
+ lock.setState(LockState.ACTIVE);
+ assertEquals(LockState.ACTIVE, lock.getState());
+ assertTrue(lock.isActive());
+ assertFalse(lock.isUnavailable());
+ assertFalse(lock.isWaiting());
+
+ lock.setState(LockState.UNAVAILABLE);
+ assertEquals(LockState.UNAVAILABLE, lock.getState());
+ assertFalse(lock.isActive());
+ assertTrue(lock.isUnavailable());
+ assertFalse(lock.isWaiting());
+ }
+
+ @Test
+ public void testSetHoldSec() {
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+
+ lock.setHoldSec(HOLD_SEC2);
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ }
+
+ @Test
+ public void testSetCallback() {
+ assertSame(callback, lock.getCallback());
+
+ LockCallback callback2 = mock(LockCallback.class);
+ lock.setCallback(callback2);
+ assertSame(callback2, lock.getCallback());
+ }
+
+ @Test
+ public void testToString() {
+ String text = lock.toString();
+
+ assertNotNull(text);
+ assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback").doesNotContain("succeed");
+ }
+}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java
deleted file mode 100644
index 5a88b02f..00000000
--- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/LockTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.drools.core.lock.Lock.RemoveResult;
-import org.onap.policy.drools.utils.Pair;
-
-public class LockTest {
-
- private static final String OWNER = "my.owner";
- private static final String OWNER2 = "another.owner";
- private static final String OWNER3 = "third.owner";
-
- private static final Integer ITEM2 = 10;
- private static final Integer ITEM3 = 20;
-
- private Lock<Integer> lock;
- private Pair<String, Integer> newOwner;
-
- @Before
- public void setUp() {
- lock = new Lock<>(OWNER);
- newOwner = new Pair<>(null, null);
- }
-
-
- @Test
- public void testLock() {
- assertEquals(OWNER, lock.getOwner());
- }
-
- @Test
- public void testGetOwner() {
- assertEquals(OWNER, lock.getOwner());
- }
-
- @Test
- public void testAdd() {
- assertTrue(lock.add(OWNER2, ITEM2));
- assertTrue(lock.add(OWNER3, ITEM3));
-
- // attempt to re-add owner2 with the same item - should fail
- assertFalse(lock.add(OWNER2, ITEM2));
-
- // attempt to re-add owner2 with a different item - should fail
- assertFalse(lock.add(OWNER2, ITEM3));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testAdd_ArgEx() {
- lock.add(OWNER2, null);
- }
-
- @Test
- public void testAdd_AlreadyOwner() {
- assertFalse(lock.add(OWNER, ITEM2));
- }
-
- @Test
- public void testAdd_AlreadyInQueue() {
- lock.add(OWNER2, ITEM2);
-
- assertFalse(lock.add(OWNER2, ITEM2));
- }
-
- @Test
- public void testRemoveRequester_Owner_QueueEmpty() {
- assertEquals(RemoveResult.UNLOCKED, lock.removeRequester(OWNER, newOwner));
- }
-
- @Test
- public void testRemoveRequester_Owner_QueueHasOneItem() {
- lock.add(OWNER2, ITEM2);
-
- assertEquals(RemoveResult.RELOCKED, lock.removeRequester(OWNER, newOwner));
- assertEquals(OWNER2, newOwner.first());
- assertEquals(ITEM2, newOwner.second());
-
- assertEquals(RemoveResult.UNLOCKED, lock.removeRequester(OWNER2, newOwner));
- }
-
- @Test
- public void testRemoveRequester_Owner_QueueHasMultipleItems() {
- lock.add(OWNER2, ITEM2);
- lock.add(OWNER3, ITEM3);
-
- assertEquals(RemoveResult.RELOCKED, lock.removeRequester(OWNER, newOwner));
- assertEquals(OWNER2, newOwner.first());
- assertEquals(ITEM2, newOwner.second());
-
- assertEquals(RemoveResult.RELOCKED, lock.removeRequester(OWNER2, newOwner));
- assertEquals(OWNER3, newOwner.first());
- assertEquals(ITEM3, newOwner.second());
-
- assertEquals(RemoveResult.UNLOCKED, lock.removeRequester(OWNER3, newOwner));
- }
-
- @Test
- public void testRemoveRequester_InQueue() {
- lock.add(OWNER2, ITEM2);
-
- assertEquals(RemoveResult.REMOVED, lock.removeRequester(OWNER2, newOwner));
- }
-
- @Test
- public void testRemoveRequester_NeitherOwnerNorInQueue() {
- assertEquals(RemoveResult.NOT_FOUND, lock.removeRequester(OWNER2, newOwner));
- }
-
-}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java
deleted file mode 100644
index 999ae50f..00000000
--- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureApiTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * api-resource-locks
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult;
-
-public class PolicyResourceLockFeatureApiTest {
-
- private static final String RESOURCE_ID = "the resource";
- private static final String OWNER = "the owner";
-
- private PolicyResourceLockFeatureApi api;
-
- /**
- * set up.
- */
- @Before
- public void setUp() {
- api = new PolicyResourceLockFeatureApi() {
- @Override
- public int getSequenceNumber() {
- return 0;
- }
- };
- }
-
- @Test
- public void testBeforeLock() {
- assertEquals(OperResult.OPER_UNHANDLED, api.beforeLock(RESOURCE_ID, OWNER, 0));
- }
-
- @Test
- public void testAfterLock() {
- assertFalse(api.afterLock(RESOURCE_ID, OWNER, true));
- assertFalse(api.afterLock(RESOURCE_ID, OWNER, false));
- }
-
- @Test
- public void testBeforeRefresh() {
- assertEquals(OperResult.OPER_UNHANDLED, api.beforeRefresh(RESOURCE_ID, OWNER, 0));
- }
-
- @Test
- public void testAfterRefresh() {
- assertFalse(api.afterRefresh(RESOURCE_ID, OWNER, true));
- assertFalse(api.afterRefresh(RESOURCE_ID, OWNER, false));
- }
-
- @Test
- public void testBeforeUnlock() {
- assertEquals(OperResult.OPER_UNHANDLED, api.beforeUnlock(RESOURCE_ID, OWNER));
- }
-
- @Test
- public void testAfterUnlock() {
- assertFalse(api.afterUnlock(RESOURCE_ID, OWNER, true));
- assertFalse(api.afterUnlock(RESOURCE_ID, OWNER, false));
- }
-
- @Test
- public void testBeforeIsLocked() {
- assertEquals(OperResult.OPER_UNHANDLED, api.beforeIsLocked(RESOURCE_ID));
- }
-
- @Test
- public void testBeforeIsLockedBy() {
- assertEquals(OperResult.OPER_UNHANDLED, api.beforeIsLockedBy(RESOURCE_ID, OWNER));
- }
-}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java
deleted file mode 100644
index f575ce49..00000000
--- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/PolicyResourceLockManagerTest.java
+++ /dev/null
@@ -1,595 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi.OperResult;
-
-public class PolicyResourceLockManagerTest {
-
- private static final int MAX_AGE_SEC = 4 * 60;
-
- private static final String NULL_RESOURCE_ID = "null resourceId";
- private static final String NULL_OWNER = "null owner";
-
- private static final String RESOURCE_A = "resource.a";
- private static final String RESOURCE_B = "resource.b";
- private static final String RESOURCE_C = "resource.c";
-
- private static final String OWNER1 = "owner.one";
- private static final String OWNER2 = "owner.two";
- private static final String OWNER3 = "owner.three";
-
- private PolicyResourceLockFeatureApi impl1;
- private PolicyResourceLockFeatureApi impl2;
- private List<PolicyResourceLockFeatureApi> implList;
-
- private PolicyResourceLockManager mgr;
-
- /**
- * Set up.
- */
- @Before
- public void setUp() {
- impl1 = mock(PolicyResourceLockFeatureApi.class);
- impl2 = mock(PolicyResourceLockFeatureApi.class);
-
- initImplementer(impl1);
- initImplementer(impl2);
-
- // list of feature API implementers
- implList = new LinkedList<>(Arrays.asList(impl1, impl2));
-
- mgr = new PolicyResourceLockManager() {
- @Override
- protected List<PolicyResourceLockFeatureApi> getImplementers() {
- return implList;
- }
- };
- }
-
- /**
- * Initializes an implementer so it always returns {@code null}.
- *
- * @param impl implementer
- */
- private void initImplementer(PolicyResourceLockFeatureApi impl) {
- when(impl.beforeLock(anyString(), anyString(), anyInt())).thenReturn(OperResult.OPER_UNHANDLED);
- when(impl.beforeRefresh(anyString(), anyString(), anyInt())).thenReturn(OperResult.OPER_UNHANDLED);
- when(impl.beforeUnlock(anyString(), anyString())).thenReturn(OperResult.OPER_UNHANDLED);
- when(impl.beforeIsLocked(anyString())).thenReturn(OperResult.OPER_UNHANDLED);
- when(impl.beforeIsLockedBy(anyString(), anyString())).thenReturn(OperResult.OPER_UNHANDLED);
- }
-
- @Test
- public void testLock() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl1).afterLock(RESOURCE_A, OWNER1, true);
- verify(impl2).afterLock(RESOURCE_A, OWNER1, true);
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
-
- // null callback - not locked yet
- assertTrue(mgr.lock(RESOURCE_C, OWNER3, MAX_AGE_SEC));
-
- // null callback - already locked
- assertFalse(mgr.lock(RESOURCE_A, OWNER3, MAX_AGE_SEC));
- }
-
- @Test
- public void testLock_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(null, OWNER1, MAX_AGE_SEC))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(RESOURCE_A, null, MAX_AGE_SEC))
- .withMessage(NULL_OWNER);
-
- // this should not throw an exception
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- }
-
- @Test
- public void testLock_Acquired_BeforeIntercepted() {
- // have impl1 intercept
- when(impl1.beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_ACCEPTED);
-
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2, never()).beforeLock(anyString(), anyString(), anyInt());
-
- verify(impl1, never()).afterLock(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterLock(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testLock_Denied_BeforeIntercepted() {
- // have impl1 intercept
- when(impl1.beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_DENIED);
-
- assertFalse(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeLock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2, never()).beforeLock(anyString(), anyString(), anyInt());
-
- verify(impl1, never()).afterLock(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterLock(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testLock_Acquired_AfterIntercepted() throws Exception {
-
- // impl1 intercepts during afterLock()
- when(impl1.afterLock(RESOURCE_A, OWNER1, true)).thenReturn(true);
-
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // impl1 sees it, but impl2 does not
- verify(impl1).afterLock(RESOURCE_A, OWNER1, true);
- verify(impl2, never()).afterLock(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testLock_Acquired() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).afterLock(RESOURCE_A, OWNER1, true);
- verify(impl2).afterLock(RESOURCE_A, OWNER1, true);
- }
-
- @Test
- public void testLock_Denied_AfterIntercepted() throws Exception {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // impl1 intercepts during afterLock()
- when(impl1.afterLock(RESOURCE_A, OWNER2, false)).thenReturn(true);
-
- // owner2 tries to lock
- assertFalse(mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC));
-
- // impl1 sees it, but impl2 does not
- verify(impl1).afterLock(RESOURCE_A, OWNER2, false);
- verify(impl2, never()).afterLock(RESOURCE_A, OWNER2, false);
- }
-
- @Test
- public void testLock_Denied() {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // owner2 tries to lock
- mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC);
-
- verify(impl1).afterLock(RESOURCE_A, OWNER2, false);
- verify(impl2).afterLock(RESOURCE_A, OWNER2, false);
- }
-
- @Test
- public void testRefresh() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl1).afterRefresh(RESOURCE_A, OWNER1, true);
- verify(impl2).afterRefresh(RESOURCE_A, OWNER1, true);
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
-
- // different owner and resource
- assertFalse(mgr.refresh(RESOURCE_C, OWNER3, MAX_AGE_SEC));
-
- // different owner
- assertFalse(mgr.refresh(RESOURCE_A, OWNER3, MAX_AGE_SEC));
- }
-
- @Test
- public void testRefresh_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(null, OWNER1, MAX_AGE_SEC))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(RESOURCE_A, null, MAX_AGE_SEC))
- .withMessage(NULL_OWNER);
-
- // this should not throw an exception
- mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- }
-
- @Test
- public void testRefresh_Acquired_BeforeIntercepted() {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // have impl1 intercept
- when(impl1.beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_ACCEPTED);
-
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2, never()).beforeRefresh(anyString(), anyString(), anyInt());
-
- verify(impl1, never()).afterRefresh(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterRefresh(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testRefresh_Denied_BeforeIntercepted() {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // have impl1 intercept
- when(impl1.beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC)).thenReturn(OperResult.OPER_DENIED);
-
- assertFalse(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).beforeRefresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- verify(impl2, never()).beforeRefresh(anyString(), anyString(), anyInt());
-
- verify(impl1, never()).afterRefresh(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterRefresh(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testRefresh_Acquired_AfterIntercepted() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // impl1 intercepts during afterRefresh()
- when(impl1.afterRefresh(RESOURCE_A, OWNER1, true)).thenReturn(true);
-
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // impl1 sees it, but impl2 does not
- verify(impl1).afterRefresh(RESOURCE_A, OWNER1, true);
- verify(impl2, never()).afterRefresh(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testRefresh_Acquired() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- verify(impl1).afterRefresh(RESOURCE_A, OWNER1, true);
- verify(impl2).afterRefresh(RESOURCE_A, OWNER1, true);
- }
-
- @Test
- public void testRefresh_Denied_AfterIntercepted() throws Exception {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // impl1 intercepts during afterRefresh()
- when(impl1.afterRefresh(RESOURCE_A, OWNER2, false)).thenReturn(true);
-
- // owner2 tries to lock
- assertFalse(mgr.refresh(RESOURCE_A, OWNER2, MAX_AGE_SEC));
-
- // impl1 sees it, but impl2 does not
- verify(impl1).afterRefresh(RESOURCE_A, OWNER2, false);
- verify(impl2, never()).afterRefresh(RESOURCE_A, OWNER2, false);
- }
-
- @Test
- public void testRefresh_Denied() {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // owner2 tries to lock
- mgr.refresh(RESOURCE_A, OWNER2, MAX_AGE_SEC);
-
- verify(impl1).afterRefresh(RESOURCE_A, OWNER2, false);
- verify(impl2).afterRefresh(RESOURCE_A, OWNER2, false);
- }
-
- @Test
- public void testUnlock() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- mgr.lock(RESOURCE_B, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2).beforeUnlock(RESOURCE_A, OWNER1);
-
- verify(impl1).afterUnlock(RESOURCE_A, OWNER1, true);
- verify(impl2).afterUnlock(RESOURCE_A, OWNER1, true);
- }
-
- @Test
- public void testUnlock_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(null, OWNER1)).withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(RESOURCE_A, null)).withMessage(NULL_OWNER);
- }
-
- @Test
- public void testUnlock_BeforeInterceptedTrue() {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // have impl1 intercept
- when(impl1.beforeUnlock(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_ACCEPTED);
-
- assertTrue(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2, never()).beforeUnlock(anyString(), anyString());
-
- verify(impl1, never()).afterUnlock(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterUnlock(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testUnlock_BeforeInterceptedFalse() {
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // have impl1 intercept
- when(impl1.beforeUnlock(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_DENIED);
-
- assertFalse(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2, never()).beforeUnlock(anyString(), anyString());
-
- verify(impl1, never()).afterUnlock(anyString(), anyString(), anyBoolean());
- verify(impl2, never()).afterUnlock(anyString(), anyString(), anyBoolean());
- }
-
- @Test
- public void testUnlock_Unlocked() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2).beforeUnlock(RESOURCE_A, OWNER1);
-
- verify(impl1).afterUnlock(RESOURCE_A, OWNER1, true);
- verify(impl2).afterUnlock(RESOURCE_A, OWNER1, true);
- }
-
- @Test
- public void testUnlock_Unlocked_AfterIntercepted() {
- // have impl1 intercept
- when(impl1.afterUnlock(RESOURCE_A, OWNER1, true)).thenReturn(true);
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2).beforeUnlock(RESOURCE_A, OWNER1);
-
- verify(impl1).afterUnlock(RESOURCE_A, OWNER1, true);
- verify(impl2, never()).afterUnlock(RESOURCE_A, OWNER1, true);
- }
-
- @Test
- public void testUnlock_NotUnlocked() {
- assertFalse(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2).beforeUnlock(RESOURCE_A, OWNER1);
-
- verify(impl1).afterUnlock(RESOURCE_A, OWNER1, false);
- verify(impl2).afterUnlock(RESOURCE_A, OWNER1, false);
- }
-
- @Test
- public void testUnlock_NotUnlocked_AfterIntercepted() {
- // have impl1 intercept
- when(impl1.afterUnlock(RESOURCE_A, OWNER1, false)).thenReturn(true);
-
- assertFalse(mgr.unlock(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeUnlock(RESOURCE_A, OWNER1);
- verify(impl2).beforeUnlock(RESOURCE_A, OWNER1);
-
- verify(impl1).afterUnlock(RESOURCE_A, OWNER1, false);
- verify(impl2, never()).afterUnlock(RESOURCE_A, OWNER1, false);
- }
-
- @Test
- public void testIsLocked_True() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2).beforeIsLocked(RESOURCE_A);
- }
-
- @Test
- public void testIsLocked_False() {
- assertFalse(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2).beforeIsLocked(RESOURCE_A);
- }
-
- @Test
- public void testIsLocked_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLocked(null)).withMessage(NULL_RESOURCE_ID);
- }
-
- @Test
- public void testIsLocked_BeforeIntercepted_True() {
-
- // have impl1 intercept
- when(impl1.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_ACCEPTED);;
-
- assertTrue(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2, never()).beforeIsLocked(RESOURCE_A);
- }
-
- @Test
- public void testIsLocked_BeforeIntercepted_False() {
-
- // lock it so we can verify that impl1 overrides the superclass isLocker()
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // have impl1 intercept
- when(impl1.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_DENIED);
-
- assertFalse(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2, never()).beforeIsLocked(RESOURCE_A);
- }
-
- @Test
- public void testIsLockedBy_True() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1);
- verify(impl2).beforeIsLockedBy(RESOURCE_A, OWNER1);
- }
-
- @Test
- public void testIsLockedBy_False() {
- // different owner
- mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC);
-
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1);
- verify(impl2).beforeIsLockedBy(RESOURCE_A, OWNER1);
- }
-
- @Test
- public void testIsLockedBy_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(null, OWNER1))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(RESOURCE_A, null)).withMessage(NULL_OWNER);
- }
-
- @Test
- public void testIsLockedBy_BeforeIntercepted_True() {
-
- // have impl1 intercept
- when(impl1.beforeIsLockedBy(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_ACCEPTED);;
-
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1);
- verify(impl2, never()).beforeIsLockedBy(RESOURCE_A, OWNER1);
- }
-
- @Test
- public void testIsLockedBy_BeforeIntercepted_False() {
-
- // lock it so we can verify that impl1 overrides the superclass isLocker()
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // have impl1 intercept
- when(impl1.beforeIsLockedBy(RESOURCE_A, OWNER1)).thenReturn(OperResult.OPER_DENIED);
-
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- verify(impl1).beforeIsLockedBy(RESOURCE_A, OWNER1);
- verify(impl2, never()).beforeIsLockedBy(RESOURCE_A, OWNER1);
- }
-
- @Test
- public void testGetInstance() {
- PolicyResourceLockManager inst = PolicyResourceLockManager.getInstance();
- assertNotNull(inst);
-
- // should return the same instance each time
- assertEquals(inst, PolicyResourceLockManager.getInstance());
- assertEquals(inst, PolicyResourceLockManager.getInstance());
- }
-
- @Test
- public void testDoIntercept_Empty() {
- // clear the implementer list
- implList.clear();
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertFalse(mgr.isLocked(RESOURCE_B));
-
- verify(impl1, never()).beforeIsLocked(anyString());
- }
-
- @Test
- public void testDoIntercept_Impl1() {
- when(impl1.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_ACCEPTED);;
-
- assertTrue(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2, never()).beforeIsLocked(anyString());
- }
-
- @Test
- public void testDoIntercept_Impl2() {
- when(impl2.beforeIsLocked(RESOURCE_A)).thenReturn(OperResult.OPER_ACCEPTED);;
-
- assertTrue(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2).beforeIsLocked(RESOURCE_A);
- }
-
- @Test
- public void testDoIntercept_Ex() {
- doThrow(new RuntimeException("expected exception")).when(impl1).beforeIsLocked(RESOURCE_A);
-
- assertFalse(mgr.isLocked(RESOURCE_A));
-
- verify(impl1).beforeIsLocked(RESOURCE_A);
- verify(impl2).beforeIsLocked(RESOURCE_A);
- }
-}
diff --git a/policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java b/policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java
deleted file mode 100644
index 51cf68fc..00000000
--- a/policy-core/src/test/java/org/onap/policy/drools/core/lock/SimpleLockManagerTest.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.core.lock;
-
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.LinkedList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onap.policy.common.utils.time.CurrentTime;
-import org.onap.policy.common.utils.time.TestTime;
-import org.onap.policy.drools.core.lock.SimpleLockManager.Data;
-import org.powermock.reflect.Whitebox;
-
-public class SimpleLockManagerTest {
-
- // Note: this must be a multiple of four
- private static final int MAX_AGE_SEC = 4 * 60;
- private static final int MAX_AGE_MS = MAX_AGE_SEC * 1000;
-
- private static final String EXPECTED_EXCEPTION = "expected exception";
-
- private static final String NULL_RESOURCE_ID = "null resourceId";
- private static final String NULL_OWNER = "null owner";
-
- private static final String RESOURCE_A = "resource.a";
- private static final String RESOURCE_B = "resource.b";
- private static final String RESOURCE_C = "resource.c";
- private static final String RESOURCE_D = "resource.d";
-
- private static final String OWNER1 = "owner.one";
- private static final String OWNER2 = "owner.two";
- private static final String OWNER3 = "owner.three";
-
- /**
- * Name of the "current time" field within the {@link SimpleLockManager} class.
- */
- private static final String TIME_FIELD = "currentTime";
-
- private static CurrentTime savedTime;
-
- private TestTime testTime;
- private SimpleLockManager mgr;
-
- @BeforeClass
- public static void setUpBeforeClass() {
- savedTime = Whitebox.getInternalState(SimpleLockManager.class, TIME_FIELD);
- }
-
- @AfterClass
- public static void tearDownAfterClass() {
- Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, savedTime);
- }
-
- /**
- * Set up.
- */
- @Before
- public void setUp() {
- testTime = new TestTime();
- Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime);
-
- mgr = new SimpleLockManager();
- }
-
- @Test
- public void testCurrentTime() {
- assertNotNull(savedTime);
- }
-
- @Test
- public void testLock() throws Exception {
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
-
- // different owner and resource - should succeed
- assertTrue(mgr.lock(RESOURCE_C, OWNER3, MAX_AGE_SEC));
-
- // different owner - already locked
- assertFalse(mgr.lock(RESOURCE_A, OWNER3, MAX_AGE_SEC));
- }
-
- @Test
- public void testLock_ExtendLock() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // sleep half of the cycle
- testTime.sleep(MAX_AGE_MS / 2);
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- // extend the lock
- mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // verify still locked after sleeping the other half of the cycle
- testTime.sleep(MAX_AGE_MS / 2 + 1);
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- // and should release after another half cycle
- testTime.sleep(MAX_AGE_MS / 2);
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
- }
-
- @Test
- public void testLock_AlreadyLocked() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // same owner
- assertFalse(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // different owner
- assertFalse(mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC));
- }
-
- @Test
- public void testLock_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(null, OWNER1, MAX_AGE_SEC))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.lock(RESOURCE_A, null, MAX_AGE_SEC))
- .withMessage(NULL_OWNER);
-
- // this should not throw an exception
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- }
-
- @Test
- public void testRefresh() throws Exception {
- // don't own the lock yet - cannot refresh
- assertFalse(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- assertTrue(mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // now the lock is owned
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // refresh again
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC + 1));
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
-
- // different owner
- assertFalse(mgr.refresh(RESOURCE_A, OWNER3, MAX_AGE_SEC));
-
- // different resource
- assertFalse(mgr.refresh(RESOURCE_C, OWNER1, MAX_AGE_SEC));
- }
-
- @Test
- public void testRefresh_ExtendLock() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // sleep half of the cycle
- testTime.sleep(MAX_AGE_MS / 2);
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- // extend the lock
- mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // verify still locked after sleeping the other half of the cycle
- testTime.sleep(MAX_AGE_MS / 2 + 1);
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- // and should release after another half cycle
- testTime.sleep(MAX_AGE_MS / 2);
-
- // cannot refresh expired lock
- assertFalse(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
- }
-
- @Test
- public void testRefresh_AlreadyLocked() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // same owner
- assertTrue(mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC));
-
- // different owner
- assertFalse(mgr.refresh(RESOURCE_A, OWNER2, MAX_AGE_SEC));
- assertFalse(mgr.lock(RESOURCE_A, OWNER2, MAX_AGE_SEC));
- }
-
- @Test
- public void testRefresh_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(null, OWNER1, MAX_AGE_SEC))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.refresh(RESOURCE_A, null, MAX_AGE_SEC))
- .withMessage(NULL_OWNER);
-
- // this should not throw an exception
- mgr.refresh(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- }
-
- @Test
- public void testUnlock() throws Exception {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // unlock it
- assertTrue(mgr.unlock(RESOURCE_A, OWNER1));
- }
-
- @Test
- public void testUnlock_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(null, OWNER1)).withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.unlock(RESOURCE_A, null)).withMessage(NULL_OWNER);
- }
-
- @Test
- public void testUnlock_NotLocked() {
- assertFalse(mgr.unlock(RESOURCE_A, OWNER1));
- }
-
- @Test
- public void testUnlock_DiffOwner() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- assertFalse(mgr.unlock(RESOURCE_A, OWNER2));
- }
-
- @Test
- public void testIsLocked() {
- assertFalse(mgr.isLocked(RESOURCE_A));
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- mgr.lock(RESOURCE_B, OWNER1, MAX_AGE_SEC);
-
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLocked(RESOURCE_C));
-
- // unlock from first resource
- mgr.unlock(RESOURCE_A, OWNER1);
- assertFalse(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLocked(RESOURCE_C));
-
- // unlock from second resource
- mgr.unlock(RESOURCE_B, OWNER1);
- assertFalse(mgr.isLocked(RESOURCE_A));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertFalse(mgr.isLocked(RESOURCE_C));
- }
-
- @Test
- public void testIsLocked_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLocked(null)).withMessage(NULL_RESOURCE_ID);
- }
-
- @Test
- public void testIsLockedBy() {
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
-
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- assertFalse(mgr.isLockedBy(RESOURCE_B, OWNER1));
-
- assertTrue(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
-
- // unlock from the resource
- mgr.unlock(RESOURCE_A, OWNER1);
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER1));
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
- assertFalse(mgr.isLockedBy(RESOURCE_B, OWNER1));
- }
-
- @Test
- public void testIsLockedBy_ArgEx() {
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(null, OWNER1))
- .withMessage(NULL_RESOURCE_ID);
-
- assertThatIllegalArgumentException().isThrownBy(() -> mgr.isLockedBy(RESOURCE_A, null)).withMessage(NULL_OWNER);
- }
-
- @Test
- public void testIsLockedBy_NotLocked() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // different resource, thus no lock
- assertFalse(mgr.isLockedBy(RESOURCE_B, OWNER1));
- }
-
- @Test
- public void testIsLockedBy_LockedButNotOwner() {
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
-
- // different owner
- assertFalse(mgr.isLockedBy(RESOURCE_A, OWNER2));
- }
-
- @Test
- public void testCleanUpLocks() throws Exception {
- // note: this assumes that MAX_AGE_MS is divisible by 4
- mgr.lock(RESOURCE_A, OWNER1, MAX_AGE_SEC);
- assertTrue(mgr.isLocked(RESOURCE_A));
-
- testTime.sleep(10);
- mgr.lock(RESOURCE_B, OWNER1, MAX_AGE_SEC);
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLocked(RESOURCE_B));
-
- testTime.sleep(MAX_AGE_MS / 4);
- mgr.lock(RESOURCE_C, OWNER1, MAX_AGE_SEC);
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLocked(RESOURCE_B));
- assertTrue(mgr.isLocked(RESOURCE_C));
-
- testTime.sleep(MAX_AGE_MS / 4);
- mgr.lock(RESOURCE_D, OWNER1, MAX_AGE_SEC);
- assertTrue(mgr.isLocked(RESOURCE_A));
- assertTrue(mgr.isLocked(RESOURCE_B));
- assertTrue(mgr.isLocked(RESOURCE_C));
- assertTrue(mgr.isLocked(RESOURCE_D));
-
- // sleep remainder of max age - first two should expire
- testTime.sleep(MAX_AGE_MS / 2);
- assertFalse(mgr.isLocked(RESOURCE_A));
- assertFalse(mgr.isLocked(RESOURCE_B));
- assertTrue(mgr.isLocked(RESOURCE_C));
- assertTrue(mgr.isLocked(RESOURCE_D));
-
- // another quarter - next one should expire
- testTime.sleep(MAX_AGE_MS / 4);
- assertFalse(mgr.isLocked(RESOURCE_C));
- assertTrue(mgr.isLocked(RESOURCE_D));
-
- // another quarter - last one should expire
- testTime.sleep(MAX_AGE_MS / 4);
- assertFalse(mgr.isLocked(RESOURCE_D));
- }
-
- @Test
- public void testMakeNullArgException() {
- IllegalArgumentException ex = SimpleLockManager.makeNullArgException(EXPECTED_EXCEPTION);
- assertEquals(EXPECTED_EXCEPTION, ex.getMessage());
- }
-
- @Test
- public void testDataGetXxx() {
- long ttime = System.currentTimeMillis() + 5;
- Data data = new Data(OWNER1, RESOURCE_A, ttime);
-
- assertEquals(OWNER1, data.getOwner());
- assertEquals(RESOURCE_A, data.getResource());
- assertEquals(ttime, data.getExpirationMs());
- }
-
- @Test
- public void testDataCompareTo() {
- long ttime = System.currentTimeMillis() + 50;
- Data data = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataSame = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataDiffExpire = new Data(OWNER1, RESOURCE_A, ttime + 1);
-
- assertEquals(0, data.compareTo(data));
- assertEquals(0, data.compareTo(dataSame));
-
- assertTrue(data.compareTo(dataDiffExpire) < 0);
- assertTrue(dataDiffExpire.compareTo(data) > 0);
-
- Data dataDiffOwner = new Data(OWNER2, RESOURCE_A, ttime);
- Data dataDiffResource = new Data(OWNER1, RESOURCE_B, ttime);
-
- assertTrue(data.compareTo(dataDiffOwner) < 0);
- assertTrue(dataDiffOwner.compareTo(data) > 0);
-
- assertTrue(data.compareTo(dataDiffResource) < 0);
- assertTrue(dataDiffResource.compareTo(data) > 0);
- }
-
- @Test
- public void testDataHashCode() {
- long ttime = System.currentTimeMillis() + 1;
- Data data = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataSame = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataDiffExpire = new Data(OWNER1, RESOURCE_A, ttime + 1);
- Data dataDiffOwner = new Data(OWNER2, RESOURCE_A, ttime);
-
- int hc1 = data.hashCode();
- assertEquals(hc1, dataSame.hashCode());
-
- assertTrue(hc1 != dataDiffExpire.hashCode());
- assertTrue(hc1 != dataDiffOwner.hashCode());
-
- Data dataDiffResource = new Data(OWNER1, RESOURCE_B, ttime);
- Data dataNullOwner = new Data(null, RESOURCE_A, ttime);
- Data dataNullResource = new Data(OWNER1, null, ttime);
-
- assertTrue(hc1 != dataDiffResource.hashCode());
- assertTrue(hc1 != dataNullOwner.hashCode());
- assertTrue(hc1 != dataNullResource.hashCode());
- }
-
- @Test
- public void testDataEquals() {
- long ttime = System.currentTimeMillis() + 50;
- Data data = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataSame = new Data(OWNER1, RESOURCE_A, ttime);
- Data dataDiffExpire = new Data(OWNER1, RESOURCE_A, ttime + 1);
-
- assertTrue(data.equals(data));
- assertTrue(data.equals(dataSame));
-
- Data dataDiffOwner = new Data(OWNER2, RESOURCE_A, ttime);
- Data dataDiffResource = new Data(OWNER1, RESOURCE_B, ttime);
-
- assertFalse(data.equals(dataDiffExpire));
- assertFalse(data.equals(dataDiffOwner));
- assertFalse(data.equals(dataDiffResource));
-
- assertFalse(data.equals(null));
- assertFalse(data.equals("string"));
-
- Data dataNullOwner = new Data(null, RESOURCE_A, ttime);
- Data dataNullResource = new Data(OWNER1, null, ttime);
-
- assertFalse(dataNullOwner.equals(data));
- assertFalse(dataNullResource.equals(data));
-
- assertTrue(dataNullOwner.equals(new Data(null, RESOURCE_A, ttime)));
- assertTrue(dataNullResource.equals(new Data(OWNER1, null, ttime)));
- }
-
- @Test
- public void testMultiThreaded() throws InterruptedException {
- int nthreads = 10;
- int nlocks = 100;
-
- LinkedList<Thread> threads = new LinkedList<>();
-
- String[] resources = {RESOURCE_A, RESOURCE_B};
-
- final AtomicInteger nfail = new AtomicInteger(0);
-
- CountDownLatch stopper = new CountDownLatch(1);
- CountDownLatch completed = new CountDownLatch(nthreads);
-
- for (int x = 0; x < nthreads; ++x) {
- String owner = "owner." + x;
-
- Thread thread = new Thread(() -> {
-
- for (int y = 0; y < nlocks; ++y) {
- String res = resources[y % resources.length];
-
- try {
- // some locks will be acquired, some denied
- mgr.lock(res, owner, MAX_AGE_SEC);
-
- // do some "work"
- stopper.await(1L, TimeUnit.MILLISECONDS);
-
- mgr.unlock(res, owner);
-
- } catch (InterruptedException expected) {
- Thread.currentThread().interrupt();
- break;
- }
- }
-
- completed.countDown();
- });
-
- thread.setDaemon(true);
- threads.add(thread);
- }
-
- // start the threads
- for (Thread t : threads) {
- t.start();
- }
-
- // wait for them to complete
- completed.await(5000L, TimeUnit.SECONDS);
-
- // stop the threads from sleeping
- stopper.countDown();
-
- completed.await(1L, TimeUnit.SECONDS);
-
- // interrupt those that are still alive
- for (Thread t : threads) {
- if (t.isAlive()) {
- t.interrupt();
- }
- }
-
- assertEquals(0, nfail.get());
- }
-
-}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java b/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java
index fe31eb50..87001ad6 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/features/PolicyEngineFeatureApi.java
@@ -23,6 +23,7 @@ package org.onap.policy.drools.features;
import java.util.Properties;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.utils.services.OrderedService;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
import org.onap.policy.drools.system.PolicyEngine;
@@ -271,4 +272,26 @@ public interface PolicyEngineFeatureApi extends OrderedService {
default boolean afterOpen(PolicyEngine engine) {
return false;
}
+
+ /**
+ * Called before the PolicyEngine creates a lock manager.
+ *
+ * @return a lock manager if this feature intercepts and takes ownership of the
+ * operation preventing the invocation of lower priority features. Null,
+ * otherwise
+ */
+ default PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
+ return null;
+ }
+
+ /**
+ * Called after the PolicyEngine creates a lock manager.
+ *
+ * @return True if this feature intercepts and takes ownership of the operation
+ * preventing the invocation of lower priority features. False, otherwise
+ */
+ default boolean afterCreateLockManager(PolicyEngine engine, Properties properties,
+ PolicyResourceLockManager lockManager) {
+ return false;
+ }
}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
index 653ff72e..cb0749d9 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
@@ -22,6 +22,7 @@ package org.onap.policy.drools.system;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
import org.onap.policy.common.capabilities.Lockable;
import org.onap.policy.common.capabilities.Startable;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
@@ -29,6 +30,8 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
@@ -198,6 +201,11 @@ public interface PolicyEngine extends Startable, Lockable, TopicListener {
List<HttpServletServer> getHttpServers();
/**
+ * Gets a thread pool that can be used to execute background tasks.
+ */
+ ScheduledExecutorService getExecutorService();
+
+ /**
* get properties configuration.
*
* @return properties objects
@@ -280,6 +288,31 @@ public interface PolicyEngine extends Startable, Lockable, TopicListener {
boolean deliver(CommInfrastructure busType, String topic, String event);
/**
+ * Requests a lock on a resource. Typically, the lock is not immediately granted,
+ * though a "lock" object is always returned. Once the lock has been granted (or
+ * denied), the callback will be invoked to indicate the result.
+ *
+ * <p/>
+ * Notes:
+ * <dl>
+ * <li>The callback may be invoked <i>before</i> this method returns</li>
+ * <li>The implementation need not honor waitForLock={@code true}</li>
+ * </dl>
+ *
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held once
+ * it has been granted, after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or subsequently
+ * lost; must not be {@code null}
+ * @param waitForLock {@code true} to wait for the lock, if it is currently locked,
+ * {@code false} otherwise
+ * @return a new lock
+ */
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock);
+
+ /**
* Invoked when the host goes into the active state.
*/
void activate();
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
index 766848c6..c924fd69 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
@@ -24,7 +24,6 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT;
-import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson;
@@ -32,11 +31,16 @@ import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.NonNull;
+import org.apache.commons.lang.StringUtils;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
@@ -54,6 +58,9 @@ import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.core.PolicyContainer;
import org.onap.policy.drools.core.jmx.PdpJmxListener;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.features.PolicyControllerFeatureApi;
import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
@@ -67,6 +74,7 @@ import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
import org.onap.policy.drools.server.restful.RestManager;
import org.onap.policy.drools.server.restful.aaf.AafTelemetryAuthFilter;
+import org.onap.policy.drools.system.internal.SimpleLockManager;
import org.onap.policy.drools.utils.PropertyUtil;
import org.onap.policy.drools.utils.logging.LoggerUtil;
import org.onap.policy.drools.utils.logging.MdcTransaction;
@@ -78,7 +86,6 @@ import org.slf4j.LoggerFactory;
* Policy Engine Manager Implementation.
*/
class PolicyEngineManager implements PolicyEngine {
-
/**
* String literals.
*/
@@ -88,6 +95,9 @@ class PolicyEngineManager implements PolicyEngine {
private static final String ENGINE_STOPPED_MSG = "Policy Engine is stopped";
private static final String ENGINE_LOCKED_MSG = "Policy Engine is locked";
+ public static final String EXECUTOR_THREAD_PROP = "executor.threads";
+ protected static final int DEFAULT_EXECUTOR_THREADS = 5;
+
/**
* logger.
*/
@@ -134,6 +144,17 @@ class PolicyEngineManager implements PolicyEngine {
private List<HttpServletServer> httpServers = new ArrayList<>();
/**
+ * Thread pool used to execute background tasks.
+ */
+ private ScheduledExecutorService executorService = null;
+
+ /**
+ * Lock manager used to create locks.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ private PolicyResourceLockManager lockManager = null;
+
+ /**
* gson parser to decode configuration requests.
*/
private final Gson decoder = new GsonBuilder().disableHtmlEscaping().create();
@@ -214,6 +235,53 @@ class PolicyEngineManager implements PolicyEngine {
}
@Override
+ @JsonIgnore
+ @GsonJsonIgnore
+ public ScheduledExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ private ScheduledExecutorService makeExecutorService(Properties properties) {
+ int nthreads = DEFAULT_EXECUTOR_THREADS;
+ try {
+ nthreads = Integer.valueOf(
+ properties.getProperty(EXECUTOR_THREAD_PROP, String.valueOf(DEFAULT_EXECUTOR_THREADS)));
+
+ } catch (NumberFormatException e) {
+ logger.error("invalid number for " + EXECUTOR_THREAD_PROP + " property", e);
+ }
+
+ return makeScheduledExecutor(nthreads);
+ }
+
+ private void createLockManager(Properties properties) {
+ for (PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ this.lockManager = feature.beforeCreateLockManager(this, properties);
+ if (this.lockManager != null) {
+ return;
+ }
+ } catch (RuntimeException e) {
+ logger.error("{}: feature {} before-create-lock-manager failure because of {}", this,
+ feature.getClass().getName(), e.getMessage(), e);
+ }
+ }
+
+ try {
+ this.lockManager = new SimpleLockManager(this, properties);
+ } catch (RuntimeException e) {
+ logger.error("{}: cannot create simple lock manager because of {}", this, e.getMessage(), e);
+ this.lockManager = new SimpleLockManager(this, new Properties());
+ }
+
+ /* policy-engine dispatch post operation hook */
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterCreateLockManager(this, properties, this.lockManager),
+ (feature, ex) -> logger.error("{}: feature {} after-create-lock-manager failure because of {}",
+ this, feature.getClass().getName(), ex.getMessage(), ex));
+ }
+
+ @Override
public synchronized void configure(Properties properties) {
if (properties == null) {
@@ -257,6 +325,10 @@ class PolicyEngineManager implements PolicyEngine {
logger.error("{}: add-http-servers failed", this, e);
}
+ executorService = makeExecutorService(properties);
+
+ createLockManager(properties);
+
/* policy-engine dispatch post configure hook */
FeatureApiUtils.apply(getEngineProviders(),
feature -> feature.afterConfigure(this),
@@ -499,6 +571,13 @@ class PolicyEngineManager implements PolicyEngine {
AtomicReference<Boolean> success = new AtomicReference<>(true);
+ try {
+ success.compareAndSet(true, this.lockManager.start());
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot start lock manager because of {}", this, e.getMessage(), e);
+ success.set(false);
+ }
+
/* Start Policy Engine exclusively-owned (unmanaged) http servers */
attempt(success, this.httpServers,
@@ -639,9 +718,16 @@ class PolicyEngineManager implements PolicyEngine {
(item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item,
ex.getMessage(), ex));
+ try {
+ success.compareAndSet(true, this.lockManager.stop());
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot stop lock manager because of {}", this, e.getMessage(), e);
+ success.set(false);
+ }
+
// stop JMX?
- /* policy-engine dispatch pre stop hook */
+ /* policy-engine dispatch post stop hook */
FeatureApiUtils.apply(getEngineProviders(),
feature -> feature.afterStop(this),
(feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
@@ -688,6 +774,14 @@ class PolicyEngineManager implements PolicyEngine {
getTopicEndpointManager().shutdown();
getServletFactory().destroy();
+ try {
+ this.lockManager.shutdown();
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot shutdown lock manager because of {}", this, e.getMessage(), e);
+ }
+
+ executorService.shutdownNow();
+
// Stop the JMX listener
stopPdpJmxListener();
@@ -806,6 +900,13 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().lock() && success;
+ try {
+ success = (this.lockManager == null || this.lockManager.lock()) && success;
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot lock() lock manager because of {}", this, e.getMessage(), e);
+ success = false;
+ }
+
/* policy-engine dispatch post lock hook */
FeatureApiUtils.apply(getEngineProviders(),
feature -> feature.afterLock(this),
@@ -833,6 +934,14 @@ class PolicyEngineManager implements PolicyEngine {
this.locked = false;
boolean success = true;
+
+ try {
+ success = (this.lockManager == null || this.lockManager.unlock()) && success;
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot unlock() lock manager because of {}", this, e.getMessage(), e);
+ success = false;
+ }
+
final List<PolicyController> controllers = getControllerFactory().inventory();
for (final PolicyController controller : controllers) {
try {
@@ -1167,6 +1276,21 @@ class PolicyEngineManager implements PolicyEngine {
feature.getClass().getName(), ex.getMessage(), ex));
}
+ @Override
+ public Lock createLock(@NonNull String resourceId, @NonNull String ownerKey, int holdSec,
+ @NonNull LockCallback callback, boolean waitForLock) {
+
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ if (lockManager == null) {
+ throw new IllegalStateException("lock manager has not been initialized");
+ }
+
+ return lockManager.createLock(resourceId, ownerKey, holdSec, callback, waitForLock);
+ }
+
private boolean controllerConfig(PdpdConfiguration config) {
/* only this one supported for now */
final List<ControllerConfiguration> configControllers = config.getControllers();
@@ -1234,4 +1358,13 @@ class PolicyEngineManager implements PolicyEngine {
protected PolicyEngine getPolicyEngine() {
return PolicyEngineConstants.getManager();
}
+
+ protected ScheduledExecutorService makeScheduledExecutor(int nthreads) {
+ ScheduledThreadPoolExecutor exsvc = new ScheduledThreadPoolExecutor(nthreads);
+ exsvc.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ exsvc.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ exsvc.setRemoveOnCancelPolicy(true);
+
+ return exsvc;
+ }
}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java
new file mode 100644
index 00000000..f5163e9b
--- /dev/null
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java
@@ -0,0 +1,426 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system.internal;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+import org.onap.policy.common.utils.time.CurrentTime;
+import org.onap.policy.drools.core.lock.AlwaysFailLock;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simple implementation of the Lock Feature. Locks do not span across instances of this
+ * object (i.e., locks do not span across servers).
+ *
+ * <p/>
+ * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
+ * parameter.
+ *
+ * <p/>
+ * When a lock is deserialized, it will not initially appear in this feature's map; it
+ * will be added to the map once free() or extend() is invoked, provided there isn't
+ * already an entry.
+ */
+public class SimpleLockManager implements PolicyResourceLockManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(SimpleLockManager.class);
+
+ private static final String NOT_LOCKED_MSG = "not locked";
+ private static final String LOCK_LOST_MSG = "lock lost";
+
+ /**
+ * Provider of current time. May be overridden by junit tests.
+ */
+ private static CurrentTime currentTime = new CurrentTime();
+
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private static SimpleLockManager latestInstance = null;
+
+
+ /**
+ * Engine with which this manager is associated.
+ */
+ private final PolicyEngine engine;
+
+ /**
+ * Feature properties.
+ */
+ private final SimpleLockProperties featProps;
+
+ /**
+ * Maps a resource to the lock that owns it.
+ */
+ private final Map<String, SimpleLock> resource2lock = new ConcurrentHashMap<>();
+
+ /**
+ * Thread pool used to check for lock expiration and to notify owners when locks are
+ * lost.
+ */
+ private ScheduledExecutorService exsvc = null;
+
+ /**
+ * Used to cancel the expiration checker on shutdown.
+ */
+ private ScheduledFuture<?> checker = null;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param engine engine with which this manager is associated
+ * @param properties properties used to configure the manager
+ */
+ public SimpleLockManager(PolicyEngine engine, Properties properties) {
+ try {
+ this.engine = engine;
+ this.featProps = new SimpleLockProperties(properties);
+
+ } catch (PropertyException e) {
+ throw new SimpleLockManagerException(e);
+ }
+ }
+
+ @Override
+ public boolean isAlive() {
+ return (checker != null);
+ }
+
+ @Override
+ public boolean start() {
+ if (checker != null) {
+ return false;
+ }
+
+ exsvc = getThreadPool();
+
+ checker = exsvc.scheduleWithFixedDelay(this::checkExpired, featProps.getExpireCheckSec(),
+ featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ setLatestInstance(this);
+
+ return true;
+ }
+
+ /**
+ * Stops the expiration checker. Does <i>not</i> invoke any lock call-backs.
+ */
+ @Override
+ public synchronized boolean stop() {
+ exsvc = null;
+
+ if (checker == null) {
+ return false;
+ }
+
+ ScheduledFuture<?> checker2 = checker;
+ checker = null;
+
+ checker2.cancel(true);
+
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ stop();
+ }
+
+ @Override
+ public boolean isLocked() {
+ return false;
+ }
+
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public boolean unlock() {
+ return true;
+ }
+
+ @Override
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+
+ if (latestInstance != this) {
+ AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
+ lock.notifyUnavailable();
+ return lock;
+ }
+
+ SimpleLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
+
+ SimpleLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
+
+ if (existingLock == null) {
+ lock.grant();
+ } else {
+ lock.deny("resource is busy");
+ }
+
+ return lock;
+ }
+
+ /**
+ * Checks for expired locks.
+ */
+ private void checkExpired() {
+ long currentMs = currentTime.getMillis();
+ logger.info("checking for expired locks at {}", currentMs);
+
+ /*
+ * Could do this via an iterator, but using compute() guarantees that the lock
+ * doesn't get extended while it's being removed from the map.
+ */
+ for (Entry<String, SimpleLock> ent : resource2lock.entrySet()) {
+ if (!ent.getValue().expired(currentMs)) {
+ continue;
+ }
+
+ AtomicReference<SimpleLock> lockref = new AtomicReference<>(null);
+
+ resource2lock.computeIfPresent(ent.getKey(), (resourceId, lock) -> {
+ if (lock.expired(currentMs)) {
+ lockref.set(lock);
+ return null;
+ }
+
+ return lock;
+ });
+
+ SimpleLock lock = lockref.get();
+ if (lock != null) {
+ lock.deny("lock expired");
+ }
+ }
+ }
+
+ /**
+ * Simple Lock implementation.
+ */
+ public static class SimpleLock extends LockImpl {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Time, in milliseconds, when the lock expires.
+ */
+ @Getter
+ private long holdUntilMs;
+
+ /**
+ * Feature containing this lock. May be {@code null} until the feature is
+ * identified. Note: this can only be null if the lock has been de-serialized.
+ */
+ private transient SimpleLockManager feature;
+
+ /**
+ * Constructs the object.
+ */
+ public SimpleLock() {
+ this.holdUntilMs = 0;
+ this.feature = null;
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param state initial state of the lock
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held,
+ * after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or
+ * subsequently lost; must not be {@code null}
+ * @param feature feature containing this lock
+ */
+ public SimpleLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ SimpleLockManager feature) {
+ super(state, resourceId, ownerKey, holdSec, callback);
+ this.feature = feature;
+ }
+
+ /**
+ * Determines if the owner's lock has expired.
+ *
+ * @param currentMs current time, in milliseconds
+ * @return {@code true} if the owner's lock has expired, {@code false} otherwise
+ */
+ public boolean expired(long currentMs) {
+ return (holdUntilMs <= currentMs);
+ }
+
+ /**
+ * Grants this lock. The notification is <i>always</i> invoked via a background
+ * thread.
+ */
+ protected synchronized void grant() {
+ if (isUnavailable()) {
+ return;
+ }
+
+ setState(LockState.ACTIVE);
+ holdUntilMs = currentTime.getMillis() + TimeUnit.SECONDS.toMillis(getHoldSec());
+
+ logger.info("lock granted: {}", this);
+
+ feature.exsvc.execute(this::notifyAvailable);
+ }
+
+ /**
+ * Permanently denies this lock. The notification is invoked via a background
+ * thread, if a feature instance is attached, otherwise it uses the foreground
+ * thread.
+ *
+ * @param reason the reason the lock was denied
+ */
+ protected void deny(String reason) {
+ synchronized (this) {
+ setState(LockState.UNAVAILABLE);
+ }
+
+ logger.info("{}: {}", reason, this);
+
+ if (feature == null) {
+ notifyUnavailable();
+
+ } else {
+ feature.exsvc.execute(this::notifyUnavailable);
+ }
+ }
+
+ @Override
+ public boolean free() {
+ // do a quick check of the state
+ if (isUnavailable()) {
+ return false;
+ }
+
+ logger.info("releasing lock: {}", this);
+
+ if (!attachFeature()) {
+ setState(LockState.UNAVAILABLE);
+ return false;
+ }
+
+ AtomicBoolean result = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+
+ if (curlock == this) {
+ // this lock was the owner - resource is now available
+ result.set(true);
+ setState(LockState.UNAVAILABLE);
+ return null;
+
+ } else {
+ return curlock;
+ }
+ });
+
+ return result.get();
+ }
+
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ setHoldSec(holdSec);
+ setCallback(callback);
+
+ // do a quick check of the state
+ if (isUnavailable() || !attachFeature()) {
+ deny(LOCK_LOST_MSG);
+ return;
+ }
+
+ if (feature.resource2lock.get(getResourceId()) == this) {
+ grant();
+ } else {
+ deny(NOT_LOCKED_MSG);
+ }
+ }
+
+ /**
+ * Attaches to the feature instance, if not already attached.
+ *
+ * @return {@code true} if the lock is now attached to a feature, {@code false}
+ * otherwise
+ */
+ private synchronized boolean attachFeature() {
+ if (feature != null) {
+ // already attached
+ return true;
+ }
+
+ feature = latestInstance;
+ if (feature == null) {
+ logger.warn("no feature yet for {}", this);
+ return false;
+ }
+
+ // put this lock into the map
+ feature.resource2lock.putIfAbsent(getResourceId(), this);
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "SimpleLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey=" + getOwnerKey()
+ + ", holdSec=" + getHoldSec() + ", holdUntilMs=" + holdUntilMs + "]";
+ }
+ }
+
+ // these may be overridden by junit tests
+
+ protected ScheduledExecutorService getThreadPool() {
+ return engine.getExecutorService();
+ }
+
+ protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, this);
+ }
+}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java
new file mode 100644
index 00000000..ff02f39e
--- /dev/null
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManagerException.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system.internal;
+
+public class SimpleLockManagerException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructor.
+ *
+ * @param ex exception to be wrapped
+ */
+ public SimpleLockManagerException(Exception ex) {
+ super(ex);
+ }
+}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java
new file mode 100644
index 00000000..0d1ca89b
--- /dev/null
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockProperties.java
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system.internal;
+
+import java.util.Properties;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.properties.BeanConfigurator;
+import org.onap.policy.common.utils.properties.Property;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+
+@Getter
+@Setter
+public class SimpleLockProperties {
+ public static final String PREFIX = "simple.locking.";
+ public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds";
+
+ /**
+ * Time, in seconds, to wait between checks for expired locks.
+ */
+ @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900")
+ private int expireCheckSec;
+
+ /**
+ * Constructs the object, populating fields from the properties.
+ *
+ * @param props properties from which to configure this
+ * @throws PropertyException if an error occurs
+ */
+ public SimpleLockProperties(Properties props) throws PropertyException {
+ new BeanConfigurator().configureFromProperties(this, props);
+ }
+}
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java
index 5e0ead9d..fe1a2345 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineManagerTest.java
@@ -27,12 +27,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -41,6 +43,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.junit.Before;
@@ -53,6 +56,10 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServer;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactory;
import org.onap.policy.common.utils.gson.GsonTestUtils;
import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.features.PolicyControllerFeatureApi;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
import org.onap.policy.drools.persistence.SystemPersistence;
@@ -61,9 +68,10 @@ import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
+import org.onap.policy.drools.system.internal.SimpleLockManager;
+import org.onap.policy.drools.system.internal.SimpleLockProperties;
public class PolicyEngineManagerTest {
-
private static final String EXPECTED = "expected exception";
private static final String NOOP_STR = CommInfrastructure.NOOP.name();
@@ -76,6 +84,8 @@ public class PolicyEngineManagerTest {
private static final String FEATURE2 = "feature-b";
private static final String MY_TOPIC = "my-topic";
private static final String MESSAGE = "my-message";
+ private static final String MY_OWNER = "my-owner";
+ private static final String MY_RESOURCE = "my-resource";
private static final Object MY_EVENT = new Object();
@@ -125,6 +135,8 @@ public class PolicyEngineManagerTest {
private PdpdConfiguration pdpConfig;
private String pdpConfigJson;
private PolicyEngineManager mgr;
+ private ScheduledExecutorService exsvc;
+ private PolicyResourceLockManager lockmgr;
/**
* Initializes the object to be tested.
@@ -176,6 +188,15 @@ public class PolicyEngineManagerTest {
config3 = new ControllerConfiguration();
config4 = new ControllerConfiguration();
pdpConfig = new PdpdConfiguration();
+ exsvc = mock(ScheduledExecutorService.class);
+ lockmgr = mock(PolicyResourceLockManager.class);
+
+ when(lockmgr.start()).thenReturn(true);
+ when(lockmgr.stop()).thenReturn(true);
+ when(lockmgr.lock()).thenReturn(true);
+ when(lockmgr.unlock()).thenReturn(true);
+
+ when(prov2.beforeCreateLockManager(any(), any())).thenReturn(lockmgr);
when(prov1.getName()).thenReturn(FEATURE1);
when(prov2.getName()).thenReturn(FEATURE2);
@@ -387,6 +408,86 @@ public class PolicyEngineManagerTest {
assertFalse(config.isEmpty());
}
+ /**
+ * Tests that makeExecutorService() uses the value from the thread
+ * property.
+ */
+ @Test
+ public void testMakeExecutorServicePropertyProvided() {
+ PolicyEngineManager mgrspy = spy(mgr);
+
+ properties.setProperty(PolicyEngineManager.EXECUTOR_THREAD_PROP, "3");
+ mgrspy.configure(properties);
+ assertSame(exsvc, mgrspy.getExecutorService());
+ verify(mgrspy).makeScheduledExecutor(3);
+ }
+
+ /**
+ * Tests that makeExecutorService() uses the default thread count when no thread
+ * property is provided.
+ */
+ @Test
+ public void testMakeExecutorServiceNoProperty() {
+ PolicyEngineManager mgrspy = spy(mgr);
+
+ mgrspy.configure(properties);
+ assertSame(exsvc, mgrspy.getExecutorService());
+ verify(mgrspy).makeScheduledExecutor(PolicyEngineManager.DEFAULT_EXECUTOR_THREADS);
+ }
+
+ /**
+ * Tests that makeExecutorService() uses the default thread count when the thread
+ * property is invalid.
+ */
+ @Test
+ public void testMakeExecutorServiceInvalidProperty() {
+ PolicyEngineManager mgrspy = spy(mgr);
+
+ properties.setProperty(PolicyEngineManager.EXECUTOR_THREAD_PROP, "abc");
+ mgrspy.configure(properties);
+ assertSame(exsvc, mgrspy.getExecutorService());
+ verify(mgrspy).makeScheduledExecutor(PolicyEngineManager.DEFAULT_EXECUTOR_THREADS);
+ }
+
+ /**
+ * Tests createLockManager() when beforeCreateLock throws an exception and returns a
+ * manager.
+ */
+ @Test
+ public void testCreateLockManagerHaveProvider() {
+ // first provider throws an exception
+ when(prov1.beforeCreateLockManager(any(), any())).thenThrow(new RuntimeException(EXPECTED));
+
+ mgr.configure(properties);
+ assertSame(lockmgr, mgr.getLockManager());
+ }
+
+ /**
+ * Tests createLockManager() when SimpleLockManager throws an exception.
+ */
+ @Test
+ public void testCreateLockManagerSimpleEx() {
+ when(prov2.beforeCreateLockManager(any(), any())).thenReturn(null);
+
+ // invalid property for SimpleLockManager
+ properties.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc");
+ mgr.configure(properties);
+
+ // should create a manager using default properties
+ assertTrue(mgr.getLockManager() instanceof SimpleLockManager);
+ }
+
+ /**
+ * Tests createLockManager() when SimpleLockManager is returned.
+ */
+ @Test
+ public void testCreateLockManagerSimple() {
+ when(prov2.beforeCreateLockManager(any(), any())).thenReturn(null);
+
+ mgr.configure(properties);
+ assertTrue(mgr.getLockManager() instanceof SimpleLockManager);
+ }
+
@Test
public void testConfigureProperties() throws Exception {
// arrange for first provider to throw exceptions
@@ -667,6 +768,12 @@ public class PolicyEngineManagerTest {
when(sink1.start()).thenThrow(new RuntimeException(EXPECTED));
});
+ // lock manager fails to start - still does everything
+ testStart(false, () -> when(lockmgr.start()).thenReturn(false));
+
+ // lock manager throws an exception - still does everything
+ testStart(false, () -> when(lockmgr.start()).thenThrow(new RuntimeException(EXPECTED)));
+
// servlet wait fails - still does everything
testStart(false, () -> when(server1.waitedStart(anyLong())).thenReturn(false));
@@ -796,6 +903,12 @@ public class PolicyEngineManagerTest {
// servlet fails to stop - still does everything
testStop(false, () -> when(server1.stop()).thenReturn(false));
+ // lock manager fails to stop - still does everything
+ testStop(false, () -> when(lockmgr.stop()).thenReturn(false));
+
+ // lock manager throws an exception - still does everything
+ testStop(false, () -> when(lockmgr.stop()).thenThrow(new RuntimeException(EXPECTED)));
+
// other tests
checkBeforeAfter(
(prov, flag) -> when(prov.beforeStop(mgr)).thenReturn(flag),
@@ -861,6 +974,10 @@ public class PolicyEngineManagerTest {
assertTrue(threadStarted);
assertTrue(threadInterrupted);
+
+ // lock manager throws an exception - still does everything
+ testShutdown(() -> doThrow(new RuntimeException(EXPECTED)).when(lockmgr).shutdown());
+
// other tests
checkBeforeAfter(
(prov, flag) -> when(prov.beforeShutdown(mgr)).thenReturn(flag),
@@ -906,6 +1023,8 @@ public class PolicyEngineManagerTest {
verify(prov1).afterShutdown(mgr);
verify(prov2).afterShutdown(mgr);
+
+ verify(exsvc).shutdownNow();
}
@Test
@@ -985,6 +1104,12 @@ public class PolicyEngineManagerTest {
// endpoint manager fails to lock - still does everything
testLock(false, () -> when(endpoint.lock()).thenReturn(false));
+ // lock manager fails to lock - still does everything
+ testLock(false, () -> when(lockmgr.lock()).thenReturn(false));
+
+ // lock manager throws an exception - still does everything
+ testLock(false, () -> when(lockmgr.lock()).thenThrow(new RuntimeException(EXPECTED)));
+
// other tests
checkBeforeAfter(
(prov, flag) -> when(prov.beforeLock(mgr)).thenReturn(flag),
@@ -1055,6 +1180,12 @@ public class PolicyEngineManagerTest {
// endpoint manager fails to unlock - still does everything
testUnlock(false, () -> when(endpoint.unlock()).thenReturn(false));
+ // lock manager fails to lock - still does everything
+ testUnlock(false, () -> when(lockmgr.unlock()).thenReturn(false));
+
+ // lock manager throws an exception - still does everything
+ testUnlock(false, () -> when(lockmgr.unlock()).thenThrow(new RuntimeException(EXPECTED)));
+
// other tests
checkBeforeAfter(
(prov, flag) -> when(prov.beforeUnlock(mgr)).thenReturn(flag),
@@ -1484,6 +1615,32 @@ public class PolicyEngineManagerTest {
}
@Test
+ public void testCreateLock() {
+ Lock lock = mock(Lock.class);
+ LockCallback callback = mock(LockCallback.class);
+ when(lockmgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false)).thenReturn(lock);
+
+ // not configured yet, thus no lock manager
+ assertThatIllegalStateException()
+ .isThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false));
+
+ // now configure it and try again
+ mgr.configure(properties);
+ assertSame(lock, mgr.createLock(MY_RESOURCE, MY_OWNER, 10, callback, false));
+
+ // test illegal args
+ assertThatThrownBy(() -> mgr.createLock(null, MY_OWNER, 10, callback, false))
+ .hasMessageContaining("resourceId");
+ assertThatThrownBy(() -> mgr.createLock(MY_RESOURCE, null, 10, callback, false))
+ .hasMessageContaining("ownerKey");
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, -1, callback, false))
+ .withMessageContaining("holdSec");
+ assertThatThrownBy(() -> mgr.createLock(MY_RESOURCE, MY_OWNER, 10, null, false))
+ .hasMessageContaining("callback");
+ }
+
+ @Test
public void testOpen() throws Throwable {
when(prov1.beforeOpen(mgr)).thenThrow(new RuntimeException(EXPECTED));
when(prov1.afterOpen(mgr)).thenThrow(new RuntimeException(EXPECTED));
@@ -1789,6 +1946,11 @@ public class PolicyEngineManagerTest {
return engine;
}
+ @Override
+ protected ScheduledExecutorService makeScheduledExecutor(int nthreads) {
+ return exsvc;
+ }
+
/**
* Shutdown thread with overrides.
*/
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java
new file mode 100644
index 00000000..7ffc72ff
--- /dev/null
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerExceptionTest.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system.internal;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+import org.onap.policy.drools.system.internal.SimpleLockManagerException;
+
+public class SimpleLockManagerExceptionTest extends ExceptionsTester {
+
+ @Test
+ public void test() {
+ assertEquals(1, test(SimpleLockManagerException.class));
+ }
+}
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java
new file mode 100644
index 00000000..66406898
--- /dev/null
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java
@@ -0,0 +1,781 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.time.CurrentTime;
+import org.onap.policy.common.utils.time.TestTime;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.onap.policy.drools.system.internal.SimpleLockManager.SimpleLock;
+import org.powermock.reflect.Whitebox;
+
+public class SimpleLockManagerTest {
+ private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
+ private static final String TIME_FIELD = "currentTime";
+ private static final String OWNER_KEY = "my key";
+ private static final String RESOURCE = "my resource";
+ private static final String RESOURCE2 = "my resource #2";
+ private static final String RESOURCE3 = "my resource #3";
+ private static final int HOLD_SEC = 100;
+ private static final int HOLD_SEC2 = 120;
+ private static final int HOLD_MS = HOLD_SEC * 1000;
+ private static final int HOLD_MS2 = HOLD_SEC2 * 1000;
+ private static final int MAX_THREADS = 10;
+ private static final int MAX_LOOPS = 50;
+
+ private static CurrentTime saveTime;
+ private static ScheduledExecutorService saveExec;
+ private static ScheduledExecutorService realExec;
+
+ private TestTime testTime;
+ private AtomicInteger nactive;
+ private AtomicInteger nsuccesses;
+ private SimpleLockManager feature;
+
+ @Mock
+ private ScheduledExecutorService exsvc;
+
+ @Mock
+ private PolicyEngine engine;
+
+ @Mock
+ private ScheduledFuture<?> future;
+
+ @Mock
+ private LockCallback callback;
+
+
+ /**
+ * Saves static fields and configures the location of the property files.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveTime = Whitebox.getInternalState(SimpleLockManager.class, TIME_FIELD);
+ saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
+
+ realExec = Executors.newScheduledThreadPool(3);
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
+ }
+
+ /**
+ * Restores static fields.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() {
+ Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, saveTime);
+ Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
+
+ realExec.shutdown();
+ }
+
+ /**
+ * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
+ * tasks.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ testTime = new TestTime();
+ nactive = new AtomicInteger(0);
+ nsuccesses = new AtomicInteger(0);
+
+ Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime);
+
+ when(engine.getExecutorService()).thenReturn(exsvc);
+
+ feature = new MyLockingFeature();
+ feature.start();
+ }
+
+ /**
+ * Tests constructor() when properties are invalid.
+ */
+ @Test
+ public void testSimpleLockManagerInvalidProperties() {
+ // use properties containing an invalid value
+ Properties props = new Properties();
+ props.setProperty(SimpleLockProperties.EXPIRE_CHECK_SEC, "abc");
+
+ assertThatThrownBy(() -> new MyLockingFeature(engine, props)).isInstanceOf(SimpleLockManagerException.class);
+ }
+
+ @Test
+ public void testIsAlive() {
+ assertTrue(feature.isAlive());
+
+ feature.stop();
+ assertFalse(feature.isAlive());
+ }
+
+ @Test
+ public void testStart() {
+ assertFalse(feature.start());
+
+ feature.stop();
+ assertTrue(feature.start());
+ }
+
+ @Test
+ public void testStop() {
+ assertTrue(feature.stop());
+ verify(future).cancel(true);
+
+ assertFalse(feature.stop());
+
+ // no more invocations
+ verify(future).cancel(anyBoolean());
+ }
+
+ @Test
+ public void testShutdown() {
+ feature.shutdown();
+
+ verify(future).cancel(true);
+ }
+
+ @Test
+ public void testLockApi() {
+ assertFalse(feature.isLocked());
+ assertTrue(feature.lock());
+ assertTrue(feature.unlock());
+ }
+
+ @Test
+ public void testCreateLock() {
+ // this lock should be granted immediately
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock.isActive());
+ assertEquals(testTime.getMillis() + HOLD_MS, lock.getHoldUntilMs());
+
+ invokeCallback(1);
+
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+
+ // this time it should be busy
+ Lock lock2 = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertFalse(lock2.isActive());
+ assertTrue(lock2.isUnavailable());
+
+ invokeCallback(2);
+
+ verify(callback, never()).lockAvailable(lock2);
+ verify(callback).lockUnavailable(lock2);
+
+ // should have been no change to the original lock
+ assertTrue(lock.isActive());
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+
+ // should work with "true" value also
+ Lock lock3 = feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, true);
+ assertTrue(lock3.isActive());
+ invokeCallback(3);
+ verify(callback).lockAvailable(lock3);
+ verify(callback, never()).lockUnavailable(lock3);
+ }
+
+ /**
+ * Tests lock() when the feature is not the latest instance.
+ */
+ @Test
+ public void testCreateLockNotLatestInstance() {
+ SimpleLockManager.setLatestInstance(null);
+
+ Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockAvailable(any());
+ verify(callback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testCheckExpired() throws InterruptedException {
+ final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ final SimpleLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
+ final SimpleLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC2, callback, false);
+
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any());
+
+ Runnable checker = captor.getValue();
+
+ // time unchanged - checker should have no impact
+ checker.run();
+ assertTrue(lock.isActive());
+ assertTrue(lock2.isActive());
+ assertTrue(lock3.isActive());
+
+ // expire the first two locks
+ testTime.sleep(HOLD_MS);
+ checker.run();
+ assertFalse(lock.isActive());
+ assertFalse(lock2.isActive());
+ assertTrue(lock3.isActive());
+
+ // run the callbacks
+ captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(5)).execute(captor.capture());
+ captor.getAllValues().forEach(Runnable::run);
+ verify(callback).lockUnavailable(lock);
+ verify(callback).lockUnavailable(lock2);
+ verify(callback, never()).lockUnavailable(lock3);
+
+ // should be able to get a lock on the first two resources
+ assertTrue(feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive());
+ assertTrue(feature.createLock(RESOURCE2, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive());
+
+ // lock is still busy on the last resource
+ assertFalse(feature.createLock(RESOURCE3, OWNER_KEY, HOLD_SEC + HOLD_SEC2, callback, false).isActive());
+
+ // expire the last lock
+ testTime.sleep(HOLD_MS2);
+ checker.run();
+ assertFalse(lock3.isActive());
+
+ // run the callback
+ captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(9)).execute(captor.capture());
+ captor.getValue().run();
+ verify(callback).lockUnavailable(lock3);
+ }
+
+ /**
+ * Tests checkExpired(), where the lock is removed from the map between invoking
+ * expired() and compute(). Should cause "null" to be returned by compute().
+ *
+ * @throws InterruptedException if the test is interrupted
+ */
+ @Test
+ public void testCheckExpiredLockDeleted() throws InterruptedException {
+ feature = new MyLockingFeature() {
+ @Override
+ protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean expired(long currentMs) {
+ // remove the lock from the map
+ free();
+ return true;
+ }
+ };
+ }
+ };
+
+ feature.start();
+
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(1);
+
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any());
+
+ Runnable checker = captor.getValue();
+
+ checker.run();
+
+ // lock should now be gone and we should be able to get another
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(2);
+
+ // should have succeeded twice
+ verify(callback, times(2)).lockAvailable(any());
+
+ // lock should not be available now
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(3);
+ verify(callback).lockUnavailable(any());
+ }
+
+ /**
+ * Tests checkExpired(), where the lock is removed from the map and replaced with a
+ * new lock, between invoking expired() and compute(). Should cause the new lock to be
+ * returned.
+ *
+ * @throws InterruptedException if the test is interrupted
+ */
+ @Test
+ public void testCheckExpiredLockReplaced() throws InterruptedException {
+ feature = new MyLockingFeature() {
+ private boolean madeLock = false;
+
+ @Override
+ protected SimpleLock makeLock(LockState waiting, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ if (madeLock) {
+ return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature);
+ }
+
+ madeLock = true;
+
+ return new SimpleLock(waiting, resourceId, ownerKey, holdSec, callback, feature) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean expired(long currentMs) {
+ // remove the lock from the map and add a new lock
+ free();
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ return true;
+ }
+ };
+ }
+ };
+
+ feature.start();
+
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(1);
+
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any());
+
+ Runnable checker = captor.getValue();
+
+ checker.run();
+
+ // lock should not be available now
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(3);
+ verify(callback).lockUnavailable(any());
+ }
+
+ @Test
+ public void testGetThreadPool() {
+ // use a real feature
+ feature = new SimpleLockManager(engine, new Properties());
+
+ // load properties
+ feature.start();
+
+ // should create thread pool
+ feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // should shut down thread pool
+ feature.stop();
+ }
+
+ @Test
+ public void testSimpleLockNoArgs() {
+ SimpleLock lock = new SimpleLock();
+ assertNull(lock.getResourceId());
+ assertNull(lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(0, lock.getHoldSec());
+
+ assertEquals(0, lock.getHoldUntilMs());
+ }
+
+ @Test
+ public void testSimpleLockSimpleLock() {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertSame(callback, lock.getCallback());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
+ .withMessageContaining("holdSec");
+ }
+
+ @Test
+ public void testSimpleLockSerializable() throws Exception {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock = roundTrip(lock);
+
+ assertTrue(lock.isActive());
+
+ assertEquals(RESOURCE, lock.getResourceId());
+ assertEquals(OWNER_KEY, lock.getOwnerKey());
+ assertNull(lock.getCallback());
+ assertEquals(HOLD_SEC, lock.getHoldSec());
+ }
+
+ @Test
+ public void testSimpleLockExpired() {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock.grant();
+
+ assertFalse(lock.expired(testTime.getMillis()));
+ assertFalse(lock.expired(testTime.getMillis() + HOLD_MS - 1));
+ assertTrue(lock.expired(testTime.getMillis() + HOLD_MS));
+ }
+
+ /**
+ * Tests grant() when the lock is already unavailable.
+ */
+ @Test
+ public void testSimpleLockGrantUnavailable() {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ lock.setState(LockState.UNAVAILABLE);
+ lock.grant();
+
+ assertTrue(lock.isUnavailable());
+ verify(callback, never()).lockAvailable(any());
+ verify(callback, never()).lockUnavailable(any());
+ }
+
+ @Test
+ public void testSimpleLockFree() {
+ final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // lock2 should be denied
+ SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(2);
+ verify(callback, never()).lockAvailable(lock2);
+ verify(callback).lockUnavailable(lock2);
+
+ // lock2 was denied, so nothing new should happen when freed
+ assertFalse(lock2.free());
+ invokeCallback(2);
+
+ // force lock2 to be active - still nothing should happen
+ Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
+ assertFalse(lock2.free());
+ invokeCallback(2);
+
+ // now free the first lock
+ assertTrue(lock.free());
+ assertEquals(LockState.UNAVAILABLE, lock.getState());
+
+ // should be able to get the lock now
+ SimpleLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ assertTrue(lock3.isActive());
+ }
+
+ /**
+ * Tests that free() works on a serialized lock with a new feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testSimpleLockFreeSerialized() throws Exception {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ feature = new MyLockingFeature();
+ feature.start();
+
+ lock = roundTrip(lock);
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ /**
+ * Tests free() on a serialized lock without a feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testSimpleLockFreeNoFeature() throws Exception {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ SimpleLockManager.setLatestInstance(null);
+
+ lock = roundTrip(lock);
+ assertFalse(lock.free());
+ assertTrue(lock.isUnavailable());
+ }
+
+ @Test
+ public void testSimpleLockExtend() {
+ final SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ // lock2 should be denied
+ SimpleLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+ invokeCallback(2);
+ verify(callback, never()).lockAvailable(lock2);
+ verify(callback).lockUnavailable(lock2);
+
+ // lock2 will still be denied
+ lock2.extend(HOLD_SEC, callback);
+ invokeCallback(3);
+ verify(callback, times(2)).lockUnavailable(lock2);
+
+ // force lock2 to be active - should still be denied
+ Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
+ lock2.extend(HOLD_SEC, callback);
+ invokeCallback(4);
+ verify(callback, times(3)).lockUnavailable(lock2);
+
+ assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
+ .withMessageContaining("holdSec");
+
+ // now extend the first lock
+ lock.extend(HOLD_SEC2, callback);
+ assertEquals(HOLD_SEC2, lock.getHoldSec());
+ assertEquals(testTime.getMillis() + HOLD_MS2, lock.getHoldUntilMs());
+ invokeCallback(5);
+ verify(callback).lockAvailable(lock);
+ verify(callback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests that extend() works on a serialized lock with a new feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testSimpleLockExtendSerialized() throws Exception {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ feature = new MyLockingFeature();
+ feature.start();
+
+ lock = roundTrip(lock);
+ LockCallback scallback = mock(LockCallback.class);
+
+ lock.extend(HOLD_SEC, scallback);
+ assertTrue(lock.isActive());
+
+ invokeCallback(1);
+ verify(scallback).lockAvailable(lock);
+ verify(scallback, never()).lockUnavailable(lock);
+ }
+
+ /**
+ * Tests extend() on a serialized lock without a feature.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testSimpleLockExtendNoFeature() throws Exception {
+ SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
+
+ SimpleLockManager.setLatestInstance(null);
+
+ lock = roundTrip(lock);
+ LockCallback scallback = mock(LockCallback.class);
+
+ lock.extend(HOLD_SEC, scallback);
+ assertTrue(lock.isUnavailable());
+
+ invokeCallback(1);
+ verify(scallback, never()).lockAvailable(lock);
+ verify(scallback).lockUnavailable(lock);
+ }
+
+ @Test
+ public void testSimpleLockToString() {
+ String text = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
+ assertNotNull(text);
+ assertThat(text).contains("holdUntil").doesNotContain("ownerInfo").doesNotContain("callback");
+ }
+
+ /**
+ * Performs a multi-threaded test of the locking facility.
+ *
+ * @throws InterruptedException if the current thread is interrupted while waiting for
+ * the background threads to complete
+ */
+ @Test
+ public void testMultiThreaded() throws InterruptedException {
+ Whitebox.setInternalState(SimpleLockManager.class, TIME_FIELD, testTime);
+ feature = new SimpleLockManager(PolicyEngineConstants.getManager(), new Properties());
+ feature.start();
+
+ List<MyThread> threads = new ArrayList<>(MAX_THREADS);
+ for (int x = 0; x < MAX_THREADS; ++x) {
+ threads.add(new MyThread());
+ }
+
+ threads.forEach(Thread::start);
+
+ for (MyThread thread : threads) {
+ thread.join(6000);
+ assertFalse(thread.isAlive());
+ }
+
+ for (MyThread thread : threads) {
+ if (thread.err != null) {
+ throw thread.err;
+ }
+ }
+
+ assertTrue(nsuccesses.get() > 0);
+ }
+
+ private SimpleLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+ return (SimpleLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
+ }
+
+ private SimpleLock roundTrip(SimpleLock lock) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(lock);
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ return (SimpleLock) ois.readObject();
+ }
+ }
+
+ /**
+ * Invokes the last call-back in the work queue.
+ *
+ * @param nexpected number of call-backs expected in the work queue
+ */
+ private void invokeCallback(int nexpected) {
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(exsvc, times(nexpected)).execute(captor.capture());
+
+ if (nexpected > 0) {
+ captor.getAllValues().get(nexpected - 1).run();
+ }
+ }
+
+ /**
+ * Feature that uses <i>exsvc</i> to execute requests.
+ */
+ private class MyLockingFeature extends SimpleLockManager {
+
+ public MyLockingFeature() {
+ this(engine, new Properties());
+ }
+
+ public MyLockingFeature(PolicyEngine engine, Properties props) {
+ super(engine, props);
+
+ exsvc = mock(ScheduledExecutorService.class);
+ when(engine.getExecutorService()).thenReturn(exsvc);
+
+ when(exsvc.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(answer -> {
+ return future;
+ });
+ }
+ }
+
+ /**
+ * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
+ * extend it, and then unlock it.
+ */
+ private class MyThread extends Thread {
+ AssertionError err = null;
+
+ public MyThread() {
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int x = 0; x < MAX_LOOPS; ++x) {
+ makeAttempt();
+ }
+
+ } catch (AssertionError e) {
+ err = e;
+ }
+ }
+
+ private void makeAttempt() {
+ try {
+ Semaphore sem = new Semaphore(0);
+
+ LockCallback cb = new LockCallback() {
+ @Override
+ public void lockAvailable(Lock lock) {
+ sem.release();
+ }
+
+ @Override
+ public void lockUnavailable(Lock lock) {
+ sem.release();
+ }
+ };
+
+ Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
+
+ // wait for callback, whether available or unavailable
+ assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
+ if (!lock.isActive()) {
+ return;
+ }
+
+ nsuccesses.incrementAndGet();
+
+ assertEquals(1, nactive.incrementAndGet());
+
+ lock.extend(HOLD_SEC2, cb);
+ assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
+ assertTrue(lock.isActive());
+
+ // decrement BEFORE free()
+ nactive.decrementAndGet();
+
+ assertTrue(lock.free());
+ assertTrue(lock.isUnavailable());
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AssertionError("interrupted", e);
+ }
+ }
+ }
+}