summaryrefslogtreecommitdiffstats
path: root/feature-distributed-locking/src/main/java/org
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2019-09-24 10:51:21 -0400
committerJim Hahn <jrh3@att.com>2019-10-17 15:40:32 -0400
commit6e0b450abe7e62fa47ffe14e95a67d035174dbdb (patch)
treee91c7bfb7365f9a06ad5674bc83e041b7237e378 /feature-distributed-locking/src/main/java/org
parent1528214803af722cd660b7c4a3129f3de5b4ea7f (diff)
Reimplement Lock API using Lock objects
Modified PolicyResourceLockManager to just return a feature, deferring the lock() call/method to the feature, itself. The manager was also modified so that, if it can't find an enabled provider, it will return a default provider, whose lock() methods always fail. Once a feature has been identified, the manager will cache it for use thereafter. Modified the feature API to return lock objects and simplified the interface to remove the beforeXxx and afterXxx methods. Moved the unlock and refresh methods from the feature API into the lock class, renaming them to free and extend, respectively. Added a separate, feature-simple-locking project, which implements a simple version of the locking feature, over a single JVM. Extensively revised the distributed locking feature to fit in with the new API. Added support for persistence so that the various LockImpl classes can be serialized and still function correctly when they are deserialized back into new feature instances Added default implementations of free & extend to LockImpl. Modified API to take the ownerKey string, instead of the owner object. Removed Extractor as unneeded - may add via another review, if still useful. Updates per review comments: - Updated licenses in feature-simple-locking - Added beforeCreateLock & afterCreateLock to feature API - Moved SimpleLockingFeature into policy-management so that it's always available - Moved the executor service, "exsvc", into PolicyEngine - Moved Extrator into policy-utils - Changed Extractor logging level for exceptions - Fixed feature sequence numbers - Fixed mixing of seconds and milliseconds - Renamed exsvc - Modified to use property method with default value - Configured scheduled executor - Added suffix to Extractor.register() - Eliminated Feature Api and tied lock manager into engine - Added non-null checks to LockImpl parameters - Added non-null checks to createLock() parameters - Checked that lockManager is initialized Change-Id: Iddba38157ddc5f7277656979c0e679e5489eb7b1 Issue-ID: POLICY-2113 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'feature-distributed-locking/src/main/java/org')
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java936
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java (renamed from feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java)12
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java136
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java179
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java127
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java282
6 files changed, 1078 insertions, 594 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
new file mode 100644
index 00000000..523c0d93
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
@@ -0,0 +1,936 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSourceFactory;
+import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.drools.core.lock.AlwaysFailLock;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.persistence.SystemPersistenceConstants;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed implementation of the Lock Feature. Maintains locks across servers using a
+ * shared DB.
+ *
+ * <p/>
+ * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
+ * parameter.
+ *
+ * <p/>
+ * Additional Notes:
+ * <dl>
+ * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
+ * instead populated with the {@link #uuidString}.</li>
+ * <li>A periodic check of the DB is made to determine if any of the locks have
+ * expired.</li>
+ * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
+ * will be added to the map once free() or extend() is invoked, provided there isn't
+ * already an entry. In addition, it initially has the host and UUID of the feature
+ * instance that created it. However, as soon as doExtend() completes successfully, the
+ * host and UUID of the lock will be updated to reflect the values within this feature
+ * instance.</li>
+ * </dl>
+ */
+public class DistributedLockManager
+ implements PolicyResourceLockManager, PolicyEngineFeatureApi {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
+
+ private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
+ private static final String LOCK_LOST_MSG = "lock lost";
+ private static final String NOT_LOCKED_MSG = "not locked";
+
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private static DistributedLockManager latestInstance = null;
+
+
+ /**
+ * Name of the host on which this JVM is running.
+ */
+ @Getter
+ private final String hostName;
+
+ /**
+ * UUID of this object.
+ */
+ @Getter
+ private final String uuidString = UUID.randomUUID().toString();
+
+ /**
+ * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
+ * lock is added to the map, it remains in the map until the lock is lost or until the
+ * unlock request completes.
+ */
+ private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
+
+ /**
+ * Engine with which this manager is associated.
+ */
+ private PolicyEngine engine;
+
+ /**
+ * Feature properties.
+ */
+ private DistributedLockProperties featProps;
+
+ /**
+ * Thread pool used to check for lock expiration and to notify owners when locks are
+ * granted or lost.
+ */
+ private ScheduledExecutorService exsvc = null;
+
+ /**
+ * Data source used to connect to the DB.
+ */
+ private BasicDataSource dataSource = null;
+
+
+ /**
+ * Constructs the object.
+ */
+ public DistributedLockManager() {
+ this.hostName = NetworkUtil.getHostname();
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return 1000;
+ }
+
+ @Override
+ public boolean isAlive() {
+ return (exsvc != null);
+ }
+
+ @Override
+ public boolean start() {
+ // handled via engine API
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ // handled via engine API
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ // handled via engine API
+ }
+
+ @Override
+ public boolean isLocked() {
+ return false;
+ }
+
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public boolean unlock() {
+ return true;
+ }
+
+ @Override
+ public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
+
+ try {
+ this.engine = engine;
+ this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
+ this.exsvc = getThreadPool();
+ this.dataSource = makeDataSource();
+
+ return this;
+
+ } catch (Exception e) {
+ throw new DistributedLockManagerException(e);
+ }
+ }
+
+ @Override
+ public boolean afterStart(PolicyEngine engine) {
+
+ try {
+ exsvc.execute(this::deleteExpiredDbLocks);
+ exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ setLatestInstance(this);
+
+ } catch (Exception e) {
+ throw new DistributedLockManagerException(e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Make data source.
+ *
+ * @return a new, pooled data source
+ * @throws Exception exception
+ */
+ protected BasicDataSource makeDataSource() throws Exception {
+ Properties props = new Properties();
+ props.put("driverClassName", featProps.getDbDriver());
+ props.put("url", featProps.getDbUrl());
+ props.put("username", featProps.getDbUser());
+ props.put("password", featProps.getDbPwd());
+ props.put("testOnBorrow", "true");
+ props.put("poolPreparedStatements", "true");
+
+ // additional properties are listed in the GenericObjectPool API
+
+ return BasicDataSourceFactory.createDataSource(props);
+ }
+
+ /**
+ * Deletes expired locks from the DB.
+ */
+ private void deleteExpiredDbLocks() {
+ logger.info("deleting all expired locks from the DB");
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn
+ .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
+
+ int ndel = stmt.executeUpdate();
+ logger.info("deleted {} expired locks from the DB", ndel);
+
+ } catch (SQLException e) {
+ logger.warn("failed to delete expired locks from the DB", e);
+ }
+ }
+
+ /**
+ * Closes the data source. Does <i>not</i> invoke any lock call-backs.
+ */
+ @Override
+ public boolean afterStop(PolicyEngine engine) {
+ exsvc = null;
+ closeDataSource();
+ return false;
+ }
+
+ /**
+ * Closes {@link #dataSource} and sets it to {@code null}.
+ */
+ private void closeDataSource() {
+ try {
+ if (dataSource != null) {
+ dataSource.close();
+ }
+
+ } catch (SQLException e) {
+ logger.error("cannot close the distributed locking DB", e);
+ }
+
+ dataSource = null;
+ }
+
+ @Override
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+
+ if (latestInstance != this) {
+ AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
+ lock.notifyUnavailable();
+ return lock;
+ }
+
+ DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
+
+ DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
+
+ // do these outside of compute() to avoid blocking other map operations
+ if (existingLock == null) {
+ logger.debug("added lock to map {}", lock);
+ lock.scheduleRequest(lock::doLock);
+ } else {
+ lock.deny("resource is busy", true);
+ }
+
+ return lock;
+ }
+
+ /**
+ * Checks for expired locks.
+ */
+ private void checkExpired() {
+
+ try {
+ logger.info("checking for expired locks");
+ Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
+ identifyDbLocks(expiredIds);
+ expireLocks(expiredIds);
+
+ exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is no longer accepting requests", e);
+
+ } catch (SQLException | RuntimeException e) {
+ logger.error("error checking expired locks", e);
+ exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
+ }
+
+ logger.info("done checking for expired locks");
+ }
+
+ /**
+ * Identifies this feature instance's locks that the DB indicates are still active.
+ *
+ * @param expiredIds IDs of resources that have expired locks. If a resource is still
+ * locked, it's ID is removed from this set
+ * @throws SQLException if a DB error occurs
+ */
+ private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
+ /*
+ * We could query for host and UUIDs that actually appear within the locks, but
+ * those might change while the query is running so no real value in doing that.
+ * On the other hand, there's only a brief instance between the time a
+ * deserialized lock is added to this feature instance and its doExtend() method
+ * updates its host and UUID to match this feature instance. If this happens to
+ * run during that brief instance, then the lock will be lost and the callback
+ * invoked. It isn't worth complicating this code further to handle those highly
+ * unlikely cases.
+ */
+
+ // @formatter:off
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(
+ "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
+ // @formatter:on
+
+ stmt.setString(1, hostName);
+ stmt.setString(2, uuidString);
+
+ try (ResultSet resultSet = stmt.executeQuery()) {
+ while (resultSet.next()) {
+ String resourceId = resultSet.getString(1);
+
+ // we have now seen this resource id
+ expiredIds.remove(resourceId);
+ }
+ }
+ }
+ }
+
+ /**
+ * Expires locks for the resources that no longer appear within the DB.
+ *
+ * @param expiredIds IDs of resources that have expired locks
+ */
+ private void expireLocks(Set<String> expiredIds) {
+ for (String resourceId : expiredIds) {
+ AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
+
+ resource2lock.computeIfPresent(resourceId, (key, lock) -> {
+ if (lock.isActive()) {
+ // it thinks it's active, but it isn't - remove from the map
+ lockref.set(lock);
+ return null;
+ }
+
+ return lock;
+ });
+
+ DistributedLock lock = lockref.get();
+ if (lock != null) {
+ logger.debug("removed lock from map {}", lock);
+ lock.deny(LOCK_LOST_MSG, false);
+ }
+ }
+ }
+
+ /**
+ * Distributed Lock implementation.
+ */
+ public static class DistributedLock extends LockImpl {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Feature containing this lock. May be {@code null} until the feature is
+ * identified. Note: this can only be null if the lock has been de-serialized.
+ */
+ private transient DistributedLockManager feature;
+
+ /**
+ * Host name from the feature instance that created this object. Replaced with the
+ * host name from the current feature instance whenever the lock is successfully
+ * extended.
+ */
+ private String hostName;
+
+ /**
+ * UUID string from the feature instance that created this object. Replaced with
+ * the UUID string from the current feature instance whenever the lock is
+ * successfully extended.
+ */
+ private String uuidString;
+
+ /**
+ * {@code True} if the lock is busy making a request, {@code false} otherwise.
+ */
+ private transient boolean busy = false;
+
+ /**
+ * Request to be performed.
+ */
+ private transient RunnableWithEx request = null;
+
+ /**
+ * Number of times we've retried a request.
+ */
+ private transient int nretries = 0;
+
+ /**
+ * Constructs the object.
+ */
+ public DistributedLock() {
+ this.hostName = "";
+ this.uuidString = "";
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param state initial state of the lock
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held,
+ * after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or
+ * subsequently lost; must not be {@code null}
+ * @param feature feature containing this lock
+ */
+ public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ DistributedLockManager feature) {
+ super(state, resourceId, ownerKey, holdSec, callback);
+
+ this.feature = feature;
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+ }
+
+ /**
+ * Grants this lock. The notification is <i>always</i> invoked via the
+ * <i>foreground</i> thread.
+ */
+ protected void grant() {
+ synchronized (this) {
+ if (isUnavailable()) {
+ return;
+ }
+
+ setState(LockState.ACTIVE);
+ }
+
+ logger.info("lock granted: {}", this);
+
+ notifyAvailable();
+ }
+
+ /**
+ * Permanently denies this lock.
+ *
+ * @param reason the reason the lock was denied
+ * @param foreground {@code true} if the callback can be invoked in the current
+ * (i.e., foreground) thread, {@code false} if it should be invoked via the
+ * executor
+ */
+ protected void deny(String reason, boolean foreground) {
+ synchronized (this) {
+ setState(LockState.UNAVAILABLE);
+ }
+
+ logger.info("{}: {}", reason, this);
+
+ if (feature == null || foreground) {
+ notifyUnavailable();
+
+ } else {
+ feature.exsvc.execute(this::notifyUnavailable);
+ }
+ }
+
+ @Override
+ public boolean free() {
+ // do a quick check of the state
+ if (isUnavailable()) {
+ return false;
+ }
+
+ logger.info("releasing lock: {}", this);
+
+ if (!attachFeature()) {
+ setState(LockState.UNAVAILABLE);
+ return false;
+ }
+
+ AtomicBoolean result = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+ if (curlock == this && !isUnavailable()) {
+ // this lock was the owner
+ result.set(true);
+ setState(LockState.UNAVAILABLE);
+
+ /*
+ * NOTE: do NOT return null; curlock must remain until doUnlock
+ * completes.
+ */
+ }
+
+ return curlock;
+ });
+
+ if (result.get()) {
+ scheduleRequest(this::doUnlock);
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ setHoldSec(holdSec);
+ setCallback(callback);
+
+ // do a quick check of the state
+ if (isUnavailable() || !attachFeature()) {
+ deny(LOCK_LOST_MSG, true);
+ return;
+ }
+
+ AtomicBoolean success = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+ if (curlock == this && !isUnavailable()) {
+ success.set(true);
+ setState(LockState.WAITING);
+ }
+
+ // note: leave it in the map until doUnlock() removes it
+
+ return curlock;
+ });
+
+ if (success.get()) {
+ scheduleRequest(this::doExtend);
+
+ } else {
+ deny(NOT_LOCKED_MSG, true);
+ }
+ }
+
+ /**
+ * Attaches to the feature instance, if not already attached.
+ *
+ * @return {@code true} if the lock is now attached to a feature, {@code false}
+ * otherwise
+ */
+ private synchronized boolean attachFeature() {
+ if (feature != null) {
+ // already attached
+ return true;
+ }
+
+ feature = latestInstance;
+ if (feature == null) {
+ logger.warn("no feature yet for {}", this);
+ return false;
+ }
+
+ // put this lock into the map
+ feature.resource2lock.putIfAbsent(getResourceId(), this);
+
+ return true;
+ }
+
+ /**
+ * Schedules a request for execution.
+ *
+ * @param schedreq the request that should be scheduled
+ */
+ private synchronized void scheduleRequest(RunnableWithEx schedreq) {
+ logger.debug("schedule lock action {}", this);
+ nretries = 0;
+ request = schedreq;
+ feature.exsvc.execute(this::doRequest);
+ }
+
+ /**
+ * Reschedules a request for execution, if there is not already a request in the
+ * queue, and if the retry count has not been exhausted.
+ *
+ * @param req request to be rescheduled
+ */
+ private void rescheduleRequest(RunnableWithEx req) {
+ synchronized (this) {
+ if (request != null) {
+ // a new request has already been scheduled - it supersedes "req"
+ logger.debug("not rescheduling lock action {}", this);
+ return;
+ }
+
+ if (nretries++ < feature.featProps.getMaxRetries()) {
+ logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
+ request = req;
+ feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
+ return;
+ }
+ }
+
+ logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
+ removeFromMap();
+ }
+
+ /**
+ * Gets, and removes, the next request from the queue. Clears {@link #busy} if
+ * there are no more requests in the queue.
+ *
+ * @param prevReq the previous request that was just run
+ *
+ * @return the next request, or {@code null} if the queue is empty
+ */
+ private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
+ if (request == null || request == prevReq) {
+ logger.debug("no more requests for {}", this);
+ busy = false;
+ return null;
+ }
+
+ RunnableWithEx req = request;
+ request = null;
+
+ return req;
+ }
+
+ /**
+ * Executes the current request, if none are currently executing.
+ */
+ private void doRequest() {
+ synchronized (this) {
+ if (busy) {
+ // another thread is already processing the request(s)
+ return;
+ }
+ busy = true;
+ }
+
+ /*
+ * There is a race condition wherein this thread could invoke run() while the
+ * next scheduled thread checks the busy flag and finds that work is being
+ * done and returns, leaving the next work item in "request". In that case,
+ * the next work item may never be executed, thus we use a loop here, instead
+ * of just executing a single request.
+ */
+ RunnableWithEx req = null;
+ while ((req = getNextRequest(req)) != null) {
+ if (feature.resource2lock.get(getResourceId()) != this) {
+ /*
+ * no longer in the map - don't apply the action, as it may interfere
+ * with any newly added Lock object
+ */
+ logger.debug("discard lock action {}", this);
+ synchronized (this) {
+ busy = false;
+ }
+ return;
+ }
+
+ try {
+ /*
+ * Run the request. If it throws an exception, then it will be
+ * rescheduled for execution a little later.
+ */
+ req.run();
+
+ } catch (SQLException e) {
+ logger.warn("request failed for lock: {}", this, e);
+
+ if (feature.featProps.isTransient(e.getErrorCode())) {
+ // retry the request a little later
+ rescheduleRequest(req);
+ } else {
+ removeFromMap();
+ }
+
+ } catch (RuntimeException e) {
+ logger.warn("request failed for lock: {}", this, e);
+ removeFromMap();
+ }
+ }
+ }
+
+ /**
+ * Attempts to add a lock to the DB. Generates a callback, indicating success or
+ * failure.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doLock() throws SQLException {
+ if (!isWaiting()) {
+ logger.debug("discard doLock {}", this);
+ return;
+ }
+
+ /*
+ * There is a small window in which a client could invoke free() before the DB
+ * is updated. In that case, doUnlock will be added to the queue to run after
+ * this, which will delete the record, as desired. In addition, grant() will
+ * not do anything, because the lock state will have been set to UNAVAILABLE
+ * by free().
+ */
+
+ logger.debug("doLock {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ boolean success = false;
+ try {
+ success = doDbInsert(conn);
+
+ } catch (SQLException e) {
+ logger.info("failed to insert lock record - attempting update: {}", this, e);
+ success = doDbUpdate(conn);
+ }
+
+ if (success) {
+ grant();
+ return;
+ }
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
+ * it fails, as this should only be executed in response to a call to
+ * {@link #free()}.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doUnlock() throws SQLException {
+ logger.debug("unlock {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ doDbDelete(conn);
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Attempts to extend a lock in the DB. Generates a callback, indicating success
+ * or failure.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doExtend() throws SQLException {
+ if (!isWaiting()) {
+ logger.debug("discard doExtend {}", this);
+ return;
+ }
+
+ /*
+ * There is a small window in which a client could invoke free() before the DB
+ * is updated. In that case, doUnlock will be added to the queue to run after
+ * this, which will delete the record, as desired. In addition, grant() will
+ * not do anything, because the lock state will have been set to UNAVAILABLE
+ * by free().
+ */
+
+ logger.debug("doExtend {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ /*
+ * invoker may have called extend() before free() had a chance to insert
+ * the record, thus we have to try to insert, if the update fails
+ */
+ if (doDbUpdate(conn) || doDbInsert(conn)) {
+ grant();
+ return;
+ }
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Inserts the lock into the DB.
+ *
+ * @param conn DB connection
+ * @return {@code true} if a record was successfully inserted, {@code false}
+ * otherwise
+ * @throws SQLException if a DB error occurs
+ */
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ logger.debug("insert lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
+ + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, feature.hostName);
+ stmt.setString(3, feature.uuidString);
+ stmt.setInt(4, getHoldSec());
+
+ stmt.executeUpdate();
+
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+
+ return true;
+ }
+ }
+
+ /**
+ * Updates the lock in the DB.
+ *
+ * @param conn DB connection
+ * @return {@code true} if a record was successfully updated, {@code false}
+ * otherwise
+ * @throws SQLException if a DB error occurs
+ */
+ protected boolean doDbUpdate(Connection conn) throws SQLException {
+ logger.debug("update lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
+ + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
+ + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, feature.hostName);
+ stmt.setString(3, feature.uuidString);
+ stmt.setInt(4, getHoldSec());
+
+ stmt.setString(5, getResourceId());
+ stmt.setString(6, this.hostName);
+ stmt.setString(7, this.uuidString);
+
+ if (stmt.executeUpdate() != 1) {
+ return false;
+ }
+
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+
+ return true;
+ }
+ }
+
+ /**
+ * Deletes the lock from the DB.
+ *
+ * @param conn DB connection
+ * @throws SQLException if a DB error occurs
+ */
+ protected void doDbDelete(Connection conn) throws SQLException {
+ logger.debug("delete lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, this.hostName);
+ stmt.setString(3, this.uuidString);
+
+ stmt.executeUpdate();
+ }
+ }
+
+ /**
+ * Removes the lock from the map, and sends a notification using the current
+ * thread.
+ */
+ private void removeFromMap() {
+ logger.debug("remove lock from map {}", this);
+ feature.resource2lock.remove(getResourceId(), this);
+
+ synchronized (this) {
+ if (!isUnavailable()) {
+ deny(LOCK_LOST_MSG, true);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
+ + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
+ + uuidString + "]";
+ }
+ }
+
+ @FunctionalInterface
+ private static interface RunnableWithEx {
+ void run() throws SQLException;
+ }
+
+ // these may be overridden by junit tests
+
+ protected Properties getProperties(String fileName) {
+ return SystemPersistenceConstants.getManager().getProperties(fileName);
+ }
+
+ protected ScheduledExecutorService getThreadPool() {
+ return engine.getExecutorService();
+ }
+
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java
index 55fc4fab..e720f9a1 100644
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* feature-distributed-locking
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,15 +20,15 @@
package org.onap.policy.distributed.locking;
-public class DistributedLockingFeatureException extends RuntimeException {
+public class DistributedLockManagerException extends RuntimeException {
private static final long serialVersionUID = 1L;
/**
* Constructor.
- *
+ *
* @param ex exception to be wrapped
*/
- public DistributedLockingFeatureException(Exception ex) {
+ public DistributedLockManagerException(Exception ex) {
super(ex);
}
}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java
new file mode 100644
index 00000000..f470c8e2
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java
@@ -0,0 +1,136 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.properties.BeanConfigurator;
+import org.onap.policy.common.utils.properties.Property;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+
+@Getter
+@Setter
+public class DistributedLockProperties {
+ public static final String PREFIX = "distributed.locking.";
+
+ public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
+ public static final String DB_URL = "javax.persistence.jdbc.url";
+ public static final String DB_USER = "javax.persistence.jdbc.user";
+ public static final String DB_PASS = "javax.persistence.jdbc.password";
+ public static final String TRANSIENT_ERROR_CODES = PREFIX + "transient.error.codes";
+ public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds";
+ public static final String RETRY_SEC = PREFIX + "retry.seconds";
+ public static final String MAX_RETRIES = PREFIX + "max.retries";
+
+ /**
+ * Database driver.
+ */
+ @Property(name = DB_DRIVER)
+ private String dbDriver;
+
+ /**
+ * Database url.
+ */
+ @Property(name = DB_URL)
+ private String dbUrl;
+
+ /**
+ * Database user.
+ */
+ @Property(name = DB_USER)
+ private String dbUser;
+
+ /**
+ * Database password.
+ */
+ @Property(name = DB_PASS)
+ private String dbPwd;
+
+ /**
+ * Vendor-specific error codes that are "transient", meaning they may go away if the
+ * command is repeated (e.g., connection issue), as opposed to something like a syntax
+ * error or a duplicate key.
+ */
+ @Property(name = TRANSIENT_ERROR_CODES)
+ private String errorCodeStrings;
+
+ private final Set<Integer> transientErrorCodes;
+
+ /**
+ * Time, in seconds, to wait between checks for expired locks.
+ */
+ @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900")
+ private int expireCheckSec;
+
+ /**
+ * Number of seconds to wait before retrying, after a DB error.
+ */
+ @Property(name = RETRY_SEC, defaultValue = "60")
+ private int retrySec;
+
+ /**
+ * Maximum number of times to retry a DB operation.
+ */
+ @Property(name = MAX_RETRIES, defaultValue = "2")
+ private int maxRetries;
+
+ /**
+ * Constructs the object, populating fields from the properties.
+ *
+ * @param props properties from which to configure this
+ * @throws PropertyException if an error occurs
+ */
+ public DistributedLockProperties(Properties props) throws PropertyException {
+ new BeanConfigurator().configureFromProperties(this, props);
+
+ Set<Integer> set = new HashSet<>();
+ for (String text : errorCodeStrings.split(",")) {
+ text = text.trim();
+ if (text.isEmpty()) {
+ continue;
+ }
+
+ try {
+ set.add(Integer.valueOf(text));
+
+ } catch (NumberFormatException e) {
+ throw new PropertyException(TRANSIENT_ERROR_CODES, "errorCodeStrings", e);
+ }
+ }
+
+ transientErrorCodes = Collections.unmodifiableSet(set);
+ }
+
+ /**
+ * Determines if an error is transient.
+ *
+ * @param errorCode error code to check
+ * @return {@code true} if the error is transient, {@code false} otherwise
+ */
+ public boolean isTransient(int errorCode) {
+ return transientErrorCodes.contains(errorCode);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
deleted file mode 100644
index d5e07a30..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.apache.commons.dbcp2.BasicDataSourceFactory;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi;
-import org.onap.policy.drools.features.PolicyEngineFeatureApi;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-import org.onap.policy.drools.system.PolicyEngine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DistributedLockingFeature implements PolicyEngineFeatureApi, PolicyResourceLockFeatureApi {
-
- /**
- * Logger instance.
- */
- private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class);
-
- /**
- * Properties Configuration Name.
- */
- public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
-
- /**
- * Properties for locking feature.
- */
- private DistributedLockingProperties lockProps;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID.
- */
- private static final UUID uuid = UUID.randomUUID();
-
- @Override
- public int getSequenceNumber() {
- return 1000;
- }
-
- @Override
- public OperResult beforeLock(String resourceId, String owner, int holdSec) {
-
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.lock(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeRefresh(String resourceId, String owner, int holdSec) {
-
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.refresh(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeUnlock(String resourceId, String owner) {
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeIsLockedBy(String resourceId, String owner) {
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeIsLocked(String resourceId) {
- TargetLock lock = new TargetLock(resourceId, uuid, "dummyOwner", dataSource);
-
- return (lock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public boolean afterStart(PolicyEngine engine) {
-
- try {
- this.lockProps = new DistributedLockingProperties(SystemPersistenceConstants.getManager()
- .getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME));
- this.dataSource = makeDataSource();
- } catch (PropertyException e) {
- logger.error("DistributedLockingFeature feature properies have not been loaded", e);
- throw new DistributedLockingFeatureException(e);
- } catch (InterruptedException e) {
- logger.error("DistributedLockingFeature failed to create data source", e);
- Thread.currentThread().interrupt();
- throw new DistributedLockingFeatureException(e);
- } catch (Exception e) {
- logger.error("DistributedLockingFeature failed to create data source", e);
- throw new DistributedLockingFeatureException(e);
- }
-
- cleanLockTable();
-
- return false;
- }
-
- /**
- * Make data source.
- *
- * @return a new, pooled data source
- * @throws Exception exception
- */
- protected BasicDataSource makeDataSource() throws Exception {
- Properties props = new Properties();
- props.put("driverClassName", lockProps.getDbDriver());
- props.put("url", lockProps.getDbUrl());
- props.put("username", lockProps.getDbUser());
- props.put("password", lockProps.getDbPwd());
- props.put("testOnBorrow", "true");
- props.put("poolPreparedStatements", "true");
-
- // additional properties are listed in the GenericObjectPool API
-
- return BasicDataSourceFactory.createDataSource(props);
- }
-
- /**
- * This method kills the heartbeat thread and calls refreshLockTable which removes
- * any records from the db where the current host is the owner.
- */
- @Override
- public boolean beforeShutdown(PolicyEngine engine) {
- cleanLockTable();
- return false;
- }
-
- /**
- * This method removes all records owned by the current host from the db.
- */
- private void cleanLockTable() {
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement statement = conn.prepareStatement(
- "DELETE FROM pooling.locks WHERE host = ? OR expirationTime < now()")
- ) {
-
- statement.setString(1, uuid.toString());
- statement.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("error in refreshLockTable()", e);
- }
-
- }
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
deleted file mode 100644
index 0ed5930d..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.util.Properties;
-import org.onap.policy.common.utils.properties.BeanConfigurator;
-import org.onap.policy.common.utils.properties.Property;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-
-
-public class DistributedLockingProperties {
-
- /**
- * Feature properties all begin with this prefix.
- */
- public static final String PREFIX = "distributed.locking.";
-
- public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
- public static final String DB_URL = "javax.persistence.jdbc.url";
- public static final String DB_USER = "javax.persistence.jdbc.user";
- public static final String DB_PWD = "javax.persistence.jdbc.password";
-
- /**
- * Properties from which this was constructed.
- */
- private final Properties source;
-
- /**
- * Database driver.
- */
- @Property(name = DB_DRIVER)
- private String dbDriver;
-
- /**
- * Database url.
- */
- @Property(name = DB_URL)
- private String dbUrl;
-
- /**
- * Database user.
- */
- @Property(name = DB_USER)
- private String dbUser;
-
- /**
- * Database password.
- */
- @Property(name = DB_PWD)
- private String dbPwd;
-
- /**
- * Constructs the object, populating fields from the properties.
- *
- * @param props properties from which to configure this
- * @throws PropertyException if an error occurs
- */
- public DistributedLockingProperties(Properties props) throws PropertyException {
- source = props;
-
- new BeanConfigurator().configureFromProperties(this, props);
- }
-
-
- public Properties getSource() {
- return source;
- }
-
-
- public String getDbDriver() {
- return dbDriver;
- }
-
-
- public String getDbUrl() {
- return dbUrl;
- }
-
-
- public String getDbUser() {
- return dbUser;
- }
-
-
- public String getDbPwd() {
- return dbPwd;
- }
-
-
- public void setDbDriver(String dbDriver) {
- this.dbDriver = dbDriver;
- }
-
-
- public void setDbUrl(String dbUrl) {
- this.dbUrl = dbUrl;
- }
-
-
- public void setDbUser(String dbUser) {
- this.dbUser = dbUser;
- }
-
-
- public void setDbPwd(String dbPwd) {
- this.dbPwd = dbPwd;
- }
-
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
deleted file mode 100644
index 42e1f92f..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.UUID;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TargetLock {
-
- private static final Logger logger = LoggerFactory.getLogger(TargetLock.class);
-
- /**
- * The Target resource we want to lock.
- */
- private String resourceId;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID .
- */
- private UUID uuid;
-
- /**
- * Owner.
- */
- private String owner;
-
- /**
- * Constructs a TargetLock object.
- *
- * @param resourceId ID of the entity we want to lock
- * @param dataSource used to connect to the DB containing locks
- */
- public TargetLock(String resourceId, UUID uuid, String owner, BasicDataSource dataSource) {
- this.resourceId = resourceId;
- this.uuid = uuid;
- this.owner = owner;
- this.dataSource = dataSource;
- }
-
- /**
- * Obtain a lock.
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- public boolean lock(int holdSec) {
-
- return grabLock(holdSec);
- }
-
- /**
- * Refresh a lock.
- *
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if the lock was refreshed, {@code false} if the resource is
- * not currently locked by the given owner
- */
- public boolean refresh(int holdSec) {
- return updateLock(holdSec);
- }
-
- /**
- * Unlock a resource by deleting it's associated record in the db.
- */
- public boolean unlock() {
- return deleteLock();
- }
-
- /**
- * "Grabs" lock by attempting to insert a new record in the db.
- * If the insert fails due to duplicate key error resource is already locked
- * so we call secondGrab.
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- private boolean grabLock(int holdSec) {
-
- // try to insert a record into the table(thereby grabbing the lock)
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement statement = conn.prepareStatement(
- "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
- + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
-
- int index = 1;
- statement.setString(index++, this.resourceId);
- statement.setString(index++, this.uuid.toString());
- statement.setString(index++, this.owner);
- statement.setInt(index++, holdSec);
- statement.executeUpdate();
- }
-
- catch (SQLException e) {
- logger.error("error in TargetLock.grabLock()", e);
- return secondGrab(holdSec);
- }
-
- return true;
- }
-
- /**
- * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired.
- * If that fails, it attempts to insert a new record again
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- private boolean secondGrab(int holdSec) {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, "
- + "expirationTime = timestampadd(second, ?, now()) "
- + "WHERE resourceId = ? AND expirationTime < now()");
-
- PreparedStatement insertStatement = conn.prepareStatement(
- "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
- + "values (?, ?, ?, timestampadd(second, ?, now()))");) {
-
- int index = 1;
- updateStatement.setString(index++, this.uuid.toString());
- updateStatement.setString(index++, this.owner);
- updateStatement.setInt(index++, holdSec);
- updateStatement.setString(index++, this.resourceId);
-
- // The lock was expired and we grabbed it.
- // return true
- if (updateStatement.executeUpdate() == 1) {
- return true;
- }
-
- // If our update does not return 1 row, the lock either has not expired
- // or it was removed. Try one last grab
- else {
- index = 1;
- insertStatement.setString(index++, this.resourceId);
- insertStatement.setString(index++, this.uuid.toString());
- insertStatement.setString(index++, this.owner);
- insertStatement.setInt(index++, holdSec);
-
- // If our insert returns 1 we successfully grabbed the lock
- return (insertStatement.executeUpdate() == 1);
- }
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.secondGrab()", e);
- return false;
- }
-
- }
-
- /**
- * Updates the DB record associated with the lock.
- *
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if the record was updated, {@code false} otherwise
- */
- private boolean updateLock(int holdSec) {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, "
- + "expirationTime = timestampadd(second, ?, now()) "
- + "WHERE resourceId = ? AND owner = ? AND expirationTime >= now()")) {
-
- int index = 1;
- updateStatement.setString(index++, this.uuid.toString());
- updateStatement.setString(index++, this.owner);
- updateStatement.setInt(index++, holdSec);
- updateStatement.setString(index++, this.resourceId);
- updateStatement.setString(index++, this.owner);
-
- // refresh succeeded iff a record was updated
- return (updateStatement.executeUpdate() == 1);
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.refreshLock()", e);
- return false;
- }
-
- }
-
- /**
- *To remove a lock we simply delete the record from the db .
- */
- private boolean deleteLock() {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement deleteStatement = conn.prepareStatement(
- "DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?")) {
-
- deleteStatement.setString(1, this.resourceId);
- deleteStatement.setString(2, this.owner);
- deleteStatement.setString(3, this.uuid.toString());
-
- return (deleteStatement.executeUpdate() == 1);
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.deleteLock()", e);
- return false;
- }
-
- }
-
- /**
- * Is the lock active.
- */
- public boolean isActive() {
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement selectStatement = conn.prepareStatement(
- "SELECT * FROM pooling.locks "
- + "WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= now()")) {
-
- selectStatement.setString(1, this.resourceId);
- selectStatement.setString(2, this.uuid.toString());
- selectStatement.setString(3, this.owner);
- try (ResultSet result = selectStatement.executeQuery()) {
-
- // This will return true if the
- // query returned at least one row
- return result.first();
- }
-
- }
-
- catch (SQLException e) {
- logger.error("error in TargetLock.isActive()", e);
- return false;
- }
-
- }
-
- /**
- * Is the resource locked.
- */
- public boolean isLocked() {
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement selectStatement = conn
- .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= now()")) {
-
- selectStatement.setString(1, this.resourceId);
- try (ResultSet result = selectStatement.executeQuery()) {
- // This will return true if the
- // query returned at least one row
- return result.first();
- }
- } catch (SQLException e) {
- logger.error("error in TargetLock.isActive()", e);
- return false;
- }
- }
-
-}