summaryrefslogtreecommitdiffstats
path: root/feature-distributed-locking/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-distributed-locking/src/main/java')
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java936
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java (renamed from feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java)12
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java136
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java179
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java127
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java282
6 files changed, 1078 insertions, 594 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
new file mode 100644
index 00000000..523c0d93
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
@@ -0,0 +1,936 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSourceFactory;
+import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.drools.core.lock.AlwaysFailLock;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.persistence.SystemPersistenceConstants;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed implementation of the Lock Feature. Maintains locks across servers using a
+ * shared DB.
+ *
+ * <p/>
+ * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
+ * parameter.
+ *
+ * <p/>
+ * Additional Notes:
+ * <dl>
+ * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
+ * instead populated with the {@link #uuidString}.</li>
+ * <li>A periodic check of the DB is made to determine if any of the locks have
+ * expired.</li>
+ * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
+ * will be added to the map once free() or extend() is invoked, provided there isn't
+ * already an entry. In addition, it initially has the host and UUID of the feature
+ * instance that created it. However, as soon as doExtend() completes successfully, the
+ * host and UUID of the lock will be updated to reflect the values within this feature
+ * instance.</li>
+ * </dl>
+ */
+public class DistributedLockManager
+ implements PolicyResourceLockManager, PolicyEngineFeatureApi {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
+
+ private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
+ private static final String LOCK_LOST_MSG = "lock lost";
+ private static final String NOT_LOCKED_MSG = "not locked";
+
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private static DistributedLockManager latestInstance = null;
+
+
+ /**
+ * Name of the host on which this JVM is running.
+ */
+ @Getter
+ private final String hostName;
+
+ /**
+ * UUID of this object.
+ */
+ @Getter
+ private final String uuidString = UUID.randomUUID().toString();
+
+ /**
+ * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
+ * lock is added to the map, it remains in the map until the lock is lost or until the
+ * unlock request completes.
+ */
+ private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
+
+ /**
+ * Engine with which this manager is associated.
+ */
+ private PolicyEngine engine;
+
+ /**
+ * Feature properties.
+ */
+ private DistributedLockProperties featProps;
+
+ /**
+ * Thread pool used to check for lock expiration and to notify owners when locks are
+ * granted or lost.
+ */
+ private ScheduledExecutorService exsvc = null;
+
+ /**
+ * Data source used to connect to the DB.
+ */
+ private BasicDataSource dataSource = null;
+
+
+ /**
+ * Constructs the object.
+ */
+ public DistributedLockManager() {
+ this.hostName = NetworkUtil.getHostname();
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return 1000;
+ }
+
+ @Override
+ public boolean isAlive() {
+ return (exsvc != null);
+ }
+
+ @Override
+ public boolean start() {
+ // handled via engine API
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ // handled via engine API
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ // handled via engine API
+ }
+
+ @Override
+ public boolean isLocked() {
+ return false;
+ }
+
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public boolean unlock() {
+ return true;
+ }
+
+ @Override
+ public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
+
+ try {
+ this.engine = engine;
+ this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
+ this.exsvc = getThreadPool();
+ this.dataSource = makeDataSource();
+
+ return this;
+
+ } catch (Exception e) {
+ throw new DistributedLockManagerException(e);
+ }
+ }
+
+ @Override
+ public boolean afterStart(PolicyEngine engine) {
+
+ try {
+ exsvc.execute(this::deleteExpiredDbLocks);
+ exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ setLatestInstance(this);
+
+ } catch (Exception e) {
+ throw new DistributedLockManagerException(e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Make data source.
+ *
+ * @return a new, pooled data source
+ * @throws Exception exception
+ */
+ protected BasicDataSource makeDataSource() throws Exception {
+ Properties props = new Properties();
+ props.put("driverClassName", featProps.getDbDriver());
+ props.put("url", featProps.getDbUrl());
+ props.put("username", featProps.getDbUser());
+ props.put("password", featProps.getDbPwd());
+ props.put("testOnBorrow", "true");
+ props.put("poolPreparedStatements", "true");
+
+ // additional properties are listed in the GenericObjectPool API
+
+ return BasicDataSourceFactory.createDataSource(props);
+ }
+
+ /**
+ * Deletes expired locks from the DB.
+ */
+ private void deleteExpiredDbLocks() {
+ logger.info("deleting all expired locks from the DB");
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn
+ .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
+
+ int ndel = stmt.executeUpdate();
+ logger.info("deleted {} expired locks from the DB", ndel);
+
+ } catch (SQLException e) {
+ logger.warn("failed to delete expired locks from the DB", e);
+ }
+ }
+
+ /**
+ * Closes the data source. Does <i>not</i> invoke any lock call-backs.
+ */
+ @Override
+ public boolean afterStop(PolicyEngine engine) {
+ exsvc = null;
+ closeDataSource();
+ return false;
+ }
+
+ /**
+ * Closes {@link #dataSource} and sets it to {@code null}.
+ */
+ private void closeDataSource() {
+ try {
+ if (dataSource != null) {
+ dataSource.close();
+ }
+
+ } catch (SQLException e) {
+ logger.error("cannot close the distributed locking DB", e);
+ }
+
+ dataSource = null;
+ }
+
+ @Override
+ public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ boolean waitForLock) {
+
+ if (latestInstance != this) {
+ AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
+ lock.notifyUnavailable();
+ return lock;
+ }
+
+ DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
+
+ DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
+
+ // do these outside of compute() to avoid blocking other map operations
+ if (existingLock == null) {
+ logger.debug("added lock to map {}", lock);
+ lock.scheduleRequest(lock::doLock);
+ } else {
+ lock.deny("resource is busy", true);
+ }
+
+ return lock;
+ }
+
+ /**
+ * Checks for expired locks.
+ */
+ private void checkExpired() {
+
+ try {
+ logger.info("checking for expired locks");
+ Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
+ identifyDbLocks(expiredIds);
+ expireLocks(expiredIds);
+
+ exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
+
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is no longer accepting requests", e);
+
+ } catch (SQLException | RuntimeException e) {
+ logger.error("error checking expired locks", e);
+ exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
+ }
+
+ logger.info("done checking for expired locks");
+ }
+
+ /**
+ * Identifies this feature instance's locks that the DB indicates are still active.
+ *
+ * @param expiredIds IDs of resources that have expired locks. If a resource is still
+ * locked, it's ID is removed from this set
+ * @throws SQLException if a DB error occurs
+ */
+ private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
+ /*
+ * We could query for host and UUIDs that actually appear within the locks, but
+ * those might change while the query is running so no real value in doing that.
+ * On the other hand, there's only a brief instance between the time a
+ * deserialized lock is added to this feature instance and its doExtend() method
+ * updates its host and UUID to match this feature instance. If this happens to
+ * run during that brief instance, then the lock will be lost and the callback
+ * invoked. It isn't worth complicating this code further to handle those highly
+ * unlikely cases.
+ */
+
+ // @formatter:off
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(
+ "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
+ // @formatter:on
+
+ stmt.setString(1, hostName);
+ stmt.setString(2, uuidString);
+
+ try (ResultSet resultSet = stmt.executeQuery()) {
+ while (resultSet.next()) {
+ String resourceId = resultSet.getString(1);
+
+ // we have now seen this resource id
+ expiredIds.remove(resourceId);
+ }
+ }
+ }
+ }
+
+ /**
+ * Expires locks for the resources that no longer appear within the DB.
+ *
+ * @param expiredIds IDs of resources that have expired locks
+ */
+ private void expireLocks(Set<String> expiredIds) {
+ for (String resourceId : expiredIds) {
+ AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
+
+ resource2lock.computeIfPresent(resourceId, (key, lock) -> {
+ if (lock.isActive()) {
+ // it thinks it's active, but it isn't - remove from the map
+ lockref.set(lock);
+ return null;
+ }
+
+ return lock;
+ });
+
+ DistributedLock lock = lockref.get();
+ if (lock != null) {
+ logger.debug("removed lock from map {}", lock);
+ lock.deny(LOCK_LOST_MSG, false);
+ }
+ }
+ }
+
+ /**
+ * Distributed Lock implementation.
+ */
+ public static class DistributedLock extends LockImpl {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Feature containing this lock. May be {@code null} until the feature is
+ * identified. Note: this can only be null if the lock has been de-serialized.
+ */
+ private transient DistributedLockManager feature;
+
+ /**
+ * Host name from the feature instance that created this object. Replaced with the
+ * host name from the current feature instance whenever the lock is successfully
+ * extended.
+ */
+ private String hostName;
+
+ /**
+ * UUID string from the feature instance that created this object. Replaced with
+ * the UUID string from the current feature instance whenever the lock is
+ * successfully extended.
+ */
+ private String uuidString;
+
+ /**
+ * {@code True} if the lock is busy making a request, {@code false} otherwise.
+ */
+ private transient boolean busy = false;
+
+ /**
+ * Request to be performed.
+ */
+ private transient RunnableWithEx request = null;
+
+ /**
+ * Number of times we've retried a request.
+ */
+ private transient int nretries = 0;
+
+ /**
+ * Constructs the object.
+ */
+ public DistributedLock() {
+ this.hostName = "";
+ this.uuidString = "";
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param state initial state of the lock
+ * @param resourceId identifier of the resource to be locked
+ * @param ownerKey information identifying the owner requesting the lock
+ * @param holdSec amount of time, in seconds, for which the lock should be held,
+ * after which it will automatically be released
+ * @param callback callback to be invoked once the lock is granted, or
+ * subsequently lost; must not be {@code null}
+ * @param feature feature containing this lock
+ */
+ public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
+ DistributedLockManager feature) {
+ super(state, resourceId, ownerKey, holdSec, callback);
+
+ this.feature = feature;
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+ }
+
+ /**
+ * Grants this lock. The notification is <i>always</i> invoked via the
+ * <i>foreground</i> thread.
+ */
+ protected void grant() {
+ synchronized (this) {
+ if (isUnavailable()) {
+ return;
+ }
+
+ setState(LockState.ACTIVE);
+ }
+
+ logger.info("lock granted: {}", this);
+
+ notifyAvailable();
+ }
+
+ /**
+ * Permanently denies this lock.
+ *
+ * @param reason the reason the lock was denied
+ * @param foreground {@code true} if the callback can be invoked in the current
+ * (i.e., foreground) thread, {@code false} if it should be invoked via the
+ * executor
+ */
+ protected void deny(String reason, boolean foreground) {
+ synchronized (this) {
+ setState(LockState.UNAVAILABLE);
+ }
+
+ logger.info("{}: {}", reason, this);
+
+ if (feature == null || foreground) {
+ notifyUnavailable();
+
+ } else {
+ feature.exsvc.execute(this::notifyUnavailable);
+ }
+ }
+
+ @Override
+ public boolean free() {
+ // do a quick check of the state
+ if (isUnavailable()) {
+ return false;
+ }
+
+ logger.info("releasing lock: {}", this);
+
+ if (!attachFeature()) {
+ setState(LockState.UNAVAILABLE);
+ return false;
+ }
+
+ AtomicBoolean result = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+ if (curlock == this && !isUnavailable()) {
+ // this lock was the owner
+ result.set(true);
+ setState(LockState.UNAVAILABLE);
+
+ /*
+ * NOTE: do NOT return null; curlock must remain until doUnlock
+ * completes.
+ */
+ }
+
+ return curlock;
+ });
+
+ if (result.get()) {
+ scheduleRequest(this::doUnlock);
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public void extend(int holdSec, LockCallback callback) {
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ setHoldSec(holdSec);
+ setCallback(callback);
+
+ // do a quick check of the state
+ if (isUnavailable() || !attachFeature()) {
+ deny(LOCK_LOST_MSG, true);
+ return;
+ }
+
+ AtomicBoolean success = new AtomicBoolean(false);
+
+ feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
+ if (curlock == this && !isUnavailable()) {
+ success.set(true);
+ setState(LockState.WAITING);
+ }
+
+ // note: leave it in the map until doUnlock() removes it
+
+ return curlock;
+ });
+
+ if (success.get()) {
+ scheduleRequest(this::doExtend);
+
+ } else {
+ deny(NOT_LOCKED_MSG, true);
+ }
+ }
+
+ /**
+ * Attaches to the feature instance, if not already attached.
+ *
+ * @return {@code true} if the lock is now attached to a feature, {@code false}
+ * otherwise
+ */
+ private synchronized boolean attachFeature() {
+ if (feature != null) {
+ // already attached
+ return true;
+ }
+
+ feature = latestInstance;
+ if (feature == null) {
+ logger.warn("no feature yet for {}", this);
+ return false;
+ }
+
+ // put this lock into the map
+ feature.resource2lock.putIfAbsent(getResourceId(), this);
+
+ return true;
+ }
+
+ /**
+ * Schedules a request for execution.
+ *
+ * @param schedreq the request that should be scheduled
+ */
+ private synchronized void scheduleRequest(RunnableWithEx schedreq) {
+ logger.debug("schedule lock action {}", this);
+ nretries = 0;
+ request = schedreq;
+ feature.exsvc.execute(this::doRequest);
+ }
+
+ /**
+ * Reschedules a request for execution, if there is not already a request in the
+ * queue, and if the retry count has not been exhausted.
+ *
+ * @param req request to be rescheduled
+ */
+ private void rescheduleRequest(RunnableWithEx req) {
+ synchronized (this) {
+ if (request != null) {
+ // a new request has already been scheduled - it supersedes "req"
+ logger.debug("not rescheduling lock action {}", this);
+ return;
+ }
+
+ if (nretries++ < feature.featProps.getMaxRetries()) {
+ logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
+ request = req;
+ feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
+ return;
+ }
+ }
+
+ logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
+ removeFromMap();
+ }
+
+ /**
+ * Gets, and removes, the next request from the queue. Clears {@link #busy} if
+ * there are no more requests in the queue.
+ *
+ * @param prevReq the previous request that was just run
+ *
+ * @return the next request, or {@code null} if the queue is empty
+ */
+ private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
+ if (request == null || request == prevReq) {
+ logger.debug("no more requests for {}", this);
+ busy = false;
+ return null;
+ }
+
+ RunnableWithEx req = request;
+ request = null;
+
+ return req;
+ }
+
+ /**
+ * Executes the current request, if none are currently executing.
+ */
+ private void doRequest() {
+ synchronized (this) {
+ if (busy) {
+ // another thread is already processing the request(s)
+ return;
+ }
+ busy = true;
+ }
+
+ /*
+ * There is a race condition wherein this thread could invoke run() while the
+ * next scheduled thread checks the busy flag and finds that work is being
+ * done and returns, leaving the next work item in "request". In that case,
+ * the next work item may never be executed, thus we use a loop here, instead
+ * of just executing a single request.
+ */
+ RunnableWithEx req = null;
+ while ((req = getNextRequest(req)) != null) {
+ if (feature.resource2lock.get(getResourceId()) != this) {
+ /*
+ * no longer in the map - don't apply the action, as it may interfere
+ * with any newly added Lock object
+ */
+ logger.debug("discard lock action {}", this);
+ synchronized (this) {
+ busy = false;
+ }
+ return;
+ }
+
+ try {
+ /*
+ * Run the request. If it throws an exception, then it will be
+ * rescheduled for execution a little later.
+ */
+ req.run();
+
+ } catch (SQLException e) {
+ logger.warn("request failed for lock: {}", this, e);
+
+ if (feature.featProps.isTransient(e.getErrorCode())) {
+ // retry the request a little later
+ rescheduleRequest(req);
+ } else {
+ removeFromMap();
+ }
+
+ } catch (RuntimeException e) {
+ logger.warn("request failed for lock: {}", this, e);
+ removeFromMap();
+ }
+ }
+ }
+
+ /**
+ * Attempts to add a lock to the DB. Generates a callback, indicating success or
+ * failure.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doLock() throws SQLException {
+ if (!isWaiting()) {
+ logger.debug("discard doLock {}", this);
+ return;
+ }
+
+ /*
+ * There is a small window in which a client could invoke free() before the DB
+ * is updated. In that case, doUnlock will be added to the queue to run after
+ * this, which will delete the record, as desired. In addition, grant() will
+ * not do anything, because the lock state will have been set to UNAVAILABLE
+ * by free().
+ */
+
+ logger.debug("doLock {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ boolean success = false;
+ try {
+ success = doDbInsert(conn);
+
+ } catch (SQLException e) {
+ logger.info("failed to insert lock record - attempting update: {}", this, e);
+ success = doDbUpdate(conn);
+ }
+
+ if (success) {
+ grant();
+ return;
+ }
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
+ * it fails, as this should only be executed in response to a call to
+ * {@link #free()}.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doUnlock() throws SQLException {
+ logger.debug("unlock {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ doDbDelete(conn);
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Attempts to extend a lock in the DB. Generates a callback, indicating success
+ * or failure.
+ *
+ * @throws SQLException if a DB error occurs
+ */
+ private void doExtend() throws SQLException {
+ if (!isWaiting()) {
+ logger.debug("discard doExtend {}", this);
+ return;
+ }
+
+ /*
+ * There is a small window in which a client could invoke free() before the DB
+ * is updated. In that case, doUnlock will be added to the queue to run after
+ * this, which will delete the record, as desired. In addition, grant() will
+ * not do anything, because the lock state will have been set to UNAVAILABLE
+ * by free().
+ */
+
+ logger.debug("doExtend {}", this);
+ try (Connection conn = feature.dataSource.getConnection()) {
+ /*
+ * invoker may have called extend() before free() had a chance to insert
+ * the record, thus we have to try to insert, if the update fails
+ */
+ if (doDbUpdate(conn) || doDbInsert(conn)) {
+ grant();
+ return;
+ }
+ }
+
+ removeFromMap();
+ }
+
+ /**
+ * Inserts the lock into the DB.
+ *
+ * @param conn DB connection
+ * @return {@code true} if a record was successfully inserted, {@code false}
+ * otherwise
+ * @throws SQLException if a DB error occurs
+ */
+ protected boolean doDbInsert(Connection conn) throws SQLException {
+ logger.debug("insert lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
+ + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, feature.hostName);
+ stmt.setString(3, feature.uuidString);
+ stmt.setInt(4, getHoldSec());
+
+ stmt.executeUpdate();
+
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+
+ return true;
+ }
+ }
+
+ /**
+ * Updates the lock in the DB.
+ *
+ * @param conn DB connection
+ * @return {@code true} if a record was successfully updated, {@code false}
+ * otherwise
+ * @throws SQLException if a DB error occurs
+ */
+ protected boolean doDbUpdate(Connection conn) throws SQLException {
+ logger.debug("update lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
+ + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
+ + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, feature.hostName);
+ stmt.setString(3, feature.uuidString);
+ stmt.setInt(4, getHoldSec());
+
+ stmt.setString(5, getResourceId());
+ stmt.setString(6, this.hostName);
+ stmt.setString(7, this.uuidString);
+
+ if (stmt.executeUpdate() != 1) {
+ return false;
+ }
+
+ this.hostName = feature.hostName;
+ this.uuidString = feature.uuidString;
+
+ return true;
+ }
+ }
+
+ /**
+ * Deletes the lock from the DB.
+ *
+ * @param conn DB connection
+ * @throws SQLException if a DB error occurs
+ */
+ protected void doDbDelete(Connection conn) throws SQLException {
+ logger.debug("delete lock record {}", this);
+ try (PreparedStatement stmt =
+ conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
+
+ stmt.setString(1, getResourceId());
+ stmt.setString(2, this.hostName);
+ stmt.setString(3, this.uuidString);
+
+ stmt.executeUpdate();
+ }
+ }
+
+ /**
+ * Removes the lock from the map, and sends a notification using the current
+ * thread.
+ */
+ private void removeFromMap() {
+ logger.debug("remove lock from map {}", this);
+ feature.resource2lock.remove(getResourceId(), this);
+
+ synchronized (this) {
+ if (!isUnavailable()) {
+ deny(LOCK_LOST_MSG, true);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
+ + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
+ + uuidString + "]";
+ }
+ }
+
+ @FunctionalInterface
+ private static interface RunnableWithEx {
+ void run() throws SQLException;
+ }
+
+ // these may be overridden by junit tests
+
+ protected Properties getProperties(String fileName) {
+ return SystemPersistenceConstants.getManager().getProperties(fileName);
+ }
+
+ protected ScheduledExecutorService getThreadPool() {
+ return engine.getExecutorService();
+ }
+
+ protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
+ LockCallback callback) {
+ return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java
index 55fc4fab..e720f9a1 100644
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* feature-distributed-locking
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,15 +20,15 @@
package org.onap.policy.distributed.locking;
-public class DistributedLockingFeatureException extends RuntimeException {
+public class DistributedLockManagerException extends RuntimeException {
private static final long serialVersionUID = 1L;
/**
* Constructor.
- *
+ *
* @param ex exception to be wrapped
*/
- public DistributedLockingFeatureException(Exception ex) {
+ public DistributedLockManagerException(Exception ex) {
super(ex);
}
}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java
new file mode 100644
index 00000000..f470c8e2
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java
@@ -0,0 +1,136 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.properties.BeanConfigurator;
+import org.onap.policy.common.utils.properties.Property;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+
+
+@Getter
+@Setter
+public class DistributedLockProperties {
+ public static final String PREFIX = "distributed.locking.";
+
+ public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
+ public static final String DB_URL = "javax.persistence.jdbc.url";
+ public static final String DB_USER = "javax.persistence.jdbc.user";
+ public static final String DB_PASS = "javax.persistence.jdbc.password";
+ public static final String TRANSIENT_ERROR_CODES = PREFIX + "transient.error.codes";
+ public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds";
+ public static final String RETRY_SEC = PREFIX + "retry.seconds";
+ public static final String MAX_RETRIES = PREFIX + "max.retries";
+
+ /**
+ * Database driver.
+ */
+ @Property(name = DB_DRIVER)
+ private String dbDriver;
+
+ /**
+ * Database url.
+ */
+ @Property(name = DB_URL)
+ private String dbUrl;
+
+ /**
+ * Database user.
+ */
+ @Property(name = DB_USER)
+ private String dbUser;
+
+ /**
+ * Database password.
+ */
+ @Property(name = DB_PASS)
+ private String dbPwd;
+
+ /**
+ * Vendor-specific error codes that are "transient", meaning they may go away if the
+ * command is repeated (e.g., connection issue), as opposed to something like a syntax
+ * error or a duplicate key.
+ */
+ @Property(name = TRANSIENT_ERROR_CODES)
+ private String errorCodeStrings;
+
+ private final Set<Integer> transientErrorCodes;
+
+ /**
+ * Time, in seconds, to wait between checks for expired locks.
+ */
+ @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900")
+ private int expireCheckSec;
+
+ /**
+ * Number of seconds to wait before retrying, after a DB error.
+ */
+ @Property(name = RETRY_SEC, defaultValue = "60")
+ private int retrySec;
+
+ /**
+ * Maximum number of times to retry a DB operation.
+ */
+ @Property(name = MAX_RETRIES, defaultValue = "2")
+ private int maxRetries;
+
+ /**
+ * Constructs the object, populating fields from the properties.
+ *
+ * @param props properties from which to configure this
+ * @throws PropertyException if an error occurs
+ */
+ public DistributedLockProperties(Properties props) throws PropertyException {
+ new BeanConfigurator().configureFromProperties(this, props);
+
+ Set<Integer> set = new HashSet<>();
+ for (String text : errorCodeStrings.split(",")) {
+ text = text.trim();
+ if (text.isEmpty()) {
+ continue;
+ }
+
+ try {
+ set.add(Integer.valueOf(text));
+
+ } catch (NumberFormatException e) {
+ throw new PropertyException(TRANSIENT_ERROR_CODES, "errorCodeStrings", e);
+ }
+ }
+
+ transientErrorCodes = Collections.unmodifiableSet(set);
+ }
+
+ /**
+ * Determines if an error is transient.
+ *
+ * @param errorCode error code to check
+ * @return {@code true} if the error is transient, {@code false} otherwise
+ */
+ public boolean isTransient(int errorCode) {
+ return transientErrorCodes.contains(errorCode);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
deleted file mode 100644
index d5e07a30..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.apache.commons.dbcp2.BasicDataSourceFactory;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi;
-import org.onap.policy.drools.features.PolicyEngineFeatureApi;
-import org.onap.policy.drools.persistence.SystemPersistenceConstants;
-import org.onap.policy.drools.system.PolicyEngine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DistributedLockingFeature implements PolicyEngineFeatureApi, PolicyResourceLockFeatureApi {
-
- /**
- * Logger instance.
- */
- private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class);
-
- /**
- * Properties Configuration Name.
- */
- public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
-
- /**
- * Properties for locking feature.
- */
- private DistributedLockingProperties lockProps;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID.
- */
- private static final UUID uuid = UUID.randomUUID();
-
- @Override
- public int getSequenceNumber() {
- return 1000;
- }
-
- @Override
- public OperResult beforeLock(String resourceId, String owner, int holdSec) {
-
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.lock(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeRefresh(String resourceId, String owner, int holdSec) {
-
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.refresh(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeUnlock(String resourceId, String owner) {
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeIsLockedBy(String resourceId, String owner) {
- TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource);
-
- return (lock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public OperResult beforeIsLocked(String resourceId) {
- TargetLock lock = new TargetLock(resourceId, uuid, "dummyOwner", dataSource);
-
- return (lock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED);
- }
-
- @Override
- public boolean afterStart(PolicyEngine engine) {
-
- try {
- this.lockProps = new DistributedLockingProperties(SystemPersistenceConstants.getManager()
- .getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME));
- this.dataSource = makeDataSource();
- } catch (PropertyException e) {
- logger.error("DistributedLockingFeature feature properies have not been loaded", e);
- throw new DistributedLockingFeatureException(e);
- } catch (InterruptedException e) {
- logger.error("DistributedLockingFeature failed to create data source", e);
- Thread.currentThread().interrupt();
- throw new DistributedLockingFeatureException(e);
- } catch (Exception e) {
- logger.error("DistributedLockingFeature failed to create data source", e);
- throw new DistributedLockingFeatureException(e);
- }
-
- cleanLockTable();
-
- return false;
- }
-
- /**
- * Make data source.
- *
- * @return a new, pooled data source
- * @throws Exception exception
- */
- protected BasicDataSource makeDataSource() throws Exception {
- Properties props = new Properties();
- props.put("driverClassName", lockProps.getDbDriver());
- props.put("url", lockProps.getDbUrl());
- props.put("username", lockProps.getDbUser());
- props.put("password", lockProps.getDbPwd());
- props.put("testOnBorrow", "true");
- props.put("poolPreparedStatements", "true");
-
- // additional properties are listed in the GenericObjectPool API
-
- return BasicDataSourceFactory.createDataSource(props);
- }
-
- /**
- * This method kills the heartbeat thread and calls refreshLockTable which removes
- * any records from the db where the current host is the owner.
- */
- @Override
- public boolean beforeShutdown(PolicyEngine engine) {
- cleanLockTable();
- return false;
- }
-
- /**
- * This method removes all records owned by the current host from the db.
- */
- private void cleanLockTable() {
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement statement = conn.prepareStatement(
- "DELETE FROM pooling.locks WHERE host = ? OR expirationTime < now()")
- ) {
-
- statement.setString(1, uuid.toString());
- statement.executeUpdate();
-
- } catch (SQLException e) {
- logger.error("error in refreshLockTable()", e);
- }
-
- }
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
deleted file mode 100644
index 0ed5930d..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.util.Properties;
-import org.onap.policy.common.utils.properties.BeanConfigurator;
-import org.onap.policy.common.utils.properties.Property;
-import org.onap.policy.common.utils.properties.exception.PropertyException;
-
-
-public class DistributedLockingProperties {
-
- /**
- * Feature properties all begin with this prefix.
- */
- public static final String PREFIX = "distributed.locking.";
-
- public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
- public static final String DB_URL = "javax.persistence.jdbc.url";
- public static final String DB_USER = "javax.persistence.jdbc.user";
- public static final String DB_PWD = "javax.persistence.jdbc.password";
-
- /**
- * Properties from which this was constructed.
- */
- private final Properties source;
-
- /**
- * Database driver.
- */
- @Property(name = DB_DRIVER)
- private String dbDriver;
-
- /**
- * Database url.
- */
- @Property(name = DB_URL)
- private String dbUrl;
-
- /**
- * Database user.
- */
- @Property(name = DB_USER)
- private String dbUser;
-
- /**
- * Database password.
- */
- @Property(name = DB_PWD)
- private String dbPwd;
-
- /**
- * Constructs the object, populating fields from the properties.
- *
- * @param props properties from which to configure this
- * @throws PropertyException if an error occurs
- */
- public DistributedLockingProperties(Properties props) throws PropertyException {
- source = props;
-
- new BeanConfigurator().configureFromProperties(this, props);
- }
-
-
- public Properties getSource() {
- return source;
- }
-
-
- public String getDbDriver() {
- return dbDriver;
- }
-
-
- public String getDbUrl() {
- return dbUrl;
- }
-
-
- public String getDbUser() {
- return dbUser;
- }
-
-
- public String getDbPwd() {
- return dbPwd;
- }
-
-
- public void setDbDriver(String dbDriver) {
- this.dbDriver = dbDriver;
- }
-
-
- public void setDbUrl(String dbUrl) {
- this.dbUrl = dbUrl;
- }
-
-
- public void setDbUser(String dbUser) {
- this.dbUser = dbUser;
- }
-
-
- public void setDbPwd(String dbPwd) {
- this.dbPwd = dbPwd;
- }
-
-}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
deleted file mode 100644
index 42e1f92f..00000000
--- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-distributed-locking
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.distributed.locking;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.UUID;
-import org.apache.commons.dbcp2.BasicDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TargetLock {
-
- private static final Logger logger = LoggerFactory.getLogger(TargetLock.class);
-
- /**
- * The Target resource we want to lock.
- */
- private String resourceId;
-
- /**
- * Data source used to connect to the DB containing locks.
- */
- private BasicDataSource dataSource;
-
- /**
- * UUID .
- */
- private UUID uuid;
-
- /**
- * Owner.
- */
- private String owner;
-
- /**
- * Constructs a TargetLock object.
- *
- * @param resourceId ID of the entity we want to lock
- * @param dataSource used to connect to the DB containing locks
- */
- public TargetLock(String resourceId, UUID uuid, String owner, BasicDataSource dataSource) {
- this.resourceId = resourceId;
- this.uuid = uuid;
- this.owner = owner;
- this.dataSource = dataSource;
- }
-
- /**
- * Obtain a lock.
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- public boolean lock(int holdSec) {
-
- return grabLock(holdSec);
- }
-
- /**
- * Refresh a lock.
- *
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if the lock was refreshed, {@code false} if the resource is
- * not currently locked by the given owner
- */
- public boolean refresh(int holdSec) {
- return updateLock(holdSec);
- }
-
- /**
- * Unlock a resource by deleting it's associated record in the db.
- */
- public boolean unlock() {
- return deleteLock();
- }
-
- /**
- * "Grabs" lock by attempting to insert a new record in the db.
- * If the insert fails due to duplicate key error resource is already locked
- * so we call secondGrab.
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- private boolean grabLock(int holdSec) {
-
- // try to insert a record into the table(thereby grabbing the lock)
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement statement = conn.prepareStatement(
- "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
- + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
-
- int index = 1;
- statement.setString(index++, this.resourceId);
- statement.setString(index++, this.uuid.toString());
- statement.setString(index++, this.owner);
- statement.setInt(index++, holdSec);
- statement.executeUpdate();
- }
-
- catch (SQLException e) {
- logger.error("error in TargetLock.grabLock()", e);
- return secondGrab(holdSec);
- }
-
- return true;
- }
-
- /**
- * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired.
- * If that fails, it attempts to insert a new record again
- * @param holdSec the amount of time, in seconds, that the lock should be held
- */
- private boolean secondGrab(int holdSec) {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, "
- + "expirationTime = timestampadd(second, ?, now()) "
- + "WHERE resourceId = ? AND expirationTime < now()");
-
- PreparedStatement insertStatement = conn.prepareStatement(
- "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
- + "values (?, ?, ?, timestampadd(second, ?, now()))");) {
-
- int index = 1;
- updateStatement.setString(index++, this.uuid.toString());
- updateStatement.setString(index++, this.owner);
- updateStatement.setInt(index++, holdSec);
- updateStatement.setString(index++, this.resourceId);
-
- // The lock was expired and we grabbed it.
- // return true
- if (updateStatement.executeUpdate() == 1) {
- return true;
- }
-
- // If our update does not return 1 row, the lock either has not expired
- // or it was removed. Try one last grab
- else {
- index = 1;
- insertStatement.setString(index++, this.resourceId);
- insertStatement.setString(index++, this.uuid.toString());
- insertStatement.setString(index++, this.owner);
- insertStatement.setInt(index++, holdSec);
-
- // If our insert returns 1 we successfully grabbed the lock
- return (insertStatement.executeUpdate() == 1);
- }
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.secondGrab()", e);
- return false;
- }
-
- }
-
- /**
- * Updates the DB record associated with the lock.
- *
- * @param holdSec the amount of time, in seconds, that the lock should be held
- * @return {@code true} if the record was updated, {@code false} otherwise
- */
- private boolean updateLock(int holdSec) {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement updateStatement = conn.prepareStatement(
- "UPDATE pooling.locks SET host = ?, owner = ?, "
- + "expirationTime = timestampadd(second, ?, now()) "
- + "WHERE resourceId = ? AND owner = ? AND expirationTime >= now()")) {
-
- int index = 1;
- updateStatement.setString(index++, this.uuid.toString());
- updateStatement.setString(index++, this.owner);
- updateStatement.setInt(index++, holdSec);
- updateStatement.setString(index++, this.resourceId);
- updateStatement.setString(index++, this.owner);
-
- // refresh succeeded iff a record was updated
- return (updateStatement.executeUpdate() == 1);
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.refreshLock()", e);
- return false;
- }
-
- }
-
- /**
- *To remove a lock we simply delete the record from the db .
- */
- private boolean deleteLock() {
-
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement deleteStatement = conn.prepareStatement(
- "DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?")) {
-
- deleteStatement.setString(1, this.resourceId);
- deleteStatement.setString(2, this.owner);
- deleteStatement.setString(3, this.uuid.toString());
-
- return (deleteStatement.executeUpdate() == 1);
-
- } catch (SQLException e) {
- logger.error("error in TargetLock.deleteLock()", e);
- return false;
- }
-
- }
-
- /**
- * Is the lock active.
- */
- public boolean isActive() {
- try (Connection conn = dataSource.getConnection();
-
- PreparedStatement selectStatement = conn.prepareStatement(
- "SELECT * FROM pooling.locks "
- + "WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= now()")) {
-
- selectStatement.setString(1, this.resourceId);
- selectStatement.setString(2, this.uuid.toString());
- selectStatement.setString(3, this.owner);
- try (ResultSet result = selectStatement.executeQuery()) {
-
- // This will return true if the
- // query returned at least one row
- return result.first();
- }
-
- }
-
- catch (SQLException e) {
- logger.error("error in TargetLock.isActive()", e);
- return false;
- }
-
- }
-
- /**
- * Is the resource locked.
- */
- public boolean isLocked() {
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement selectStatement = conn
- .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= now()")) {
-
- selectStatement.setString(1, this.resourceId);
- try (ResultSet result = selectStatement.executeQuery()) {
- // This will return true if the
- // query returned at least one row
- return result.first();
- }
- } catch (SQLException e) {
- logger.error("error in TargetLock.isActive()", e);
- return false;
- }
- }
-
-}